In [1]:
from pyspark.sql import SparkSession

# Attach to the standalone cluster master and register the application as "kafka".
# The Kafka source and Spark NLP packages are not requested here (those settings
# were left commented out), so presumably the environment supplies them.
# NOTE(review): if they are re-enabled, "spark.jars.packages" expects a single
# comma-separated list, so both coordinates belong in one .config(...) call:
#   org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
#   com.johnsnowlabs.nlp:spark-nlp_2.12:5.3.3
spark = SparkSession.builder.master("spark://chinmay:7077").appName("kafka").getOrCreate()

# Leave the session as the last expression so the cell displays its summary.
spark

:: loading settings :: url = jar:file:/home/hadoop/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/hadoop/.ivy2/cache
The jars for the packages stored in: /home/hadoop/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ab41754e-9a5f-44fc-836d-0d42115a7df7;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	fou

### Create streaming DataFrames (`trump_df`, `biden_df`) that read from Kafka

In [2]:
def read_kafka_topic(topic, bootstrap_servers="chinmay:9092", starting_offsets="earliest"):
    """Return a streaming DataFrame subscribed to a single Kafka topic.

    Parameters
    ----------
    topic : str
        Name of the Kafka topic passed to the ``subscribe`` option.
    bootstrap_servers : str
        Value for ``kafka.bootstrap.servers``; defaults to the broker this
        notebook has always used.
    starting_offsets : str
        Value for ``startingOffsets``; defaults to ``"earliest"``.

    Returns
    -------
    The unparsed streaming DataFrame produced by the Kafka source.
    """
    return (
        spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", starting_offsets)
        .load()
    )


# One stream per candidate; the two readers differ only in the topic name.
trump_df = read_kafka_topic("trump")
biden_df = read_kafka_topic("biden")

### View the schema of the raw Kafka stream (`trump_df`)

In [3]:
# Inspect the columns delivered by the Kafka source before any parsing.
trump_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### Cast `value` from binary to string (`trump_json_df`, `biden_json_df`)

In [4]:
from pyspark.sql.functions import expr
# Redistribute the Trump stream over 32 partitions before the heavier stages.
repartitioned_trump_df = trump_df.repartition(32)

# Replace the binary Kafka payload with its string form, keeping the column name.
trump_json_df = repartitioned_trump_df.withColumn(
    "value",
    repartitioned_trump_df["value"].cast("string"),
)

In [5]:
# Redistribute the Biden stream over 32 partitions before the heavier stages.
repartitioned_biden_df = biden_df.repartition(32)

# Replace the binary Kafka payload with its string form, keeping the column name.
biden_json_df = repartitioned_biden_df.withColumn(
    "value",
    repartitioned_biden_df["value"].cast("string"),
)

In [6]:
# Check that `value` is now a string column in the Trump stream.
trump_json_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
# Check that `value` is now a string column in the Biden stream.
biden_json_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
# Schema of the payload

from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
# Each Kafka message body is a JSON object with two nullable string fields.
json_schema = (
    StructType()
    .add('tweet', StringType(), True)
    .add('state', StringType(), True)
)

In [9]:
# Apply the schema to payload to read the data
from pyspark.sql.functions import from_json,col

# Parse the JSON text in `value` with json_schema and keep only the parsed
# fields (tweet, state) as top-level columns.
streaming_trump_df = (
    trump_json_df
    .select(from_json(col("value"), json_schema).alias("values_json"))
    .select("values_json.*")
)
streaming_biden_df = (
    biden_json_df
    .select(from_json(col("value"), json_schema).alias("values_json"))
    .select("values_json.*")
)


In [10]:
from pyspark.sql.types import StringType, StructType, StructField, FloatType
from pyspark.sql.functions import from_json, col, udf

In [11]:
# Confirm the parsed Trump stream exposes only the payload fields.
streaming_trump_df.printSchema()

root
 |-- tweet: string (nullable = true)
 |-- state: string (nullable = true)



In [12]:
# Confirm the parsed Biden stream exposes only the payload fields.
streaming_biden_df.printSchema()

root
 |-- tweet: string (nullable = true)
 |-- state: string (nullable = true)



In [13]:
from sparknlp.base import DocumentAssembler, Pipeline, light_pipeline
from sparknlp.annotator import(
    UniversalSentenceEncoder,
    SentimentDLModel
)

import pyspark.sql.functions as f

# Stage 1: turn the raw `tweet` string into a document annotation column.
documentAssembler = (
    DocumentAssembler()
    .setInputCol("tweet")
    .setOutputCol("cleaned_tweet")
)

# Stage 2: pretrained Universal Sentence Encoder producing one embedding per document.
use = (
    UniversalSentenceEncoder.pretrained("tfhub_use", "en")
    .setInputCols(["cleaned_tweet"])
    .setOutputCol("tweet_embeddings")
)

# Stage 3: pretrained Twitter sentiment classifier that consumes the embeddings.
sentimentdl = (
    SentimentDLModel.pretrained("sentimentdl_use_twitter", "en")
    .setInputCols(["tweet_embeddings"])
    .setOutputCol("sentiment")
)

# Chain the three stages in dependency order.
nlpPipeline = Pipeline(stages=[documentAssembler, use, sentimentdl])

tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[ | ]tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[ / ]Download done! Loading the resource.
[ — ]

                                                                                

[ | ]

2024-04-25 22:12:22.888035: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


[OK!]
sentimentdl_use_twitter download started this may take some time.
Approximate size to download 11.4 MB
[ | ]sentimentdl_use_twitter download started this may take some time.
Approximate size to download 11.4 MB
[ / ]Download done! Loading the resource.
[OK!]


In [14]:
# fit() needs a DataFrame argument, so a one-row placeholder is passed. The
# stages are a DocumentAssembler and two pretrained models, so presumably
# nothing is learned from this frame. Its column is named "tweet" to match the
# DocumentAssembler input column, so fitting keeps working if a stage that
# actually reads the data is added later.
empty_df = spark.createDataFrame([['']]).toDF("tweet")
model = nlpPipeline.fit(empty_df)

# Apply the same fitted pipeline to both candidate streams.
result_trump_df = model.transform(streaming_trump_df)
result_biden_df = model.transform(streaming_biden_df)

In [15]:
# Show the annotation columns the NLP pipeline added to the Trump stream.
result_trump_df.printSchema()

root
 |-- tweet: string (nullable = true)
 |-- state: string (nullable = true)
 |-- cleaned_tweet: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- tweet_embeddings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    | 

In [16]:
# Show the annotation columns the NLP pipeline added to the Biden stream.
result_biden_df.printSchema()

root
 |-- tweet: string (nullable = true)
 |-- state: string (nullable = true)
 |-- cleaned_tweet: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- tweet_embeddings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    | 

In [17]:
# Pair each document's text with its predicted label. The two inputs are kept
# as attribute accesses on the DataFrame because the zipped struct is addressed
# positionally ('0' and '1') further down.
trump_text_and_label = f.arrays_zip(
    result_trump_df.cleaned_tweet.result,
    result_trump_df.sentiment.result,
)

# One output row per (text, label) pair, carrying the tweet's state along.
trump_pairs_df = result_trump_df.select(
    "state",
    f.explode(trump_text_and_label).alias("cols"),
)

# Flatten the struct into plain string columns.
final_trump_df = trump_pairs_df.select(
    "state",
    f.expr("cols['0']").alias("cleaned_tweet"),
    f.expr("cols['1']").alias("sentiment"),
)

In [18]:
# Pair each document's text with its predicted label. The two inputs are kept
# as attribute accesses on the DataFrame because the zipped struct is addressed
# positionally ('0' and '1') further down.
biden_text_and_label = f.arrays_zip(
    result_biden_df.cleaned_tweet.result,
    result_biden_df.sentiment.result,
)

# One output row per (text, label) pair, carrying the tweet's state along.
biden_pairs_df = result_biden_df.select(
    "state",
    f.explode(biden_text_and_label).alias("cols"),
)

# Flatten the struct into plain string columns.
final_biden_df = biden_pairs_df.select(
    "state",
    f.expr("cols['0']").alias("cleaned_tweet"),
    f.expr("cols['1']").alias("sentiment"),
)


In [19]:
# Verify the flattened Trump output: state, cleaned_tweet, sentiment.
final_trump_df.printSchema()

root
 |-- state: string (nullable = true)
 |-- cleaned_tweet: string (nullable = true)
 |-- sentiment: string (nullable = true)



In [20]:
# Verify the flattened Biden output: state, cleaned_tweet, sentiment.
final_biden_df.printSchema()

root
 |-- state: string (nullable = true)
 |-- cleaned_tweet: string (nullable = true)
 |-- sentiment: string (nullable = true)



In [21]:
import os
from time import sleep
# Remove result files left over from an earlier run. This is done in-process
# rather than by shelling out to `rm`, so it is portable and stays quiet when
# the files do not exist yet. Removal is synchronous, so no pause is needed
# afterwards.
# NOTE(review): checkpoint_dir_trump / checkpoint_dir_biden are not cleared
# here. Presumably a restarted query resumes from its checkpointed offsets
# rather than from "earliest", so the fresh CSVs would miss earlier records --
# confirm whether the checkpoints should be reset together with the CSVs.
for stale_csv in ("data_trump.csv", "data_biden.csv"):
    try:
        os.remove(stale_csv)
    except FileNotFoundError:
        # Nothing to clean up (e.g. first run).
        pass

In [22]:
import csv 

# Start each results file with its header row. Both files share the same
# layout apart from the candidate prefix on the tweet/sentiment columns.
for csv_path, candidate in (('data_trump.csv', 'trump'), ('data_biden.csv', 'biden')):
    # newline='' is what the csv module requires for files handed to a writer;
    # without it, platforms that translate line endings get an extra '\r'.
    with open(csv_path, 'w', newline='') as csv_file:
        csv_writer = csv.DictWriter(
            csv_file,
            fieldnames=['batch', 'state', f'{candidate}_tweet', f'{candidate}_sentiment'],
        )
        csv_writer.writeheader()

In [23]:
def process_batch_trump(df, epoch_id):
    """Append one micro-batch of Trump results to ``data_trump.csv``.

    Parameters
    ----------
    df
        Micro-batch DataFrame handed over by ``foreachBatch``; it is collected
        with ``toPandas()``.
    epoch_id
        Micro-batch identifier handed over by ``foreachBatch``; written into
        the ``batch`` column of every row.
    """
    batch_pdf = df.toPandas()
    # The file header starts with a "batch" column. Fill it with the epoch id
    # and suppress the pandas row index, which would otherwise land there.
    batch_pdf.insert(0, "batch", epoch_id)
    batch_pdf.to_csv('data_trump.csv', mode='a', header=False, index=False)

In [24]:
def process_batch_biden(df, epoch_id):
    """Append one micro-batch of Biden results to ``data_biden.csv``.

    Parameters
    ----------
    df
        Micro-batch DataFrame handed over by ``foreachBatch``; it is collected
        with ``toPandas()``.
    epoch_id
        Micro-batch identifier handed over by ``foreachBatch``; written into
        the ``batch`` column of every row.
    """
    batch_pdf = df.toPandas()
    # The file header starts with a "batch" column. Fill it with the epoch id
    # and suppress the pandas row index, which would otherwise land there.
    batch_pdf.insert(0, "batch", epoch_id)
    batch_pdf.to_csv('data_biden.csv', mode='a', header=False, index=False)

In [25]:
# Start one streaming query per candidate. Every micro-batch is handed to the
# matching process_batch_* callback, and each query keeps its own checkpoint
# directory.
query_trump = (
    final_trump_df.writeStream
    .outputMode("append")
    .option("checkpointLocation", "checkpoint_dir_trump")
    .foreachBatch(process_batch_trump)
    .start()
)

query_biden = (
    final_biden_df.writeStream
    .outputMode("append")
    .option("checkpointLocation", "checkpoint_dir_biden")
    .foreachBatch(process_batch_biden)
    .start()
)

24/04/25 22:12:48 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/04/25 22:12:50 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [None]:
# Keep the cell blocked until BOTH queries have terminated, but poll them in
# turn instead of parking on the Biden query alone. That way a failure in the
# Trump query is raised here within a few seconds rather than going unnoticed
# for as long as the Biden query keeps running.
POLL_SECONDS = 5
pending_queries = [query_biden, query_trump]
while pending_queries:
    # awaitTermination(timeout) returns True once the query has terminated and
    # raises if the query terminated with an exception.
    pending_queries = [
        query for query in pending_queries
        if not query.awaitTermination(POLL_SECONDS)
    ]

24/04/25 22:12:52 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
24/04/25 22:12:53 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                