In [None]:
# Import required libraries
import sys

from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import (
    Tokenizer,
    RegexTokenizer,
    StopWordsRemover,
    HashingTF,
    IDF,
    VectorAssembler,
    MinMaxScaler,
)
from pyspark.ml.stat import Summarizer
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer


In [None]:
# Initialize Spark Session with Dynamic Allocation.
#
# Notes:
# - Spark has no configuration property that installs Python packages on the
#   driver or executors (the previous `spark.{driver,executor}.extraPythonPackages`
#   keys are not Spark properties and had no effect). vaderSentiment must already
#   be importable on every executor -- e.g. installed in the executors' Python
#   environment, or shipped with `spark.archives` / `spark.submit.pyFiles`.
# - The SQL session time zone is pinned to UTC so that `from_unixtime(created_utc)`
#   in the trends step buckets comments by UTC calendar day rather than by the
#   cluster's local time zone.
# - NOTE(review): dynamic allocation needs either the external shuffle service or
#   `spark.dynamicAllocation.shuffleTracking.enabled`; confirm the cluster provides one.
spark = (
    SparkSession.builder
    .appName("Final Gold Layer")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "2")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "3")
    .config("spark.dynamicAllocation.maxExecutors", "9")
    .config("spark.dynamicAllocation.initialExecutors", "3")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.session.timeZone", "UTC")
    # Forward the driver's sys.path to executors. NOTE(review): this only helps if
    # executors see the same filesystem layout as the driver -- confirm.
    .config("spark.executorEnv.PYTHONPATH", ":".join(sys.path))
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("INFO")

In [None]:
# Load the silver-layer comments, projecting only the four columns used later
# so Parquet reads (and carries forward) as little data as possible.
file_path = "hdfs://namenode:9000/data/cleaned_dataset.parquet"
df = (
    spark.read
    .parquet(file_path)
    .select("comment_id", "body", "created_utc", "sub_reddit")
)

# Sanity check: schema plus a handful of untruncated rows.
df.printSchema()
df.show(5, truncate=False)

In [None]:
# Clean comments
def clean_comment_spark(df, column):
    """Add a ``<column>_clean`` column containing a normalised copy of ``column``.

    Steps, in order:
      1. null -> empty string (downstream ``Tokenizer``-style transformers fail on
         null input; an empty string still yields a ``None`` sentiment score,
         exactly like a null did);
      2. lowercase;
      3. remove URLs starting with ``http`` or ``www``;
      4. remove ``@mentions`` and ``#`` characters;
      5. remove every character that is neither a word character nor whitespace;
      6. collapse whitespace runs (spaces, newlines, tabs) to a single space so
         whitespace-splitting tokenizers do not emit empty tokens;
      7. trim leading/trailing spaces.

    Args:
        df: Input DataFrame.
        column: Name of the string column to clean.

    Returns:
        ``df`` with an extra string column named ``f"{column}_clean"``.
    """
    cleaned = F.lower(F.coalesce(F.col(column), F.lit("")))
    # `http\S+` already matches `https...`, so no separate `https` alternative is needed.
    cleaned = F.regexp_replace(cleaned, r"http\S+|www\S+", "")
    cleaned = F.regexp_replace(cleaned, r"@\w+|#", "")
    # NOTE(review): Spark evaluates these patterns with Java regex, where `\w` is
    # ASCII-only by default, so non-ASCII letters and emoji are likely stripped
    # here as well -- confirm that is intended.
    cleaned = F.regexp_replace(cleaned, r"[^\w\s]", "")
    cleaned = F.regexp_replace(cleaned, r"\s+", " ")
    return df.withColumn(f"{column}_clean", F.trim(cleaned))


df = clean_comment_spark(df, "body")

# Cache the cleaned dataset: it is read by both the sentiment pass and the
# join in the trends step.
df.cache()

In [None]:
# Broadcast Sentiment Analyzer.
# NOTE: the broadcast analyzer is unpickled on each executor, so vaderSentiment
# must be importable there as well -- broadcasting does not ship the library.
analyzer_broadcast = sc.broadcast(SentimentIntensityAnalyzer())


def calculate_sentiment(row):
    """Score one comment with VADER.

    Args:
        row: A Row exposing ``comment_id`` and ``body_clean``.

    Returns:
        ``(comment_id, compound_score)``; the score is ``None`` when
        ``body_clean`` is null or empty.
    """
    analyzer = analyzer_broadcast.value
    comment_id = row['comment_id']
    text = row['body_clean']
    # NOTE(review): VADER uses punctuation, capitalisation and emoticons as
    # intensity cues; `body_clean` is lowercased with punctuation removed, so
    # those cues are lost. Consider scoring the raw `body` instead.
    sentiment_score = analyzer.polarity_scores(text)['compound'] if text else None
    return (comment_id, sentiment_score)


# Ship only the two columns the scorer reads to the Python workers rather than
# every column of `df` (which also carries the raw `body` text).
sentiment_rdd = df.select("comment_id", "body_clean").rdd.map(calculate_sentiment)

# NOTE(review): assumes `comment_id` is a string column in the silver data --
# confirm; createDataFrame checks each row against this schema.
schema = StructType([
    StructField("comment_id", StringType(), True),
    StructField("sentiment", FloatType(), True)
])

sentiment_df = spark.createDataFrame(sentiment_rdd, schema)

# Save only comment_id and sentiment to minimize storage
sentiment_output_path = "hdfs://namenode:9000/data/results/comment_sentiment.parquet"
sentiment_df.write.mode("overwrite").parquet(sentiment_output_path)

# Re-read the persisted scores so later cells build on the Parquet output.
# Left RDD-backed, `sentiment_df` would re-run the Python VADER scoring on every
# downstream action (trend write, TF-IDF fit, KMeans fit, silhouette, result writes).
sentiment_df = spark.read.parquet(sentiment_output_path)

In [None]:
# Attach each comment's sentiment score to its metadata and derive a
# "yyyy-MM-dd" day string from the epoch-seconds `created_utc` column
# (from_unixtime renders it in the Spark SQL session time zone).
df_with_sentiment = (
    df.join(sentiment_df, on="comment_id", how="inner")
    .select("comment_id", "body_clean", "created_utc", "sub_reddit", "sentiment")
    .withColumn("date", F.from_unixtime(F.col("created_utc"), "yyyy-MM-dd"))
)

# Mean sentiment per day across all subreddits ...
df_daily_sentiment_all = (
    df_with_sentiment
    .groupBy("date")
    .agg(F.avg("sentiment").alias("avg_daily_sentiment_all"))
)

# ... and per day within each subreddit.
df_daily_sentiment_subreddit = (
    df_with_sentiment
    .groupBy("date", "sub_reddit")
    .agg(F.avg("sentiment").alias("avg_daily_sentiment_subreddit"))
)

# How far each subreddit's daily mood sits from that day's overall mean.
df_trend_comparison = (
    df_daily_sentiment_subreddit
    .join(df_daily_sentiment_all, on="date", how="left")
    .withColumn(
        "sentiment_diff",
        F.col("avg_daily_sentiment_subreddit") - F.col("avg_daily_sentiment_all"),
    )
)

# Persist only the columns needed to plot the trends.
trends_output_path = "hdfs://namenode:9000/data/results/sentiment_trends.parquet"
(
    df_trend_comparison
    .select("date", "sub_reddit", "avg_daily_sentiment_subreddit", "sentiment_diff")
    .write.mode("overwrite")
    .parquet(trends_output_path)
)

In [None]:
# Body text processing pipeline: tokens -> stop-word removal -> TF-IDF vectors.
#
# RegexTokenizer splits on *runs* of whitespace and drops zero-length tokens
# (minTokenLength=1). The plain Tokenizer splits on single whitespace characters,
# so repeated spaces/newlines -- and empty comments -- produced "" tokens that
# HashingTF then hashed into a spurious shared feature. Both lowercase by default.
tokenizer = RegexTokenizer(
    inputCol="body_clean",
    outputCol="words",
    pattern=r"\s+",
    gaps=True,
    minTokenLength=1,
)
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# TF-IDF over 1000 hashed term buckets.
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=1000)
idf = IDF(inputCol="raw_features", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, idf])
# Distinct name so the fitted TF-IDF pipeline is not confused with (or clobbered
# by) the KMeans `model` fitted in the next cell.
tfidf_model = pipeline.fit(df_with_sentiment)
tfidf_df = tfidf_model.transform(df_with_sentiment)

In [None]:
# Cluster comments on their TF-IDF `features` vectors into 5 groups; the fixed
# seed makes the initialisation repeatable between runs.
kmeans = KMeans(k=5, seed=123)
model = kmeans.fit(tfidf_df)
predictions = model.transform(tfidf_df)
# NOTE: `predictions` is not persisted, so each action on it (the evaluation
# below and the writes in the next cell) recomputes it from `tfidf_df`.

# Per-cluster summary: mean sentiment, plus the number of rows with a non-null
# `sub_reddit` (i.e. a comment count, not a count of distinct subreddits).
sentiment_analysis = (
    predictions
    .groupBy("prediction")
    .agg(
        F.mean("sentiment").alias("average_sentiment"),
        F.count("sub_reddit").alias("subreddit_count"),
    )
)

# Silhouette score computed on the same `features` column KMeans was fitted on.
evaluator = ClusteringEvaluator(featuresCol="features")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")

In [None]:
# Output locations for the per-comment cluster assignments and the per-cluster summary.
cluster_output_path = "hdfs://namenode:9000/data/results/clustered_sentiment_text_results.parquet"
cluster_summary_output_path = "hdfs://namenode:9000/data/results/cluster_summary_text_sentiment.parquet"

# Every column of `predictions` (one row per comment, with its cluster id).
predictions.write.format("parquet").mode("overwrite").save(cluster_output_path)

# Per-cluster sentiment summary.
sentiment_analysis.write.format("parquet").mode("overwrite").save(cluster_summary_output_path)

In [None]:
# Shut down Spark. SparkSession.stop() stops the underlying SparkContext and also
# clears PySpark's active/default session, so a later builder.getOrCreate() in
# this kernel starts a fresh session instead of referencing a stopped one.
spark.stop()