With MySQL database

In [None]:
import os
from typing import Optional

import mysql.connector
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover, Tokenizer
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, lower, col, from_json, udf
from pyspark.sql.types import *
from textblob import TextBlob


# Build (or reuse) the Spark session for the sentiment job. The
# "spark.jars.packages" setting lists the Kafka source connector and the
# MySQL JDBC driver as Maven coordinates.
_session_builder = SparkSession.builder.appName("SentimentAnalysis")
_session_builder = _session_builder.config(
    "spark.jars.packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,mysql:mysql-connector-java:8.0.32",
)
spark = _session_builder.getOrCreate()

# Only surface ERROR-level Spark log messages.
spark.sparkContext.setLogLevel("ERROR")

# Kafka source settings: the broker address and the topic to subscribe to.
kafka_brokers = "localhost:9092"
topic_name = "nyt"

# Open a streaming reader on the topic; nothing is consumed until a
# streaming query is started on a DataFrame derived from this one.
kafka_data = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", topic_name)
    .load()
)

# Schema of the JSON payload carried in each Kafka message value: five
# nullable string fields followed by a nullable integer word count.
_string_field_names = ["headline", "content", "news_desk", "section", "source"]
json_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in _string_field_names]
    + [StructField("word_count", IntegerType(), True)]
)

# Cast the Kafka key/value to strings, parse the value against the schema,
# expand the parsed struct into top-level columns, and rename "source" to
# "source_" to match the field name in the db.
deserialized_data = (
    kafka_data
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .withColumn("json_data", from_json("value", json_schema))
    .select("json_data.*")
    .withColumnRenamed("source", "source_")
)

# Skip rows whose content is null (field absent from the payload, or a message
# that could not be parsed into the schema). There is no text to score, and a
# null "cleaned_content" would make the Tokenizer stage applied downstream
# fail and stop the whole streaming query.
scorable_data = deserialized_data.filter(col("content").isNotNull())

# Keep only ASCII letters and whitespace, then lowercase. This one pass removes
# digits, underscores and punctuation alike, so a separate punctuation-removal
# pass afterwards would have nothing left to match.
cleaned_data = scorable_data.withColumn(
    "cleaned_content",
    lower(regexp_replace(col("content"), "[^a-zA-Z\\s]", "")),
)

# Tokenizer stage: reads "cleaned_content" and writes the list of word
# tokens to "words".
tokenizer = Tokenizer(
    inputCol="cleaned_content",
    outputCol="words",
)

# Stop-word stage: reads "words" and writes the remaining tokens to
# "filtered_words" (the remover's default stop-word list is used).
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered_words",
)

# Helper (wrapped as a UDF elsewhere) that joins tokens back into one string
def join_text(words):
    """Join a list of tokens into a single space-separated string.

    Args:
        words: List of word strings, or None when the source column is null.

    Returns:
        The tokens joined with single spaces, or None if ``words`` is None.
    """
    # Spark passes null column values to Python UDFs as None; propagate the
    # null instead of letting str.join raise TypeError and fail the batch.
    if words is None:
        return None
    return " ".join(words)

# Wrap join_text as a Spark UDF that produces a string column.
join_udf = udf(f=join_text, returnType=StringType())

# Chain tokenization and stop-word removal into a single preprocessing
# pipeline and fit it on the cleaned data.
_preprocessing_stages = [tokenizer, remover]
preprocessing_pipeline = Pipeline(stages=_preprocessing_stages)
preprocessing_model = preprocessing_pipeline.fit(cleaned_data)

# Run the pipeline, then overwrite "cleaned_content" with the stop-word-free
# tokens joined back into one string.
preprocessed_data = (
    preprocessing_model
    .transform(cleaned_data)
    .withColumn("cleaned_content", join_udf(col("filtered_words")))
)

# Polarity scorer (wrapped as a UDF elsewhere)
def Polarity(content: Optional[str]) -> Optional[float]:
    """Return the TextBlob sentiment polarity of ``content``.

    Args:
        content: Text to score, or None when the source column is null.

    Returns:
        ``TextBlob(content).sentiment.polarity``, or None if ``content`` is
        None.
    """
    # A null column value arrives here as None and TextBlob only accepts
    # strings, so propagate the null rather than raising inside the UDF.
    if content is None:
        return None
    return TextBlob(content).sentiment.polarity

# Wrap Polarity as a Spark UDF that produces a float column.
polarity_udf = udf(f=Polarity, returnType=FloatType())

# Sentiment labeller (wrapped as a UDF elsewhere)
def Sentiment(polarity_value: Optional[float]) -> Optional[str]:
    """Map a polarity score to a sentiment label.

    Args:
        polarity_value: Polarity score, or None when the score is null.

    Returns:
        "Negative" for values below zero, "Neutral" for exactly zero,
        "Positive" for values above zero, or None if ``polarity_value`` is
        None.
    """
    # Comparing None with a number raises TypeError; a missing score has no
    # meaningful label, so return null instead.
    if polarity_value is None:
        return None
    if polarity_value < 0:
        return "Negative"
    if polarity_value == 0:
        return "Neutral"
    return "Positive"
    
# Wrap Sentiment as a Spark UDF that produces a string column.
sentiment_udf = udf(Sentiment, StringType())

# Score the preprocessed text, then label the score.
sentiment_df = preprocessed_data.withColumn("Polarity", polarity_udf(col("cleaned_content")))
# Refer to the score column by the exact name it was created with; the
# lowercase spelling "polarity" only resolves while column-name matching is
# case-insensitive.
sentiment_df = sentiment_df.withColumn("Sentiment", sentiment_udf(col("Polarity")))

# Keep only the columns that are written to the database.
sentiment_df = sentiment_df.select("headline", "content", "cleaned_content", "news_desk", "section", "source_",
                                  "word_count", "Polarity", "Sentiment")

# MySQL connection settings for loading sentiment_df into the database.
# Host, port, database, user and password can be overridden through the
# environment variables named below so credentials need not live in source;
# when a variable is unset the original literal is used as the default.
mysql_host_name = os.environ.get("MYSQL_HOST", "localhost")
mysql_port_no = os.environ.get("MYSQL_PORT", "3306")
mysql_database_name = os.environ.get("MYSQL_DATABASE", "nyt_demo")
mysql_driver_class = "com.mysql.cj.jdbc.Driver"
mysql_table_name = "articles"
mysql_user_name = os.environ.get("MYSQL_USER", "root")
mysql_password = os.environ.get("MYSQL_PASSWORD", "password")
mysql_jdbc_url = f"jdbc:mysql://{mysql_host_name}:{mysql_port_no}/{mysql_database_name}"

# Bundle of everything needed to open the JDBC connection.
mysql_properties = {
    "url": mysql_jdbc_url,
    "driver": mysql_driver_class,
    "user": mysql_user_name,
    "password": mysql_password,
}

# Append a DataFrame to a MySQL table over JDBC
def save_to_mysql(df, properties, table_name):
    """Append the rows of ``df`` to ``table_name`` through JDBC.

    Args:
        df: DataFrame whose rows are written.
        properties: Mapping holding the "url", "driver", "user" and
            "password" entries for the connection.
        table_name: Name of the destination table.
    """
    jdbc_url = properties["url"]
    # The URL is passed as its own argument; only the remaining entries are
    # forwarded as JDBC connection properties.
    connection_properties = {
        key: properties[key] for key in ("driver", "user", "password")
    }
    writer = df.write
    writer.jdbc(
        url=jdbc_url,
        table=table_name,
        mode="append",
        properties=connection_properties,
    )

def _write_batch(batch_df, batch_id):
    """Write one micro-batch to MySQL; the batch id is not used."""
    save_to_mysql(batch_df, mysql_properties, mysql_table_name)


# Start the streaming query in append mode, handing every micro-batch to the
# MySQL writer above.
_stream_writer = sentiment_df.writeStream.outputMode("append")
query = _stream_writer.foreachBatch(_write_batch).start()

# Wait for the streaming query to terminate.
query.awaitTermination()

                                                                                