In [1]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, to_date
from pyspark.sql.types import FloatType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

In [3]:
# Check if Spark is running
sc

In [4]:
filename = '2016_US_election_tweets.csv'

# Read input file from hadoop directory and convert to pandas
df = spark.read.csv('/user1/CA2/'+filename,header=True)


In [5]:
df = df.select('candidate_id', "created_at", "tweet_text")

In [6]:
df.show()

+-------------------+-------------------+--------------------+
|       candidate_id|         created_at|          tweet_text|
+-------------------+-------------------+--------------------+
|                  1|2016-08-30 14:41:22|@zitto007 @Matthe...|
|                  1|2016-08-30 14:41:22|I think @HumaAbed...|
|                  1|2016-08-30 14:41:24|                null|
|                  1|2016-08-30 14:41:25|                null|
|                  1|2016-08-30 14:41:25|                null|
|                  1|2016-08-30 14:41:25|                null|
|                  1|2016-08-30 14:41:25|@HillaryClinton @...|
|                  1|2016-08-30 14:41:26|                null|
|                  1|2016-08-30 14:41:26|                null|
|                  3|2016-08-30 14:41:27|                null|
|                  1|2016-08-30 14:41:28|@HillaryClinton @...|
|                  1|2016-08-30 14:41:29|                null|
|                  3|2016-08-30 14:41:31|@BrinckJeff @P

In [7]:
df_no_nulls = df.na.drop()
tweets = df_no_nulls.filter(df.candidate_id == 2)

In [8]:
tweets.show(truncate=60)

+------------+-------------------+------------------------------------------------------------+
|candidate_id|         created_at|                                                  tweet_text|
+------------+-------------------+------------------------------------------------------------+
|           2|2017-02-17 08:46:52|@realDonaldTrump see even he saying the media and journal...|
|           2|2017-02-17 08:48:31|    @jackschofield @realDonaldTrump  https://t.co/d6xhiIE14B|
|           2|2017-02-17 08:48:50|MT @VoteTrumpPics: Thank you @realDonaldTrump for being a...|
|           2|2017-02-17 08:48:53|Accidentally stumbled upon a 'popping' vid, the stuff com...|
|           2|2017-02-17 08:49:05|@realDonaldTrump I hope your tax plan roll out is a plan ...|
|           2|2017-02-17 11:51:36|@realDonaldTrump where is your psych evaluation?  You nee...|
|           2|2017-02-17 11:51:38|@realDonaldTrump that's the point you're at? Rush Limbaug...|
|           2|2017-02-17 11:51:40|      

In [9]:
analyzer = SentimentIntensityAnalyzer()

def analyze_sentiment(text):
    return analyzer.polarity_scores(text)["compound"]

sentiment_udf = udf(analyze_sentiment, FloatType())
spark.udf.register("sentiment_udf", sentiment_udf)

<function __main__.analyze_sentiment(text)>

In [10]:
# Convert timestamp to date
tweets = tweets.withColumn("date", to_date(col("created_at")))

# Calculate sentiment score for each tweet
tweets = tweets.withColumn("sentiment", sentiment_udf(col("tweet_text")))


In [11]:
tweets.show(truncate=40)

[Stage 3:>                                                          (0 + 1) / 1]

+------------+-------------------+----------------------------------------+----------+---------+
|candidate_id|         created_at|                              tweet_text|      date|sentiment|
+------------+-------------------+----------------------------------------+----------+---------+
|           2|2017-02-17 08:46:52|@realDonaldTrump see even he saying t...|2017-02-17|  -0.2023|
|           2|2017-02-17 08:48:31|@jackschofield @realDonaldTrump  http...|2017-02-17|      0.0|
|           2|2017-02-17 08:48:50|MT @VoteTrumpPics: Thank you @realDon...|2017-02-17|   0.3612|
|           2|2017-02-17 08:48:53|Accidentally stumbled upon a 'popping...|2017-02-17|    -0.34|
|           2|2017-02-17 08:49:05|@realDonaldTrump I hope your tax plan...|2017-02-17|   0.2942|
|           2|2017-02-17 11:51:36|@realDonaldTrump where is your psych ...|2017-02-17|     0.34|
|           2|2017-02-17 11:51:38|@realDonaldTrump that's the point you...|2017-02-17|      0.0|
|           2|2017-02-17 11:51

                                                                                

In [12]:
type(tweets)

pyspark.sql.dataframe.DataFrame

In [13]:
tweets.count()

                                                                                

41901010

In [14]:
tweets = tweets.select('tweet_text','date','sentiment')

In [15]:
tweets.show(truncate = 80)

[Stage 7:>                                                          (0 + 1) / 1]

+--------------------------------------------------------------------------------+----------+---------+
|                                                                      tweet_text|      date|sentiment|
+--------------------------------------------------------------------------------+----------+---------+
|@realDonaldTrump see even he saying the media and journalist lie about shit a...|2017-02-17|  -0.2023|
|                        @jackschofield @realDonaldTrump  https://t.co/d6xhiIE14B|2017-02-17|      0.0|
|MT @VoteTrumpPics: Thank you @realDonaldTrump for being a man of your word. h...|2017-02-17|   0.3612|
|Accidentally stumbled upon a 'popping' vid, the stuff coming out of the pimpl...|2017-02-17|    -0.34|
|@realDonaldTrump I hope your tax plan roll out is a plan to roll out your tax...|2017-02-17|   0.2942|
|@realDonaldTrump where is your psych evaluation?  You need serious profession...|2017-02-17|     0.34|
|         @realDonaldTrump that's the point you're at? Rush Limb

                                                                                

In [16]:
tweets_pd = tweets.toPandas()

2023-05-21 15:49:41,445 ERROR scheduler.TaskSetManager: Total size of serialized results of 28 tasks (1047.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
2023-05-21 15:49:41,897 WARN scheduler.TaskSetManager: Lost task 26.0 in stage 8.0 (TID 138) (10.0.2.15 executor driver): TaskKilled (Tasks result size has exceeded maxResultSize)
  An error occurred while calling o97.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	a

Py4JJavaError: An error occurred while calling o97.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
	at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 28 tasks (1047.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2309)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:3648)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3652)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3629)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:3629)
	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:3628)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:139)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:141)
	at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:136)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:113)
	at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:107)
	at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$4(SocketAuthServer.scala:68)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)


In [16]:
import pyspark.sql.functions as F
import matplotlib.pyplot as plt

In [17]:
daily_sentiment_df = tweets.groupBy("date") \
    .agg(F.mean("sentiment").alias("avg_sentiment")) \
    .orderBy("date")

In [20]:
tweets.unpersist()
daily_sentiment_df.unpersist()

DataFrame[date: date, avg_sentiment: double]

In [None]:
daily_sentiment_df.count()



In [None]:
daily_sentiment_pd = daily_sentiment_df.toPandas()

In [21]:
daily_sentiment_df.show()

ERROR:root:KeyboardInterrupt while sending command.               (4 + 4) / 106]
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 