In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.6.0.jar

--2023-06-16 10:34:20--  https://jdbc.postgresql.org/download/postgresql-42.6.0.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1081604 (1.0M) [application/java-archive]
Saving to: ‘postgresql-42.6.0.jar’


2023-06-16 10:34:23 (732 KB/s) - ‘postgresql-42.6.0.jar’ saved [1081604/1081604]



In [6]:
!wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0-preview2/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar

--2023-06-16 16:17:05--  https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0-preview2/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.36.209, 2a04:4e42:9::209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.36.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 344977 (337K) [application/java-archive]
Saving to: ‘spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar’


2023-06-16 16:17:06 (1.91 MB/s) - ‘spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar’ saved [344977/344977]



In [5]:
import re
import pickle

from datetime import datetime, timezone

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import explode, split, col, upper, filter, udf, from_json, base64, decode
from pyspark.sql.types import ArrayType, MapType, StringType, TimestampType, IntegerType, StructType, StructField

In [2]:
# Create (or reuse) a local-mode Spark session; the Kafka connector is requested
# by Maven coordinate through spark.jars.packages.
spark = (
    SparkSession.builder
    .master("local")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")
    .getOrCreate()
)

In [15]:
# SECURITY: pickle.load can execute arbitrary code while deserializing — only load
# sentiment_tagger.pkl when it comes from a trusted source.
with open("sentiment_tagger.pkl", "rb") as f:
    model = pickle.load(f)

# Broadcast after the file is closed and keep the returned handle, so the broadcast
# copy is reachable (model_broadcast.value) and can be released later.
# `model` stays bound at module level for code that reads it directly.
model_broadcast = spark.sparkContext.broadcast(model)


# Compiled once at definition time instead of on every call.
_NON_LETTERS_RE = re.compile('[^a-zA-Z]')


def remove_non_letters(s):
    """Return ``s`` with every character that is not an ASCII letter removed.

    Args:
        s: Input string, or ``None``.

    Returns:
        The string containing only the characters ``a-z`` / ``A-Z`` of ``s`` (possibly
        empty), or ``None`` when ``s`` is ``None``.
    """
    # A null value reaches a Python UDF as None; pass it through instead of raising
    # TypeError inside re.sub.
    if s is None:
        return None
    return _NON_LETTERS_RE.sub('', s)


def tag_sentiment(s):
    """Label a single text as ``"POSITIVE"`` or ``"NEGATIVE"`` using the global ``model``.

    Args:
        s: Text to classify, or ``None``.

    Returns:
        ``"POSITIVE"`` when the model's prediction for ``s`` equals 1, ``"NEGATIVE"``
        for any other prediction, and ``None`` when ``s`` is ``None``.
    """
    if s is None:
        return None
    # predict() is called with a one-element batch; compare its single element rather
    # than the whole result container, which for a plain list is never equal to 1.
    # NOTE(review): assumes predict returns one label per input — confirm for this model.
    prediction = model.predict([s])[0]
    return "POSITIVE" if prediction == 1 else "NEGATIVE"


# Expose the plain Python helpers as Spark UDFs. StringType() is udf's default
# return type; it is spelled out here so the column type is visible at a glance.
remove_non_letters_udf = udf(remove_non_letters, StringType())
tag_sentiment_udf = udf(tag_sentiment, StringType())

In [16]:
# Schema applied by from_json to the decoded Kafka record value.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("id", IntegerType()),
        ("poster_id", IntegerType()),
        ("timestamp", TimestampType()),
        ("body", StringType()),
    )
])


# Kafka broker address and the topic the stream subscribes to.
KAFKA_HOST: str = "kafka"
KAFKA_PORT: int = 9092
KAFKA_TOPIC: str = "posts"

# Base directory; each batch writes its CSV output into a sub-directory of it.
RESULTS_DIRECTORY_PATH: str = "/home/jovyan/results"


def unique_words(df, batch_id):
    """Count word frequencies over the distinct posts of one micro-batch.

    The Kafka ``value`` column is decoded as UTF-8 and parsed with ``schema``.
    Duplicate posts are dropped, each ``body`` is upper-cased and split on
    whitespace, and every token is reduced to its ASCII letters. Tokens that end
    up empty are discarded. The counts, sorted by descending frequency, are
    shown and written as CSV to ``RESULTS_DIRECTORY_PATH/<UTC ISO timestamp>``.

    Args:
        df: Micro-batch DataFrame with a Kafka ``value`` column.
        batch_id: Micro-batch identifier supplied by ``foreachBatch`` (unused).
    """
    # Alias the UDF output explicitly instead of relying on the auto-generated
    # column name; the name itself is kept so the displayed header is unchanged.
    word_column = "remove_non_letters(col)"

    posts = df.withColumn("json", from_json(decode(col("value"), "utf-8"), schema)).select("json.*")
    result = (
        posts
        .distinct()
        # Raw string: "\s" in a normal literal is an invalid escape sequence.
        .select(explode(split(upper(col("body")), r"\s+")))
        .select(remove_non_letters_udf("col").alias(word_column))
        # Tokens made only of digits/punctuation collapse to "" and are not words.
        .filter(col(word_column) != "")
        .groupby(word_column)
        .count()
        .sort(col("count").desc())
    )

    # show() and write.csv() each trigger a job; persist so the aggregation is
    # computed once and both see the same rows.
    result.persist()
    try:
        result.show()
        timestamp = datetime.now(tz=timezone.utc).replace(microsecond=0).isoformat()
        result.write.csv(f"{RESULTS_DIRECTORY_PATH}/{timestamp}")
    finally:
        result.unpersist()
    
    
def tag_sentiment_df(df, batch_id):
    """Show the predicted sentiment of each distinct post in one micro-batch.

    The Kafka ``value`` column is decoded as UTF-8 and parsed with ``schema``.
    Duplicate posts are dropped, ``tag_sentiment_udf`` is applied to ``body``, and
    the resulting ``id`` / ``sentiment`` pairs are printed with ``show()``.

    Args:
        df: Micro-batch DataFrame with a Kafka ``value`` column.
        batch_id: Micro-batch identifier supplied by ``foreachBatch`` (unused).
    """
    posts = df.withColumn("json", from_json(decode(col("value"), "utf-8"), schema)).select("json.*")
    # show() returns None, so its result is deliberately not bound to a name.
    (
        posts
        .distinct()
        .withColumn("sentiment", tag_sentiment_udf("body"))
        .select("id", "sentiment")
        .show()
    )
    
    
def func_call(df, batch_id):
    """Run both per-batch analyses on one micro-batch.

    Args:
        df: Micro-batch DataFrame handed over by ``foreachBatch``.
        batch_id: Micro-batch identifier handed over by ``foreachBatch``.
    """
    # The batch feeds two independent pipelines; persist it so the source data is
    # not recomputed for each one, and always release it afterwards.
    df.persist()
    try:
        unique_words(df, batch_id)
        tag_sentiment_df(df, batch_id)
    finally:
        df.unpersist()
    
    
# Streaming source: subscribe to the Kafka topic on the configured broker.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f"{KAFKA_HOST}:{KAFKA_PORT}")
    .option("subscribe", KAFKA_TOPIC)
    .load()
)

# Keep the StreamingQuery handle: awaitTermination() returns None, so chaining it
# onto start() would leave `query` unusable.
query = (
    df.writeStream
    .foreachBatch(func_call)
    .trigger(processingTime="1 minutes")
    .start()
)

try:
    # Blocks until the query terminates or the cell is interrupted.
    query.awaitTermination()
finally:
    # Stop the stream on interrupt/error so it does not keep running in the background.
    query.stop()

+-----------------------+-----+
|remove_non_letters(col)|count|
+-----------------------+-----+
|               MILITARY|    2|
|                  THREE|    1|
|                  AVOID|    1|
|              AUTHORITY|    1|
|             COLLECTION|    1|
|                    ASK|    1|
|                     BY|    1|
|                   BILL|    1|
|               RESPONSE|    1|
|                  EARLY|    1|
|                  SOUTH|    1|
|                    ITS|    1|
|                   FEEL|    1|
|                 ENERGY|    1|
|                    EYE|    1|
|                ABILITY|    1|
|               MAINTAIN|    1|
|                 SIMPLY|    1|
|              CHALLENGE|    1|
|                  PRICE|    1|
+-----------------------+-----+
only showing top 20 rows

+----+---------+
|  id|sentiment|
+----+---------+
|7821| NEGATIVE|
|7820| NEGATIVE|
+----+---------+

+-----------------------+-----+
|remove_non_letters(col)|count|
+-----------------------+-----+
|      

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 