In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("FinalCoursework") \
    .getOrCreate()

sc = spark.sparkContext

In [3]:
dir_path="/mnt/data/project-data/2020tweets"

df_trump = spark.read.option("multiline", True).csv("data/hashtag_donaldtrump.csv", header=True, inferSchema=True)
df_biden = spark.read.option("multiline", True).csv("data/hashtag_joebiden.csv", header=True, inferSchema=True)

In [4]:
def transer_data_type(data):
  data = data.withColumn("likes", data["likes"].cast("int"))
  data = data.withColumn("retweet_count", data["retweet_count"].cast("int"))
  data = data.withColumn("user_followers_count", data["user_followers_count"].cast("int"))
  return data

df_trump = transer_data_type(df_trump)
df_biden = transer_data_type(df_biden)

In [5]:
#Dropping columns
remove_columns = ['created_at', 'tweet_id', 'user_id', 'user_screen_name', 'user_join_date', 'collected_at', 'user_name', 'user_description', 'user_location', 'lat', 'long', 'city', 'country', 'continent', 'state', 'state_code']
df_trump = df_trump.drop(*remove_columns)
df_biden = df_biden.drop(*remove_columns)

#Dropping rows with na values
df_trump = df_trump.na.drop()
df_biden = df_biden.na.drop()

In [6]:
#import matplotlib.pyplot as plt

# Create a bar plot
#plt.bar(['Trump', 'Biden'], [df_trump.count(), df_biden.count()])
#plt.xlabel('Candidate')
#plt.ylabel('Number of Tweets')
#plt.title('Number of Tweets for Trump and Biden')
#plt.show()

In [7]:
from pyspark.sql.functions import lit

#Adding a presidents column where 0 represents trump and 1 represents biden. This will turn it into a classification problem
df_trump = df_trump.withColumn('President', lit(0))
df_biden = df_biden.withColumn('President', lit(1))

#Merging the 2 datasets
df = df_trump.union(df_biden)

In [8]:
print(df.rdd.getNumPartitions())

2


In [9]:
df.show(5)

+--------------------+-----+-------------+------------------+--------------------+---------+
|               tweet|likes|retweet_count|            source|user_followers_count|President|
+--------------------+-----+-------------+------------------+--------------------+---------+
|#Elecciones2020 |...|    0|            0|         TweetDeck|                1860|        0|
|Usa 2020, Trump c...|   26|            9|  Social Mediaset |             1067661|        0|
|#Trump: As a stud...|    2|            1|   Twitter Web App|                1185|        0|
|2 hours since las...|    0|            0|     Trumpytweeter|                  32|        0|
|You get a tie! An...|    4|            3|Twitter for iPhone|                5393|        0|
+--------------------+-----+-------------+------------------+--------------------+---------+
only showing top 5 rows



In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from collections import Counter
import re

#Extracts all the hastags from the tweet and stores them in a list
@udf(returnType=ArrayType(StringType()))
def getHashtagList(tweet):
  tweet = tweet.lower()
  tweet = re.sub(r'[^\w\s#]', '', tweet)
  hashtag_list = []

  for word in tweet.split():
    if word[0] == '#':
      hashtag_list.append(word)
  
  #Creates a dictionary where the hashtags are the key and their frequency in the list is the value
  count_hashtags = Counter(hashtag_list)

  #Remove duplicate hashtags
  unique_hashtags = [hashtag for hashtag, frequency in count_hashtags.items() if frequency == 1]

  return hashtag_list

#Creates a column called hashtags which stores a hastag list for all the tweets
df = df.withColumn('hashtags', getHashtagList(df['tweet']))

In [11]:
df.show(5)

+--------------------+-----+-------------+------------------+--------------------+---------+--------------------+
|               tweet|likes|retweet_count|            source|user_followers_count|President|            hashtags|
+--------------------+-----+-------------+------------------+--------------------+---------+--------------------+
|#Elecciones2020 |...|    0|            0|         TweetDeck|                1860|        0|[#elecciones2020,...|
|Usa 2020, Trump c...|   26|            9|  Social Mediaset |             1067661|        0|      [#donaldtrump]|
|#Trump: As a stud...|    2|            1|   Twitter Web App|                1185|        0|            [#trump]|
|2 hours since las...|    0|            0|     Trumpytweeter|                  32|        0|            [#trump]|
|You get a tie! An...|    4|            3|Twitter for iPhone|                5393|        0|     [#trump, #iowa]|
+--------------------+-----+-------------+------------------+--------------------+------

In [12]:
from pyspark.sql import functions as F

#Splits the hashtag list and displays each hashtag in its own row in column hashtag
df_exploded = df.withColumn('hashtag', F.explode(df['hashtags']))

#Creates a new column called count which displays how many rows the hashtag appears in
#The counts will be used in TF-IDF calculation
df_count = df_exploded.groupBy("hashtag").count()

#Renaming count to hashtag_count
df_count = df_count.withColumnRenamed('count', 'hashtag_count')

#Joining df_count with df_exploded on hashtag
df_exploded = df_exploded.join(df_count, on="hashtag", how="left")

# Group by original columns and collect list of counts
df = df_exploded.groupBy(df.columns).agg(F.collect_list("hashtag_count").alias("hashtag_counts"))

In [13]:
df.show(5)

+--------------------+-----+-------------+-------------------+--------------------+---------+--------------------+--------------------+
|               tweet|likes|retweet_count|             source|user_followers_count|President|            hashtags|      hashtag_counts|
+--------------------+-----+-------------+-------------------+--------------------+---------+--------------------+--------------------+
| He’s nothing mor...|    0|            0|    Twitter Web App|                  71|        0|            [#trump]|            [968674]|
| it shows that #t...|    1|            0|Twitter for Android|                  11|        0|            [#trump]|            [968674]|
| media-sourced po...|    0|            0|Twitter for Android|                 657|        0|            [#biden]|            [631862]|
| mientras hace ca...|    0|            0| Twitter for iPhone|                  14|        1|[#electionday, #d...|[631862, 78352, 6...|
| so Please get a ...|    0|            0|Twitte

In [14]:
#Total number of rows
#This value is used to calculate TF-IDF
df_count = df.count()
print(df_count)

1643761


In [15]:
from pyspark.sql.types import FloatType
import math

#Creates the TF_IDF list column
@udf(returnType=FloatType())
def getAvg_TF_IDF(hashtags, hashtag_counts, df_count):
  avg_TF_IDF = 0
  count = 0

  #If there are no hashtags the tweet will be given a value of 0
  if len(hashtags) == 0:
    return 0.0

  for hashtag in hashtags:
    #Calculating TF
    #Since all the hastags are unique 1 is being divided by the total number of hastags
    TF = 1 / len(hashtags)
    #nDocs is the number of rows/tweets the hastag is used
    nDocs = hashtag_counts[count]
    #Calculating IDF
    #df_count is the total number of rows/tweets
    IDF = math.log(df_count / nDocs)
    TF_IDF = TF * IDF
    avg_TF_IDF += TF_IDF
    count += 1

  avg_TF_IDF /= count
  return avg_TF_IDF

# Apply the UDF to your DataFrame
df = df.withColumn('TF_IDF', getAvg_TF_IDF(df['hashtags'], df['hashtag_counts'], lit(df_count)))

In [16]:
df.show(5)

+--------------------+-----+-------------+-------------------+--------------------+---------+--------------------+--------------------+---------+
|               tweet|likes|retweet_count|             source|user_followers_count|President|            hashtags|      hashtag_counts|   TF_IDF|
+--------------------+-----+-------------+-------------------+--------------------+---------+--------------------+--------------------+---------+
| He’s nothing mor...|    0|            0|    Twitter Web App|                  71|        0|            [#trump]|            [968674]|0.5288141|
| it shows that #t...|    1|            0|Twitter for Android|                  11|        0|            [#trump]|            [968674]|0.5288141|
| media-sourced po...|    0|            0|Twitter for Android|                 657|        0|            [#biden]|            [631862]|0.9560712|
| mientras hace ca...|    0|            0| Twitter for iPhone|                  14|        1|[#electionday, #d...|[631862, 7

In [17]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

# Download NLTK resources
nltk.download('omw-1.4')
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')

# Initialize NLTK resources
stop_words = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()

#Define get functions
def getTokenizedText(text):
    return word_tokenize(text.lower())

def getFilteredTokens(tokens):
    filtered_tokens = [word for word in tokens if word.isalnum() and word not in stop_words]
    return filtered_tokens

def getLemmatizedTokens(filtered_tokens):
    lemmatized_tokens = [lemmatizer.lemmatize(word) for word in filtered_tokens]
    return lemmatized_tokens

#Define the UDF
@udf(returnType=ArrayType(StringType()))
def tokenize_tweet(text):
    # Tokenization
    tokens = getTokenizedText(text)

    # Remove stopwords and punctuation
    filtered_tokens = getFilteredTokens(tokens)

    # Lemmatization
    lemmatized_tokens = getLemmatizedTokens(filtered_tokens)
    return lemmatized_tokens

# Apply the UDF to the DataFrame
df = df.withColumn('tokens', tokenize_tweet(df['tweet']))

[nltk_data] Downloading package omw-1.4 to
[nltk_data]     C:\Users\nikhi\AppData\Roaming\nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\nikhi\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\nikhi\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\nikhi\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [18]:
import nltk
nltk.download('vader_lexicon')
from nltk.sentiment import SentimentIntensityAnalyzer
sia = SentimentIntensityAnalyzer()
sc.broadcast(sia)

def calculate_sentiment(text):
    #sia = SentimentIntensityAnalyzer()
    if isinstance(text, list):
        text = ' '.join(text)
    sentiment_scores = sia.polarity_scores(text)['compound']

    return sentiment_scores

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     C:\Users\nikhi\AppData\Roaming\nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [19]:
from pyspark.sql.types import *

# Define UDF to tokenize the tweet attribute
@udf(returnType=FloatType())
def polarize_tweet(tokens):
    sent=calculate_sentiment(tokens)
    return sent

# Add a new column 'Tokens' to the DataFrame 'df' by calling the 'tokenize_tweet' UDF on the 'tweet' column
df=df.withColumn('polarity', polarize_tweet(df['tokens']))

In [20]:
df.show(5)

+--------------------+-----+-------------+-------------------+--------------------+---------+--------------------+--------------------+---------+--------------------+--------+
|               tweet|likes|retweet_count|             source|user_followers_count|President|            hashtags|      hashtag_counts|   TF_IDF|              tokens|polarity|
+--------------------+-----+-------------+-------------------+--------------------+---------+--------------------+--------------------+---------+--------------------+--------+
| He’s nothing mor...|    0|            0|    Twitter Web App|                  71|        0|            [#trump]|            [968674]|0.5288141|[nothing, predato...|  0.0772|
| it shows that #t...|    1|            0|Twitter for Android|                  11|        0|            [#trump]|            [968674]|0.5288141|[show, trump, car...| -0.0772|
| media-sourced po...|    0|            0|Twitter for Android|                 657|        0|            [#biden]|      

In [21]:
from pyspark.sql.functions import when, col

df = df.withColumn('sent', when(col('polarity') > 0, 1)
                        .when(col('polarity') <= 0, -1)
                        .otherwise(0))

In [22]:
df.show(5)

+--------------------+-----+-------------+-------------------+--------------------+---------+--------------------+--------------------+---------+--------------------+--------+----+
|               tweet|likes|retweet_count|             source|user_followers_count|President|            hashtags|      hashtag_counts|   TF_IDF|              tokens|polarity|sent|
+--------------------+-----+-------------+-------------------+--------------------+---------+--------------------+--------------------+---------+--------------------+--------+----+
| He’s nothing mor...|    0|            0|    Twitter Web App|                  71|        0|            [#trump]|            [968674]|0.5288141|[nothing, predato...|  0.0772|   1|
| it shows that #t...|    1|            0|Twitter for Android|                  11|        0|            [#trump]|            [968674]|0.5288141|[show, trump, car...| -0.0772|  -1|
| media-sourced po...|    0|            0|Twitter for Android|                 657|        0|  

In [23]:
seed = 654321
train_df, test_df = df.randomSplit([0.7, 0.3], seed=seed)

In [24]:
train_df.cache()

DataFrame[tweet: string, likes: int, retweet_count: int, source: string, user_followers_count: int, President: int, hashtags: array<string>, hashtag_counts: array<bigint>, TF_IDF: float, tokens: array<string>, polarity: float, sent: int]

In [25]:
test_df.cache()

DataFrame[tweet: string, likes: int, retweet_count: int, source: string, user_followers_count: int, President: int, hashtags: array<string>, hashtag_counts: array<bigint>, TF_IDF: float, tokens: array<string>, polarity: float, sent: int]

In [26]:
from pyspark.ml.feature import VectorAssembler
feature_columns = ['likes', 'retweet_count', 'user_followers_count', 'sent','TF_IDF']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [27]:
#df_assembler = assembler.transform(train_df)
#df_assembler.show(5)

In [28]:
from pyspark.ml.classification import DecisionTreeClassifier

# Initialize the Decision Tree classifier
dt = DecisionTreeClassifier(featuresCol="features", labelCol="President",maxDepth=10)

In [30]:
from pyspark.ml import Pipeline

# Set up the pipeline
pipeline = Pipeline(stages=[assembler, dt])

In [31]:
# Train the model
model = pipeline.fit(train_df)

In [32]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions
predictions = model.transform(test_df)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="President", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")

Test Accuracy: 0.6370880930902014
