In [1]:
import os
import sys

# Configure the environment variables PySpark reads when it launches the JVM.
# This cell must run before the first SparkSession/SparkContext is created.

# The Kafka connector artifacts have to be built for the same Spark release as
# the installed PySpark; a hard-coded version silently drifts after an upgrade,
# so the version is looked up from the installed package instead.
try:
    import pyspark
    _spark_version = pyspark.__version__
except ImportError:
    # Keep the previously pinned release so this cell still runs; the import
    # cell that follows will fail loudly if PySpark is really missing.
    _spark_version = '3.2.0'

_SCALA_BINARY = '2.12'  # assumes a Scala 2.12 build of Spark — TODO confirm for this install
_kafka_packages = ','.join(
    f'org.apache.spark:{artifact}_{_SCALA_BINARY}:{_spark_version}'
    for artifact in ('spark-streaming-kafka-0-10', 'spark-sql-kafka-0-10')
)
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {_kafka_packages} pyspark-shell'

# Point both the driver and the worker Python at the interpreter running this kernel.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
# Importing necessary libraries
import json
import re

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf,to_json, struct
from pyspark.sql.types import StructType, StructField, StringType


In [3]:
# Create (or reuse) the SparkSession that drives the streaming job.
# The Kafka SQL connector must be built for the same Spark release as the
# running PySpark, so its version is taken from the installed package instead
# of being hard-coded.
# Assumes a Scala 2.12 build of Spark — TODO confirm for this install.
# NOTE(review): a `--packages` list passed through PYSPARK_SUBMIT_ARGS appears
# to take precedence over `spark.jars.packages` at launch — confirm, and keep
# the two settings in agreement.
kafka_sql_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"

spark = (
    SparkSession.builder
    .appName("streaming Write")
    .config("spark.jars.packages", kafka_sql_package)
    .getOrCreate()
)
spark

In [4]:
# Open a streaming read of the "tweets" topic on the local Kafka broker,
# with the starting offsets set to "earliest".
kafka_reader = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "tweets")
    .option("startingOffsets", "earliest")
)
kafka_df = kafka_reader.load()

kafka_df

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [5]:
# Print the schema of the raw Kafka stream; at this point `key` and `value`
# are still binary columns.
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Cast the binary Kafka `value` column to a string; the result is kafka_json_df.
from pyspark.sql.functions import expr

value_as_string = expr("cast(value as string)")
kafka_json_df = kafka_df.withColumn("value", value_as_string)

In [7]:
from pyspark.sql.types import StructType, StructField, StringType

# Schema of the JSON payload: four nullable string fields.
json_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("Tweet_ID", "Entity", "sentiment", "tweet_content")
])

In [8]:
# Parse the JSON string in `value` with json_schema, then flatten the parsed
# struct so each JSON field becomes its own top-level column.
from pyspark.sql.functions import from_json,col

parsed_value = from_json(col("value"), json_schema)
parsed_df = kafka_json_df.withColumn("values_json", parsed_value)
streaming_df = parsed_df.selectExpr("values_json.*")

In [9]:
# Print the schema of the parsed stream.
# NOTE(review): the original note seems to suggest that, to inspect the schema
# of the data itself, one can place a sample JSON file and switch `readStream`
# to a batch `read` — confirm intent.
streaming_df.printSchema()

root
 |-- Tweet_ID: string (nullable = true)
 |-- Entity: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- tweet_content: string (nullable = true)



In [10]:
from Clean_Func import clean_tweet_content

# Wrap the project's cleaning function as a Spark UDF that returns a string.
clean_tweet_content_udf = udf(clean_tweet_content, returnType=StringType())

# Add a `cleaned_tweet_content` column computed from `tweet_content`.
cleaned_column = clean_tweet_content_udf(streaming_df["tweet_content"])
preprocessed_df = streaming_df.withColumn("cleaned_tweet_content", cleaned_column)


In [11]:
# Print the schema after preprocessing to confirm `cleaned_tweet_content` was added.
preprocessed_df.printSchema()

root
 |-- Tweet_ID: string (nullable = true)
 |-- Entity: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- tweet_content: string (nullable = true)
 |-- cleaned_tweet_content: string (nullable = true)



In [12]:
from pyspark.ml import PipelineModel

# Load the previously saved pipeline model from the "pipeline_v1" path.
model_path = 'pipeline_v1'
loaded_model = PipelineModel.load(model_path)

# Run the loaded pipeline over the preprocessed stream to get predictions.
predictions_df = loaded_model.transform(preprocessed_df)

# Print the schema of the scored stream (column names and types only, no rows).
predictions_df.printSchema()


root
 |-- Tweet_ID: string (nullable = true)
 |-- Entity: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- tweet_content: string (nullable = true)
 |-- cleaned_tweet_content: string (nullable = true)
 |-- Tweet_vect_: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Filtered_tweet: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [13]:
# Keep the tweet fields, the cleaned text and the predicted label; every other
# column of predictions_df is left out.
output_columns = [
    "Tweet_ID",
    "Entity",
    "sentiment",
    "tweet_content",
    "cleaned_tweet_content",
    "prediction",
]
alfa = predictions_df.select(*output_columns)
alfa.printSchema()

root
 |-- Tweet_ID: string (nullable = true)
 |-- Entity: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- tweet_content: string (nullable = true)
 |-- cleaned_tweet_content: string (nullable = true)
 |-- prediction: double (nullable = false)



In [14]:
# Start a streaming query that appends the selected columns as JSON files
# under "json_files", using "checkpoints" as the checkpoint location.
json_sink = (
    alfa.writeStream
    .outputMode("append")
    .format("json")
    .option("path", "json_files")
    .option("checkpointLocation", "checkpoints")
)
query = json_sink.start()

In [None]:
# Block this cell until the streaming query terminates.
# NOTE(review): interrupting the kernel here presumably leaves the query
# running; call `query.stop()` to shut it down — confirm.
query.awaitTermination()