In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml import Pipeline
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
tweets = pd.read_excel("Tweets.xlsx", sheet_name = "Tweets")
dictionary = pd.read_excel("Tweets.xlsx", sheet_name = "Dictionary")

In [3]:
tweets.head()

Unnamed: 0,TweetID,UnixTimeStamp,TimeStamp,TwitterUserID,Tweet
0,4007366899589130240,1245849000.0,Wed Jun 24 09:15:29 +0000 2009,benarreau,Really enjoying my jewelry purchase from @fudg...
1,1917240238549289984,1354665000.0,Tue Dec 04 18:46:22 +0000 2012,benarreau,Fudgemart's digital downloads: paltry!
2,2764278507764550144,1248408000.0,Fri Jul 24 00:00:00 +0000 2009,benarreau,Who uses the fudgemart website
3,2737642715545119744,1278820000.0,Sat Jul 10 23:51:32 +0000 2010,benarreau,#fudgemart customer service is predictable
4,846963692469470976,1324767000.0,Sat Dec 24 17:43:07 +0000 2011,benarreau,#fudgemart customer service is predictable


In [23]:
tweets.dtypes

TweetID            int64
UnixTimeStamp    float64
TimeStamp         object
TwitterUserID     object
Tweet             object
dtype: object

In [4]:
tweets.shape

(625, 5)

In [5]:
dictionary.head()

Unnamed: 0,type,length,word,part_of_speech,stemmed,polarity
0,strongsubj,1,bestial,adj,n,negative
1,strongsubj,1,bewildered,adj,n,negative
2,strongsubj,1,bewildering,adj,n,negative
3,strongsubj,1,bias,adj,n,negative
4,strongsubj,1,biased,adj,n,negative


In [6]:
dictionary.shape

(8210, 6)

In [7]:
dictionary.loc[dictionary.polarity == 'both', 'polarity'] = 'neutral'
dictionary["score"] = np.select([((dictionary.type == 'strongsubj') & (dictionary.polarity == 'negative')),
                              ((dictionary.type == 'weaksubj') & (dictionary.polarity == 'negative')),
                              dictionary.polarity == 'neutral',
                              ((dictionary.type == 'strongsubj') & (dictionary.polarity == 'positive')),
                              ((dictionary.type == 'weaksubj') & (dictionary.polarity == 'positive'))], 
                        [1,2,3,4,5])

In [8]:
dictionary.head()

Unnamed: 0,type,length,word,part_of_speech,stemmed,polarity,score
0,strongsubj,1,bestial,adj,n,negative,1
1,strongsubj,1,bewildered,adj,n,negative,1
2,strongsubj,1,bewildering,adj,n,negative,1
3,strongsubj,1,bias,adj,n,negative,1
4,strongsubj,1,biased,adj,n,negative,1


In [9]:
tweets_sdf = spark.createDataFrame(tweets)
dictionary_sdf = spark.createDataFrame(dictionary).select(fn.col("word").alias('words'), "score")

In [10]:
print("tweets_sdf: ",tweets_sdf.count())
print("dictionary_sdf: ", dictionary_sdf.count())

tweets_sdf:  625
dictionary_sdf:  8210


In [11]:
from pyspark.ml.feature import StopWordsRemover

stop_words = StopWordsRemover.loadDefaultStopWords("english")
stop_words.append("fudgemart")

sw_filter = StopWordsRemover()\
  .setStopWords(stop_words)\
  .setCaseSensitive(False)\
  .setInputCol("words")\
  .setOutputCol("words_filtered")

tokenizer = RegexTokenizer().setGaps(False)\
  .setPattern("\\p{L}+")\
  .setInputCol("Tweet")\
  .setOutputCol("words")

pipeline = Pipeline(stages = [tokenizer,sw_filter]).fit(tweets_sdf)

In [12]:
tweets_df = pipeline.transform(tweets_sdf)
print("Count: ", tweets_df.count())
tweets_df.show(5)

Count:  625
+-------------------+------------------+--------------------+-------------+--------------------+--------------------+--------------------+
|            TweetID|     UnixTimeStamp|           TimeStamp|TwitterUserID|               Tweet|               words|      words_filtered|
+-------------------+------------------+--------------------+-------------+--------------------+--------------------+--------------------+
|4007366899589130240|1.24584932929029E9|Wed Jun 24 09:15:...|    benarreau|Really enjoying m...|[really, enjoying...|[really, enjoying...|
|1917240238549289984|1.35466478238322E9|Tue Dec 04 18:46:...|    benarreau|Fudgemart's digit...|[fudgemart, s, di...|[digital, downloa...|
|2764278507764550144|1.24840800014412E9|Fri Jul 24 00:00:...|    benarreau|Who uses the fudg...|[who, uses, the, ...|     [uses, website]|
|2737642715545119744| 1.2788202920058E9|Sat Jul 10 23:51:...|    benarreau|#fudgemart custom...|[fudgemart, custo...|[customer, servic...|
| 8469636924694

In [13]:
tweets_sentiment = tweets_df.select('TweetID', fn.explode('words_filtered').\
                                    alias('words')).join(dictionary_sdf, on= 'words', how= "left").\
na.fill(3)
tweets_sentiment.show(5)

+-------+-------------------+-----+
|  words|            TweetID|score|
+-------+-------------------+-----+
|jewelry|4007366899589130240|    3|
|jewelry|4230207123119609856|    3|
|jewelry|1576833952510230016|    3|
|jewelry|1083354024376899968|    3|
|jewelry|2546901730034719744|    3|
+-------+-------------------+-----+
only showing top 5 rows



In [14]:
tweets_sentiment.count()

2479

In [15]:
tweets_review = tweets_sentiment.groupBy('TweetID').agg(fn.avg('score').alias('avg_sentiment'))

In [16]:
tweets_review.show(5)

+-------------------+-------------+
|            TweetID|avg_sentiment|
+-------------------+-------------+
| 629034406809296000|          3.0|
|2393463427634800128|          3.0|
| 495446208693884032|          3.5|
| 700193951276887040|          3.5|
|4091249120870569984|          3.0|
+-------------------+-------------+
only showing top 5 rows



In [17]:
tweets_review.count()

613

In [18]:
final = tweets_review.join(tweets_sdf, 'TweetID')

In [19]:
final.show(5)

+-------------------+------------------+------------------+--------------------+-------------+--------------------+
|            TweetID|     avg_sentiment|     UnixTimeStamp|           TimeStamp|TwitterUserID|               Tweet|
+-------------------+------------------+------------------+--------------------+-------------+--------------------+
| 495446208693884032|               3.5|1.28479416422551E9|Sat Sep 18 03:16:...|      rdeboat|Happy with my ser...|
| 629034406809296000|               3.0|1.24311618973726E9|Sat May 23 18:03:...|     edetyers|There are new dig...|
|2393463427634800128|               3.0|1.33621547422489E9|Sat May 05 06:57:...|     gtofwind|Hey fudgemart, wh...|
| 700193951276887040|               3.5|1.32781623785616E9|Sun Jan 29 00:50:...| afirenzergon|Just got some ele...|
| 757546958643576064|3.3333333333333335|1.23320471069789E9|Wed Jan 28 23:51:...|   etasomthin|Glad I bought my ...|
+-------------------+------------------+------------------+-------------

In [20]:
final_pd = final.toPandas()
final_pd["TweetID"] = final_pd["TweetID"].astype(str)
final_pd.head()

Unnamed: 0,TweetID,avg_sentiment,UnixTimeStamp,TimeStamp,TwitterUserID,Tweet
0,495446208693884032,3.5,1284794000.0,Sat Sep 18 03:16:04 +0000 2010,rdeboat,Happy with my service from @fudgemart
1,629034406809296000,3.0,1243116000.0,Sat May 23 18:03:09 +0000 2009,edetyers,There are new digital downloads on #fudgemart
2,2393463427634800128,3.0,1336215000.0,Sat May 05 06:57:54 +0000 2012,gtofwind,"Hey fudgemart, why is your support so bad? #upset"
3,700193951276887040,3.5,1327816000.0,Sun Jan 29 00:50:37 +0000 2012,afirenzergon,Just got some electronics from #fudgemart. Awe...
4,757546958643576064,3.333333,1233205000.0,Wed Jan 28 23:51:50 +0000 2009,etasomthin,Glad I bought my books from @fudgemart


In [25]:
final_pd.to_csv('analysis.csv',index = False)