In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, TimestampType, StringType, DoubleType, LongType
from pyspark.sql.functions import udf, from_json, col, from_unixtime
from textblob import TextBlob

In [2]:
os.environ

environ{'CONDA_EXE': '/opt/conda/bin/conda',
        '_CE_M': '',
        'HOSTNAME': 'f8a3b8050f80',
        'PYSPARK_DRIVER_PYTHON': 'python3.9',
        'ENV': '',
        'PWD': '/app',
        'GSETTINGS_SCHEMA_DIR': '/opt/conda/envs/5003-project/share/glib-2.0/schemas',
        'CONDA_PREFIX': '/opt/conda/envs/5003-project',
        'GSETTINGS_SCHEMA_DIR_CONDA_BACKUP': '',
        'KAFKA_TOPIC_NAME': 'us-election-tweet',
        'DB_PORT': '5432',
        'KAFKA_CONNECTION_STRING': '',
        'DB_USER': 'postgres',
        'HOME': '/root',
        'LANG': 'C.UTF-8',
        'CONDA_PROMPT_MODIFIER': '(5003-project) ',
        'PYSPARK_PYTHON': 'python3.9',
        'PYTHONPATH': '/app/',
        '_CE_CONDA': '',
        'DB_HOST': 'db-timescale',
        'CONDA_SHLVL': '2',
        'SHLVL': '0',
        'BASH_ENV': '~/.bashrc',
        'DB_POOL_SIZE': '1',
        'CONDA_PYTHON_EXE': '/opt/conda/bin/python',
        'DB_NAME': '5003-project-dev',
        'CONDA_DEFAULT_ENV': '5003

In [2]:
DATABASE_HOST = os.environ.get('DB_HOST', '')
DATABASE_PORT = os.environ.get('DB_PORT', '')
DATABASE_NAME = os.environ.get('DB_NAME', '')
DATABASE_USER = os.environ.get('DB_USER', '')
DATABASE_PASS = os.environ.get('DB_PASS', '')
ENV = os.environ.get('ENV', 'dev')
KAFKA_CONNECTION_STRING = os.environ.get('KAFKA_CONNECTION_STRING', 'kafka-broker:9092')
KAFKA_TOPIC_NAME = os.environ.get('KAFKA_TOPIC_NAME', 'us-election-tweet')

ENV = 'dev' if ENV == '' else ENV
KAFKA_CONNECTION_STRING = 'kafka-broker:9092' if ENV == 'dev' else KAFKA_CONNECTION_STRING

In [3]:
spark = (
    SparkSession.builder
    .appName('5003-project')
    .master('spark://spark-master:7077')
    .config("spark.files.overwrite", "true")
    .config('spark.driver.extraClassPath', '/opt/drivers/postgresql-42.2.19.jar')
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2')
    .enableHiveSupport()
    .getOrCreate()
)



:: loading settings :: url = jar:file:/opt/conda/envs/5003-project/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-04d39904-c73a-4728-ad78-61c9f6428345;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 405ms :: artifacts dl 14ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central in [default]


In [4]:
schema = (
    StructType()
    .add("created_at", LongType(), True)
    .add("tweet_id", DoubleType(), True)
    .add("tweet", StringType(), True)
    .add("likes", DoubleType(), True)
    .add("retweet_count", DoubleType(), True)
    .add("source", StringType(), True)
    .add("user_id", DoubleType(), True)
    .add("user_name", StringType(), True)
    .add("user_screen_name", StringType(), True)
    .add("user_description", StringType(), True)
    .add("user_join_date", TimestampType(), True)
    .add("user_followers_count", DoubleType(), True)
    .add("user_location", StringType(), True)
    .add("lat", DoubleType(), True)
    .add("long", DoubleType(), True)
    .add("city", StringType(), True)
    .add("country", StringType(), True)
    .add("continent", StringType(), True)
    .add("state", StringType(), True)
    .add("state_code", StringType(), True)
    .add("collected_at", TimestampType(), True)
    .add("person", StringType(), True)
    .add("time_to_sleep", DoubleType(), True)
)

In [5]:
def get_polarity(tweet):
    return TextBlob(tweet).sentiment.polarity

udf_get_polarity = udf(get_polarity, DoubleType())

In [7]:
df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_CONNECTION_STRING)
    .option("subscribe", KAFKA_TOPIC_NAME)
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn('created_at', from_unixtime(col("created_at") / 1000).cast("timestamp"))
    .withColumn("score", udf_get_polarity(col("tweet")))
)

In [8]:
def timescale_sink(df, batch_id):
    url = f'jdbc:postgresql://{DATABASE_HOST}:{DATABASE_PORT}/{DATABASE_NAME}?user={DATABASE_USER}&password={DATABASE_PASS}'
    df.filter(df['person']=='biden').write.jdbc(url, 'joebiden_tweets', mode='append')
    df.filter(df['person']=='trump').write.jdbc(url, 'donaldtrump_tweets', mode='append')    

21/11/17 15:29:52 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a002e38d-492a-4056-b32b-d8e15bcd5e29. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

KeyboardInterrupt: 

In [None]:
write_biden_table = (df
    .writeStream
    .outputMode("append") 
    .foreachBatch(timescale_sink)
    .start()
    .awaitTermination())