In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, regexp_extract
from transformers import pipeline
from textblob import TextBlob
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, FloatType, StringType

In [0]:
# getOrCreate() hands back the already-running session when there is one,
# so re-running this cell does not build a second session.
spark = (
    SparkSession.builder
    .appName("SocialMediaSentimentAnalysis")
    .getOrCreate()
)

In [0]:
# Container name; it is interpolated into the /mnt/... paths built in later cells.
container_name = "social-data-trends"
# Storage account name (not referenced by any other cell visible in this notebook).
storage_account_name = "socialtrendstorage"

In [0]:
# Read the storage key from the secret scope so the value never appears in
# the notebook source.
storage_access_key = dbutils.secrets.get(
    scope="socialtrend-secrets",
    key="storage-access-key",
)

In [0]:
# Glob patterns selecting every raw JSON file under each source folder of the
# mounted container.
reddit_path = f"/mnt/{container_name}/reddit/*.json"
twitter_path = f"/mnt/{container_name}/twitter/*.json"

In [0]:
# Read the raw JSON files and tag every row with the path of the file it came
# from; the next cell extracts the snapshot timestamp from that path.
df_twitter = (
    spark.read
    .option("multiline", "true")
    .json(twitter_path)
    .withColumn("filename", input_file_name())
)
df_reddit = (
    spark.read
    .option("multiline", "true")
    .json(reddit_path)
    .withColumn("filename", input_file_name())
)

In [0]:
# Matches a "YYYY-MM-DD_HH-MM" stamp inside the source file path.
# regexp_extract yields an empty string for paths that contain no such stamp.
timestamp_pattern = r"(\d{4}-\d{2}-\d{2}_\d{2}-\d{2})"

df_twitter = df_twitter.withColumn(
    "creation_time",
    regexp_extract("filename", timestamp_pattern, 1),
)
df_reddit = df_reddit.withColumn(
    "creation_time",
    regexp_extract("filename", timestamp_pattern, 1),
)

In [0]:
# The file path has served its purpose (creation_time was derived from it),
# so remove it from both frames.
df_reddit = df_reddit.drop("filename")
df_twitter = df_twitter.drop("filename")

In [0]:
def _latest_creation_time(df, source_name):
    """Return the greatest ``creation_time`` value present in ``df``.

    The stamps are zero-padded "YYYY-MM-DD_HH-MM" strings, so the
    lexicographic maximum is also the most recent one. A single ``max``
    aggregation replaces the previous distinct + global sort.

    Args:
        df: Spark DataFrame that has a ``creation_time`` column.
        source_name: Label used in the error message (e.g. "twitter").

    Returns:
        The maximum ``creation_time`` value.

    Raises:
        ValueError: If ``df`` has no rows (the aggregation yields null).
            Failing here stops the later overwrite-mode write from
            running on an empty frame.
    """
    # The dict form of agg() needs no extra import; on an empty frame the
    # aggregate is still one row, holding null.
    latest = df.agg({"creation_time": "max"}).first()[0]
    if latest is None:
        raise ValueError(
            f"No rows loaded for {source_name}; cannot determine the latest creation_time."
        )
    return latest


latest_twitter_timestamp = _latest_creation_time(df_twitter, "twitter")
latest_reddit_timestamp = _latest_creation_time(df_reddit, "reddit")

In [0]:
# Keep only the rows that belong to the most recent snapshot of each source.
df_twitter = df_twitter.where(
    df_twitter["creation_time"] == latest_twitter_timestamp
)
df_reddit = df_reddit.where(
    df_reddit["creation_time"] == latest_reddit_timestamp
)

In [0]:
# Hugging Face sentiment pipeline; analyze_sentiment parses its labels as
# "<n> stars" strings.
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model="nlptown/bert-base-multilingual-uncased-sentiment",
)

Device set to use cpu


In [0]:
def analyze_sentiment(text, classifier=None):
    """Score ``text`` as an integer star rating, defaulting to neutral (3).

    Args:
        text: The text to classify. Anything that is not a non-blank string
            (e.g. a null coming from Spark) is scored as neutral without
            calling the model.
        classifier: Optional callable that takes a string and returns a list
            whose first element is a dict with a ``"label"`` such as
            ``"5 stars"``. Defaults to the notebook-level
            ``sentiment_pipeline``.

    Returns:
        int: The leading number of the predicted label, or 3 when the text is
        missing/blank or the classifier fails.
    """
    neutral_score = 3

    # Missing or blank text carries no sentiment; skip the model entirely.
    if not isinstance(text, str) or not text.strip():
        return neutral_score

    try:
        if classifier is None:
            classifier = sentiment_pipeline
        result = classifier(text[:512])[0]  # Limit input size (characters)
        return int(result["label"].split()[0])  # '5 stars' -> 5
    except Exception:
        # Best-effort scoring: a model or parsing failure falls back to
        # neutral. Catching Exception (not a bare except) lets
        # KeyboardInterrupt and SystemExit propagate.
        return neutral_score

In [0]:
def _textblob_polarity(text):
    """Return TextBlob's polarity for ``text``, or None when ``text`` is not a string."""
    # TextBlob raises on non-string input; a single null row would otherwise
    # fail the whole Spark job. None becomes a null in the FloatType column.
    if not isinstance(text, str):
        return None
    return float(TextBlob(text).sentiment.polarity)


def _textblob_subjectivity(text):
    """Return TextBlob's subjectivity for ``text``, or None when ``text`` is not a string."""
    if not isinstance(text, str):
        return None
    return float(TextBlob(text).sentiment.subjectivity)


# Spark UDF wrappers: star rating from the transformer model, plus TextBlob's
# polarity and subjectivity scores.
sentiment_udf = udf(analyze_sentiment, IntegerType())
polarity_udf = udf(_textblob_polarity, FloatType())
subjectivity_udf = udf(_textblob_subjectivity, FloatType())

In [0]:
# Append the three sentiment columns, each computed from the "text" column.
df_twitter = (
    df_twitter
    .withColumn("sentiment_score", sentiment_udf("text"))
    .withColumn("polarity", polarity_udf("text"))
    .withColumn("subjectivity", subjectivity_udf("text"))
)

In [0]:
# Same three sentiment columns for the Reddit rows, again computed from "text".
df_reddit = (
    df_reddit
    .withColumn("sentiment_score", sentiment_udf("text"))
    .withColumn("polarity", polarity_udf("text"))
    .withColumn("subjectivity", subjectivity_udf("text"))
)

In [0]:
output_path_twitter = f"/mnt/{container_name}/processed/twitter_sentiment"
output_path_reddit = f"/mnt/{container_name}/processed/reddit_sentiment"

# Each frame holds only the latest creation_time snapshot. A plain
# mode("overwrite") would replace the whole output directory and delete every
# previously written creation_time partition. With dynamic partition
# overwrite, only the partitions present in the data being written are
# replaced, so re-running the same snapshot stays idempotent and older
# snapshots are kept.
(
    df_twitter.write
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .partitionBy("creation_time")
    .parquet(output_path_twitter)
)
(
    df_reddit.write
    .mode("overwrite")
    .option("partitionOverwriteMode", "dynamic")
    .partitionBy("creation_time")
    .parquet(output_path_reddit)
)

print("✅ Sentiment data saved in partitioned parquet format!")

In [None]:
# Preview the first five scored Twitter rows.
df_twitter.show(n=5)

In [None]:
# Preview the first five scored Reddit rows.
df_reddit.show(n=5)