In [1]:
#configure envirnoment and install pyspark
!pip install pyspark
%matplotlib inline
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
import pandas as pd



In [2]:
import pandas as pd
#reading in our tweets scraped before election into pandas dataframe
before_df = pd.read_csv("tweets.csv")

In [3]:
#reading in our tweets scraped after election into pandas dataframe
after_df = pd.read_csv("tweet_after.csv")

In [4]:
# Converting spark to pandas data frame
from pyspark.sql.types import *
# mySchema=StructType([StructField("Country", StringType())\
#                       ,StructField("Org", StringType())])

mySchema = StructType([StructField("user", StringType())\
                      ,StructField("id", FloatType())\
                      ,StructField("location", StringType())\
                      ,StructField("date", StringType())\
                      ,StructField("favourites_count", StringType())\
                      ,StructField("text", StringType())\
                      ,StructField("retweet", StringType())])
#before
df_before = spark.createDataFrame(before_df,schema=mySchema)
#after
df_after = spark.createDataFrame(after_df,schema=mySchema)
# drop columns
df_before = df_before.drop('id', 'date')
# change the data type
df_before = df_before.withColumn("favourites_count", df_before['favourites_count'].cast(IntegerType()))
# # drop rows which tweet is retweet
df_before = df_before.filter(df_before['retweet'] == 'false')

# drop columns
df_after = df_after.drop('id', 'date')
# change the data type
df_after = df_after.withColumn("favourites_count", df_after['favourites_count'].cast(IntegerType()))
# # drop rows which tweet is retweet
df_after = df_after.filter(df_after['retweet'] == 'false')

In [5]:
# check if there are duplicate rows
if df_before.distinct().count() != df_before.count():
  print('There are duplicate rows.')
else:
  print('No duplicated.')
if df_after.distinct().count() != df_after.count():
  print('There are duplicate rows.')
else:
  print('No duplicated.')

There are duplicate rows.
There are duplicate rows.


In [6]:
# remove duplicate rows
df_before = df_before.dropDuplicates()
df_before.count()

# remove duplicate rows
df_after = df_after.dropDuplicates()
df_after.count()

15860

In [7]:
# extract the hashtage
import re
#extracting hashtags as a means of possible cluster validation  
def extraxt_hashtags(text):
  hashtags = re.findall(r'\B#\w*[a-zA-Z]+\w*', text)
  return hashtags

from pyspark.sql.functions import udf, expr, concat, col
from pyspark.sql import types
func_hash = udf(extraxt_hashtags, types.StringType())
df_before = df_before.withColumn("hashtage", func_hash("text"))

# clean tweet
from string import punctuation
def clean_tweet(tweet):
    # Remove hyperlinks
    tweet = re.sub("https?://[A-Za-z0-9./]*","",tweet)
    # Remove hashtags
    tweet = re.sub(r'#\w*', '', tweet)
    # Remove tickers
    tweet = re.sub(r'\$\w*', '', tweet)
    #@user -> at_user
    tweet = re.sub("@[\w]*","",tweet)
    # To lowercase
    tweet = tweet.lower()
    # Remove Punctuation and split 's, 't, 've with a space for filter
    tweet = re.sub(r'[' + punctuation.replace('@', '') + ']+', ' ', tweet)
    # Remove words: I, a , am, me (2 or less letters)
    tweet = re.sub(r'\b\w{1,2}\b', '', tweet)
    # Remove whitespace (including new line characters)
    tweet = re.sub(r'\s\s+', ' ', tweet)
    # Remove single space remaining at the front of the tweet.
    tweet = tweet.lstrip(' ') 
    # Remove emojis or other. special characters
    tweet = ''.join(c for c in tweet if c <= '\uFFFF') 
    return tweet  

func_clean = udf(clean_tweet, types.StringType())
df_before = df_before.withColumn("clean_text", func_clean("text"))

In [8]:
func_clean = udf(clean_tweet, types.StringType())
df_after = df_after.withColumn("clean_text", func_clean("text"))

In [9]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import IDF
import requests
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer()\
  .setInputCol("clean_text")\
  .setOutputCol("words")

# stopwords object
# we obtain the stop words from a website
stop_words = requests.get('http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words').text.split()
stop_words[0:10]
sw_filter = StopWordsRemover()\
\
  .setStopWords(stop_words)\
  .setCaseSensitive(False)\
  .setInputCol("words")\
  .setOutputCol("filtered")

# we will remove words that appear in 5 docs or less
cv = CountVectorizer()\
  .setInputCol("filtered")\
  .setOutputCol("tf")

# TF-IDF object
idf = IDF().\
    setInputCol('tf').\
    setOutputCol('tfidf')

# build the pipeline
pipe1 = Pipeline(stages=[tokenizer, sw_filter, cv]).fit(df_before)
tweets_pipe = Pipeline(stages=[pipe1, idf]).fit(df_before)

In [10]:
# build the pipeline
pipe2 = Pipeline(stages=[tokenizer, sw_filter, cv]).fit(df_after)
tweets_pipe_after = Pipeline(stages=[pipe1, idf]).fit(df_after)

In [11]:
tweets_before = tweets_pipe.transform(df_before)

In [12]:
tweets_after = tweets_pipe_after.transform(df_after)

In [13]:
from pyspark.sql.types import *

In [14]:
tweets_after.show()

+---------------+-----------------------+----------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|           user|               location|favourites_count|                text|retweet|          clean_text|               words|            filtered|                  tf|               tfidf|
+---------------+-----------------------+----------------+--------------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|nicholasmcardle|       White House -TBC|             418|@realDonaldTrump ...|  false|trump english can...|[trump, english, ...|[trump, english, ...|(20493,[1,8,9,11,...|(20493,[1,8,9,11,...|
|     seanieinaz|                    NaN|              38|@realDonaldTrump ...|  false|can donald trump ...|[can, donald, tru...|[donald, trump, t...|(20493,[1,11,52,6...|(20493,[1,11,52,6...|
|    packard1963|         Northumbe

In [15]:
#reading in sentiment parquet to label overall sentiment of each tweet
sentiment = spark.read.parquet('sentiments.parquet')
from pyspark.sql import functions as fn
tweets_before.select('user', fn.explode('words').alias('word1')).show()

+--------------+------------+
|          user|       word1|
+--------------+------------+
|   TerinaKelso|         say|
|   TerinaKelso|       again|
|   TerinaKelso|         for|
|   TerinaKelso|         the|
|   TerinaKelso|      people|
|   TerinaKelso|         the|
|   TerinaKelso|        back|
|   TerinaKelso|         row|
|BadTasteMedia7|  appreciate|
|BadTasteMedia7|christianity|
|BadTasteMedia7|         for|
|BadTasteMedia7|      giving|
|BadTasteMedia7|         our|
|BadTasteMedia7|      morals|
|BadTasteMedia7|         and|
|BadTasteMedia7|         the|
|BadTasteMedia7|      gospel|
|BadTasteMedia7|         for|
|BadTasteMedia7|      giving|
|BadTasteMedia7|        rock|
+--------------+------------+
only showing top 20 rows



In [16]:
new_df = tweets_before.\
    select('user', 'text', fn.explode('words').alias('word')).\
    join(sentiment, 'word')
new_df.show(5)
before_labeled = new_df.\
    groupBy('text').\
    agg(fn.avg('sentiment').alias('avg_sentiment')).\
    withColumn('predicted', fn.when(fn.col('avg_sentiment') > 0, 1.0).when(fn.col('avg_sentiment')<0, -1.0).otherwise(0.))
before_labeled.show(5)

+----------+---------------+--------------------+---------+
|      word|           user|                text|sentiment|
+----------+---------------+--------------------+---------+
|appreciate| BadTasteMedia7|I appreciate Chri...|        1|
|      fuck| BadTasteMedia7|I appreciate Chri...|       -1|
|      like| BadTasteMedia7|I appreciate Chri...|        1|
|      fall|Horizonshealth2|FOODS THAT MAKE Y...|       -1|
|     trump|  maskshirt2021|Without a mask he...|        1|
+----------+---------------+--------------------+---------+
only showing top 5 rows

+--------------------+-------------+---------+
|                text|avg_sentiment|predicted|
+--------------------+-------------+---------+
|#Twitter suspends...|         -1.0|     -1.0|
|If you are follow...|         -1.0|     -1.0|
|All jokes aside.....|         -1.0|     -1.0|
|Hollywood persona...|         -1.0|     -1.0|
|The left and Demo...|          0.0|      0.0|
+--------------------+-------------+---------+
only showing

In [20]:
after_df = tweets_after.\
    select('user', 'text', fn.explode('words').alias('word')).\
    join(sentiment, 'word')
after_labeled = after_df.\
    groupBy('text').\
    agg(fn.avg('sentiment').alias('avg_sentiment')).\
    withColumn('predicted', fn.when(fn.col('avg_sentiment') > 0, 1.0).when(fn.col('avg_sentiment')<0, -1.0).otherwise(0.))
after_labeled.show(5)

+--------------------+-------------------+---------+
|                text|      avg_sentiment|predicted|
+--------------------+-------------------+---------+
|#DonaldTrump what...|               -1.0|     -1.0|
|Biden might be ab...|                1.0|      1.0|
|Yo, #JoeBiden nee...|               -1.0|     -1.0|
|These mass gather...| 0.3333333333333333|      1.0|
|'Most secure in h...|-0.3333333333333333|     -1.0|
+--------------------+-------------------+---------+
only showing top 5 rows



In [24]:
final_after = tweets_after.join(after_labeled, on ='text',how='left')
final_after.show()

+--------------------+---------------+--------------------+----------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------+
|                text|           user|            location|favourites_count|retweet|          clean_text|               words|            filtered|                  tf|               tfidf|      avg_sentiment|predicted|
+--------------------+---------------+--------------------+----------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------+
|#Biden staffer wa...|    redbird1031|                 NaN|           10832|  false|staffer was arres...|[staffer, was, ar...|[staffer, arreste...|(20493,[25,68,97,...|(20493,[25,68,97,...|               -1.0|     -1.0|
|#China sends '#Co...| digitaljournal|              Global|           24706|  false|              sends |             [s

In [25]:
final_before = tweets_before.join(before_labeled, on ='text',how='left')
final_before.show()

+--------------------+---------------+--------------------+----------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------+
|                text|           user|            location|favourites_count|retweet|            hashtage|          clean_text|               words|            filtered|                  tf|               tfidf|      avg_sentiment|predicted|
+--------------------+---------------+--------------------+----------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------+
|"#POTUS News: #Tr...| robinsnewswire|RT's Are FYI Purp...|             544|  false|[#POTUS, #Trump, ...|news says not hap...|[news, says, not,...|[news, says, happ...|(20493,[44,62,459...|(20493,[44,62,459...|                1.0|      1.0|
|"When somebody is...|RightWingQuote

In [49]:
from pyspark.sql import functions as F
final_before.filter(F.isnull("predicted")).show()

+--------------------+---------------+--------------------+----------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+---------+
|                text|           user|            location|favourites_count|retweet|            hashtage|          clean_text|               words|            filtered|                  tf|               tfidf|avg_sentiment|predicted|
+--------------------+---------------+--------------------+----------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+---------+
|#Election2020 Acc...|   MadisonNBCMT|        Missoula, MT|            2704|  false|[#Election2020, #...|according the sec...|[according, the, ...|[according, secre...|(20493,[38,60,65,...|(20493,[38,60,65,...|         null|     null|
|#KamalaHarris usd...|      evers_oak|United States of ...| 

In [53]:
final_before = final_before.na.fill(0,("predicted"))

In [55]:
final_after = final_after.na.fill(0,("predicted"))

In [54]:
final_before.show()

+--------------------+---------------+--------------------+----------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------+
|                text|           user|            location|favourites_count|retweet|            hashtage|          clean_text|               words|            filtered|                  tf|               tfidf|      avg_sentiment|predicted|
+--------------------+---------------+--------------------+----------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------+
|"#POTUS News: #Tr...| robinsnewswire|RT's Are FYI Purp...|             544|  false|[#POTUS, #Trump, ...|news says not hap...|[news, says, not,...|[news, says, happ...|(20493,[44,62,459...|(20493,[44,62,459...|                1.0|      1.0|
|"When somebody is...|RightWingQuote

In [30]:
after_df = pd.read_csv("tweet_after.csv")
before_df = pd.read_csv("tweets.csv")

#Labeling With TextBlob

In [31]:
# clean tweet
from string import punctuation
def clean_tweet(tweet):
    # Remove hyperlinks
    tweet = re.sub("https?://[A-Za-z0-9./]*","",tweet)
    # Remove hashtags
    tweet = re.sub(r'#\w*', '', tweet)
    # Remove tickers
    tweet = re.sub(r'\$\w*', '', tweet)
    #@user -> at_user
    tweet = re.sub("@[\w]*","",tweet)
    # To lowercase
    tweet = tweet.lower()
    # Remove Punctuation and split 's, 't, 've with a space for filter
    tweet = re.sub(r'[' + punctuation.replace('@', '') + ']+', ' ', tweet)
    # Remove words: I, a , am, me (2 or less letters)
    tweet = re.sub(r'\b\w{1,2}\b', '', tweet)
    # Remove whitespace (including new line characters)
    tweet = re.sub(r'\s\s+', ' ', tweet)
    # Remove single space remaining at the front of the tweet.
    tweet = tweet.lstrip(' ') 
    # Remove emojis or other. special characters
    tweet = ''.join(c for c in tweet if c <= '\uFFFF') 
    return tweet  


In [32]:
after_df.dtypes

user                 object
id                  float64
location             object
date                 object
favourites_count    float64
text                 object
retweet              object
dtype: object

In [33]:
after_df['text'] = after_df.text.apply(str)

In [34]:
after_df['clean_text'] = after_df['text'].apply(lambda x: clean_tweet(x))
after_df.head()

Unnamed: 0,user,id,location,date,favourites_count,text,retweet,clean_text
0,Hskers62,9.08127e+17,United States,9/14/17,197954.0,Donald Trump is the only president in history ...,True,donald trump the only president history have b...
1,JMZJEWELRY,8.82603e+17,Here and there,7/5/17,3652.0,Joe Biden got 306 Electoral College votes by w...,True,joe biden got 306 electoral college votes winn...
2,lrogers66821,1.00439e+18,"Cincinnati, OH",6/6/18,369957.0,"Deutsche Bank, which funded Auschwitz, Donald ...",True,deutsche bank which funded auschwitz donald tr...
3,LizNYC66,8.92739e+17,"Manhattan, NY",8/2/17,33175.0,The Biden administration is sure to come under...,True,the biden administration sure come under press...
4,thia89837769,1.21457e+18,"Ottawa, Ontario",1/7/20,5580.0,Donald Trump is the only president in history ...,True,donald trump the only president history have b...


In [35]:
before_df['text'] = before_df.text.apply(str)
before_df['clean_text'] = before_df['text'].apply(lambda x: clean_tweet(x))
before_df.head()

Unnamed: 0,user,id,location,date,favourites_count,text,retweet,clean_text
0,PatsFan876,2999235000.0,,42030,359685,Busting stereotypes here... I'm a truck drivin...,True,busting stereotypes here truck driving country...
1,Yasu_Al_Masih,8.83e+17,Nazareth,42923,15736,I COULDN'T RESIST POSTING THIS ONE!🤣😂🤣😂🤣🤣🤣 🤦‍♀...,True,couldn resist posting this one ‍♀️‍♀️‍♀️ enjo...
2,JBK11,15036270.0,Canada,39606,43347,Busting stereotypes here... I'm a truck drivin...,True,busting stereotypes here truck driving country...
3,SoyElG,32098350.0,,39919,432,The dismissive hand wave is what’s most disres...,False,the dismissive hand wave what’ most disrespect...
4,cynthianatalie,21103940.0,Los Angeles,39861,4952,"Inside #DonaldTrump's ""serial bad behavior"" at...",True,inside serial bad behavior nbc


In [36]:
#after_df.head()
after_df['clean_text'] = after_df.text.apply(str)

In [37]:
before_df['clean_text'] = before_df.text.apply(str)

In [38]:
from textblob import TextBlob
def get_tweet_sentiment(tweet): 
    '''Utility function to classify sentiment of passed tweet using textblob's sentiment method'''
    # create TextBlob object of passed tweet text 
    analysis = TextBlob(tweet) 
    # set sentiment 
    if analysis.sentiment.polarity > 0:
        return '1'
    elif analysis.sentiment.polarity == 0: 
        return '0'
    else: 
        return '-1'

In [39]:
before_df['tb_label'] = before_df['clean_text'].apply(lambda x: get_tweet_sentiment(' '.join(x)))

In [40]:
after_df['tb_label']= after_df['clean_text'].apply(lambda x: get_tweet_sentiment(' '.join(x)))

In [43]:
before_df['tb_label'] = before_df.tb_label.apply(int)
after_df['tb_label'] = after_df.tb_label.apply(int)

In [46]:
before_df.head()

Unnamed: 0,user,id,location,date,favourites_count,text,retweet,clean_text,tb_label
0,PatsFan876,2999235000.0,,42030,359685,Busting stereotypes here... I'm a truck drivin...,True,Busting stereotypes here... I'm a truck drivin...,0
1,Yasu_Al_Masih,8.83e+17,Nazareth,42923,15736,I COULDN'T RESIST POSTING THIS ONE!🤣😂🤣😂🤣🤣🤣 🤦‍♀...,True,I COULDN'T RESIST POSTING THIS ONE!🤣😂🤣😂🤣🤣🤣 🤦‍♀...,-1
2,JBK11,15036270.0,Canada,39606,43347,Busting stereotypes here... I'm a truck drivin...,True,Busting stereotypes here... I'm a truck drivin...,0
3,SoyElG,32098350.0,,39919,432,The dismissive hand wave is what’s most disres...,False,The dismissive hand wave is what’s most disres...,-1
4,cynthianatalie,21103940.0,Los Angeles,39861,4952,"Inside #DonaldTrump's ""serial bad behavior"" at...",True,"Inside #DonaldTrump's ""serial bad behavior"" at...",-1


In [47]:
after_df.head()

Unnamed: 0,user,id,location,date,favourites_count,text,retweet,clean_text,tb_label
0,Hskers62,9.08127e+17,United States,9/14/17,197954.0,Donald Trump is the only president in history ...,True,Donald Trump is the only president in history ...,0
1,JMZJEWELRY,8.82603e+17,Here and there,7/5/17,3652.0,Joe Biden got 306 Electoral College votes by w...,True,Joe Biden got 306 Electoral College votes by w...,0
2,lrogers66821,1.00439e+18,"Cincinnati, OH",6/6/18,369957.0,"Deutsche Bank, which funded Auschwitz, Donald ...",True,"Deutsche Bank, which funded Auschwitz, Donald ...",-1
3,LizNYC66,8.92739e+17,"Manhattan, NY",8/2/17,33175.0,The Biden administration is sure to come under...,True,The Biden administration is sure to come under...,-1
4,thia89837769,1.21457e+18,"Ottawa, Ontario",1/7/20,5580.0,Donald Trump is the only president in history ...,True,Donald Trump is the only president in history ...,0
