In [2]:
import os
import sys
import findspark

# Point findspark to your Spark installation directory
findspark.init('/opt/spark')  

# Use the Python interpreter from your current virtual environment
os.environ["PYSPARK_PYTHON"] = sys.executable


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import udf, avg, count
from pyspark.sql.types import StringType

# Create Spark Session
spark = SparkSession.builder.appName("SentimentEngagementApp").getOrCreate()


25/05/21 09:01:32 WARN Utils: Your hostname, sandeep-VMware-Virtual-Platform resolves to a loopback address: 127.0.1.1; using 192.168.161.128 instead (on interface ens33)
25/05/21 09:01:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/21 09:01:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
input_path = "hdfs:///socialmedia/socialmedia.csv"  # Change to your HDFS path

df = spark.read.csv(input_path, header=True, inferSchema=True)
df.show(5, truncate=False)


                                                                                

+---+----------+----------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------+
|0  |1467810369|Mon Apr 06 22:19:45 PDT 2009|NO_QUERY|_TheSpecialOne_|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|
+---+----------+----------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------+
|0  |1467810672|Mon Apr 06 22:19:49 PDT 2009|NO_QUERY|scotthamilton  |is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!    |
|0  |1467810917|Mon Apr 06 22:19:53 PDT 2009|NO_QUERY|mattycus       |@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds                          |
|0  |1467811184|Mon Apr 06 22:19:57 PDT 2009|NO_QUERY|ElleCTF    

In [7]:
columns = ["sentiment","id","date","Query","User","text"]
df = df.toDF(*columns)

In [8]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

In [9]:
analyzer = SentimentIntensityAnalyzer()

def vader_sentiment(text):
    if text is None:
        return "neutral"
    scores = analyzer.polarity_scores(text)
    compound = scores['compound']
    if compound >= 0.05:
        return "positive"
    elif compound <= -0.05:
        return "negative"
    else:
        return "neutral"

sentiment_udf = udf(vader_sentiment, StringType())

In [10]:
df = df.withColumn("vader_sentiment", sentiment_udf(df["text"]))

df.select("user", "text", "vader_sentiment").show(10, truncate=False)

# User engagement: avg session_time is missing? Use count of queries/comments per user
user_engagement = df.groupBy("user").agg(
    count("query").alias("total_queries")
)

user_engagement.show(5)

                                                                                

+---------------+---------------------------------------------------------------------------------------------------------------+---------------+
|user           |text                                                                                                           |vader_sentiment|
+---------------+---------------------------------------------------------------------------------------------------------------+---------------+
|scotthamilton  |is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!|negative       |
|mattycus       |@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds                      |positive       |
|ElleCTF        |my whole body feels itchy and like its on fire                                                                 |negative       |
|Karoli         |@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over the

[Stage 4:>                                                          (0 + 4) / 4]

+---------------+-------------+
|           user|total_queries|
+---------------+-------------+
|     megan_rice|           15|
|        Daniiej|            3|
|         MeghTW|            1|
|   candicebunny|            1|
|stranger_danger|           14|
+---------------+-------------+
only showing top 5 rows



                                                                                

In [11]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

window = Window.partitionBy("user").orderBy(desc("count"))

sentiment_counts = df.groupBy("user", "vader_sentiment").count()
ranked = sentiment_counts.withColumn("rank", row_number().over(window))
majority_sentiment = ranked.filter(ranked.rank == 1).select("user", ranked.vader_sentiment.alias("majority_sentiment"))

# Join engagement and majority sentiment
final_df = user_engagement.join(majority_sentiment, on="user", how="left")

final_df.show(10)

[Stage 14:>                                                         (0 + 1) / 1]

+-------------+-------------+------------------+
|         user|total_queries|majority_sentiment|
+-------------+-------------+------------------+
|      Daniiej|            3|          negative|
|     J_Moneyy|            7|          positive|
|  Lilli_Allen|            1|           neutral|
|       MeghTW|            1|          negative|
|      SoEdith|            5|          negative|
|      caaaami|            1|          negative|
| candicebunny|            1|           neutral|
|   convoy3571|           13|          positive|
|divingkid2001|            1|           neutral|
|   megan_rice|           15|          positive|
+-------------+-------------+------------------+
only showing top 10 rows



                                                                                

In [12]:
print(analyzer.polarity_scores("This is a great day!"))

{'neg': 0.0, 'neu': 0.477, 'pos': 0.523, 'compound': 0.6588}


In [13]:
output_path = "hdfs:///user/sandeep/output/vader_sentiment_engagement"
final_df.write.csv(output_path, header=True, mode="overwrite")

print(f"Saved results to {output_path}")

spark.stop()

25/05/21 09:04:50 WARN DAGScheduler: Broadcasting large task binary with size 1098.5 KiB
                                                                                

Saved results to hdfs:///user/sandeep/output/vader_sentiment_engagement


In [21]:
print(spark.version)


3.4.1
