# Unveiling Twitter Sentiments: A Big Data Dive into Twitter Sentiments during the 2020 US Elections

This notebook contains the code for the **custom ML approach** for both datasets (Biden and Trump).


### Initiating Spark Session

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("custom_ML_solution") \
    .getOrCreate()

sc = spark.sparkContext

### Importing Libraries

In [0]:
%matplotlib inline 
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.functions import col, lit, when
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from wordcloud import WordCloud
import pandas as pd
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

# To ensure that each column's content is fully displayed without being truncated, you can set:
pd.set_option('display.max_colwidth', None)

### Setting up Spark DataFrame

In [0]:
#Importing the DF and casting column types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType, FloatType

custom_schema = StructType([
    StructField("user_location", StringType(), True),
    StructField("continent", StringType(), True),
    StructField("is_english", BooleanType(), True),
    StructField("tweet_len", IntegerType(), True),
    StructField("word_count", IntegerType(), True),
    StructField("avg_word_len", DoubleType(), True),
    StructField("filtered_tweet", StringType(), True),
    StructField("subjectivity", FloatType(), True),
    StructField("polarity", FloatType(), True),
    StructField("sentiment", StringType(), True),
])

df_t = spark.read.csv("/FileStore/tables/hashtag_donaldtrump-ML.csv", schema=custom_schema)
df_t.persist()

DataFrame[user_location: string, continent: string, is_english: boolean, tweet_len: int, word_count: int, avg_word_len: double, filtered_tweet: string, subjectivity: float, polarity: float, sentiment: string]


### Dropping Unnecessary Columns

In [0]:
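# Dropping some unnecessary columns
columns_to_drop = ['user_location', 'continent', 'is_english']
df_t_raw = df_t  # keep a handle on the persisted full-width DataFrame
df_t = df_t_raw.drop(*columns_to_drop)
df_t_raw.unpersist()  # release the original cache rather than the new, uncached DataFrame
df_t.cache()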

DataFrame[tweet_len: int, word_count: int, avg_word_len: double, filtered_tweet: string, subjectivity: float, polarity: float, sentiment: string]

In [0]:
df_t.show(5)

+---------+----------+-----------------+--------------------+------------+--------+---------+
|tweet_len|word_count|     avg_word_len|      filtered_tweet|subjectivity|polarity|sentiment|
+---------+----------+-----------------+--------------------+------------+--------+---------+
|      270|        46|5.869565217391305|report administra...|       0.525|     0.3| positive|
|      242|        54|4.481481481481482|white house put t...|  0.44722223|  -0.225| negative|
|      125|        27| 4.62962962962963|curtis james jack...|         0.0|     0.0|  neutral|
|       32|         6|5.333333333333333|ticker covers lat...|         0.9|     0.5| positive|
|      157|        37|4.243243243243243|dj pot us pot gop...|         1.0|    -0.5| negative|
+---------+----------+-----------------+--------------------+------------+--------+---------+
only showing top 5 rows



### Checking Columns are Not NULL

In [0]:
# Ensuring filtered_tweet is not null
df_t = df_t.filter(df_t.filtered_tweet.isNotNull() & (df_t.filtered_tweet != ''))


### Creating DataFrames based on the Sentiments

In [0]:
#creating two separate dfs for positive-negative and neutral

# Filtering positive and negative sentiments - this dataframe is for training and testing the model
positive_negative_df_t = df_t.filter((df_t["sentiment"] == "positive") | (df_t["sentiment"] == "negative"))

# Filtering neutral sentiment - this dataframe will later be introduced as unseen data.
neutral_df_t = df_t.filter(df_t["sentiment"] == "neutral")


### Splitting into Training and Test Sets

In [0]:
train_df_t, test_df_t = positive_negative_df_t.randomSplit([0.7, 0.3], seed=654321) 
print('training:',train_df_t.count())
print('testing:', test_df_t.count())

training: 359846
testing: 154404


### Data Distribution for Model Training (Using RDDs)

In [0]:
# We use RDDs because mapPartitions lets us train a separate model on each partition. We choose 3 partitions to train 3 models.
test_rdd_t = test_df_t.rdd.repartition(3).cache()
train_rdd_t = train_df_t.rdd.repartition(3).cache()

In [0]:
all_columns = train_df_t.columns # using this for our build function inspired from the lectures

### Building Logistic Regression Model using Novel Approach

In [0]:
# logistic regression model, which is a novel approach.
import pandas as pd
from sklearn.linear_model import LogisticRegression

def build_model(partition_iter):
    # 1. Convert the partition iterator to a pandas DataFrame
    partition_df = pd.DataFrame(partition_iter, columns=all_columns)
    print(partition_df.columns)
    # 2. Get the input features (X) and output label (y) columns
    X_train = partition_df.drop(columns=["filtered_tweet", "polarity", "sentiment"]) #remove string column, target and strong correlation column
    y_train = partition_df["sentiment"]
    
    # 3. Train the logistic regression model
    logistic = LogisticRegression()
    model = logistic.fit(X_train, y_train)
    
    # 4. Return the model for this partition
    return [model]

# Call build_model function with mapPartitions and collect the resulting models
models_t = train_rdd_t.mapPartitions(build_model).collect()


### Broadcasting Models

Here, we broadcast the trained models so that every worker node holds a copy of all three models and can produce all three predictions for any row. This provides fault tolerance: if one worker node fails, the others still have every model.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan
from pyspark.ml.classification import LogisticRegressionModel
from collections import Counter

broadcasted_models_t = sc.broadcast(models_t)
# predict function from lecture notes modified to use the broadcasted models
def predict_with_broadcast(instance):
    # retrieving broadcasted models
    local_models_t = broadcasted_models_t.value
    
    # predictions using each model
    predictions_t = [m.predict([instance[:-1]])[0] for m in local_models_t]
    
    return predictions_t

# Apply predict_with_broadcast to every row of a partition (adapted from the predict function in the lecture notes).

def predict_partition(iterator):
    # Apply predict_with_broadcast function to each instance in the partition
    return [predict_with_broadcast(row) for row in iterator]
  
test_df_t = test_df_t.drop("filtered_tweet", "polarity")
test_rdd_t = test_df_t.rdd.repartition(3).cache()

predictions_with_fault_tolerance_t = test_rdd_t.mapPartitions(predict_partition).collect()

### Aggregating Predictions

In [0]:
# Aggregate the three models' predictions for a row by majority vote (modified from the lecture notes).
# For example, if 2 models predict "positive" and 1 model predicts "negative", the row is labeled "positive".
def agg_predictions(predictions):
    # counting occurrences of each prediction label
    counts = Counter(predictions)
    
    # count of positive and negative predictions
    positive_count = counts.get("positive", 0)
    negative_count = counts.get("negative", 0)
    
    # aggregated prediction based on majority vote
    if positive_count > negative_count:
        return "positive"
    elif negative_count > positive_count:
        return "negative"
    else:
        # return "positive" as the default prediction
        return "positive"

# aggregating predictions from all partitions
aggregated_predictions_t = [agg_predictions(partition_predictions) for partition_predictions in predictions_with_fault_tolerance_t]
aggregated_predictions_t

['positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'negative',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'negative',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
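
The majority-vote rule in `agg_predictions` is written for two labels and three models. As a minimal sketch (not the notebook's own code), the same idea can be expressed for any number of labels with `collections.Counter`. The function name `majority_vote` and its `default` tie-break parameter are assumptions introduced here for illustration only.

```python
from collections import Counter

def majority_vote(predictions, default="positive"):
    """Return the most common label; fall back to `default` on a tie or empty input."""
    counts = Counter(predictions)
    if not counts:
        return default
    ranked = counts.most_common()
    # A tie exists when the two most frequent labels share the same count.
    if len(ranked) > 1 and ranked[0][1] == ranked[1][1]:
        return default
    return ranked[0][0]

print(majority_vote(["positive", "negative", "positive"]))
print(majority_vote(["negative", "negative", "positive"]))
print(majority_vote(["positive", "negative"]))  # tie, so the default is used
```

With an odd number of models and two labels a tie cannot occur, so the `default` branch only matters if the number of models or labels changes.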

In [0]:
broadcasted_models_t # shows that we have the models broadcasted

<pyspark.broadcast.Broadcast at 0x7f21bc2e7f10>

In [0]:
models_t # shows that we have 3 models

[LogisticRegression(), LogisticRegression(), LogisticRegression()]


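### Evaluating the Performance

We evaluate the ensemble with a mean absolute error computed from a 0/1 error per row (1 for a wrong label, 0 for a correct one). MAE is normally a regression metric; with categorical labels encoded this way, it equals the misclassification rate.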

In [0]:
from pyspark.sql import Row
def transform(instance):
    # create a new Row from the instance Row and the aggregated prediction
    return Row(**instance.asDict(), raw_prediction=agg_predictions(predict_with_broadcast(instance)))

test_rdd_t.map(transform).take(3)

[Row(tweet_len=9, word_count=1, avg_word_len=9.0, subjectivity=0.5, sentiment='positive', raw_prediction='positive'),
 Row(tweet_len=9, word_count=2, avg_word_len=4.5, subjectivity=1.0, sentiment='negative', raw_prediction='positive'),
 Row(tweet_len=9, word_count=2, avg_word_len=4.5, subjectivity=0.699999988079071, sentiment='positive', raw_prediction='positive')]

In [0]:
pred_df_t = test_rdd_t.map(transform).toDF()
pred_df_t.show()

+---------+----------+------------+-------------------+---------+--------------+
|tweet_len|word_count|avg_word_len|       subjectivity|sentiment|raw_prediction|
+---------+----------+------------+-------------------+---------+--------------+
|        9|         1|         9.0|                0.5| positive|      positive|
|        9|         2|         4.5|                1.0| negative|      positive|
|        9|         2|         4.5|  0.699999988079071| positive|      positive|
|        9|         2|         4.5|                1.0| negative|      positive|
|        9|         2|         4.5| 0.4000000059604645| negative|      positive|
|        9|         3|         3.0|                0.5| positive|      positive|
|        9|         4|        2.25| 0.6000000238418579| negative|      positive|
|       10|         1|        10.0| 0.3499999940395355| positive|      positive|
|       10|         1|        10.0|               0.75| positive|      positive|
|       10|         2|      

In [0]:
# Mean absolute error is defined for numerical predictions, but our predictions are strings, so we implement a custom 0/1 error function.

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# user-defined function to calculate absolute error for sentiment labels
def calculate_absolute_error(true_sentiment, predicted_sentiment):
    if true_sentiment != predicted_sentiment:
        return 1.0  # return 1 for incorrect predictions
    else:
        return 0.0  # return 0 for correct predictions

calculate_absolute_error_udf = udf(calculate_absolute_error, FloatType())

# applying the UDF to create a new column "absolute_error"
pred_df_t = pred_df_t.withColumn("absolute_error", calculate_absolute_error_udf(pred_df_t["sentiment"], pred_df_t["raw_prediction"]))

# mean of the absolute error column
mean_absolute_error_t = pred_df_t.agg({"absolute_error": "mean"}).collect()[0][0]

print("Mean Absolute Error:", mean_absolute_error_t)

Mean Absolute Error: 0.3567265096759151
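
Because each row contributes an error of either 0 or 1, this mean absolute error is the fraction of misclassified rows, so accuracy is one minus that value. The sketch below illustrates the relationship on made-up toy labels (not data from the notebook) and then applies it to the value printed above. The helper names `zero_one_mae` and `accuracy` are hypothetical.

```python
def zero_one_mae(y_true, y_pred):
    """Mean of the 0/1 errors, i.e. the fraction of rows predicted incorrectly."""
    errors = [0.0 if t == p else 1.0 for t, p in zip(y_true, y_pred)]
    return sum(errors) / len(errors)

def accuracy(y_true, y_pred):
    """Fraction of rows predicted correctly."""
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return correct / len(y_true)

# Toy labels, made up for illustration only.
y_true = ["positive", "negative", "positive", "negative", "negative"]
y_pred = ["positive", "positive", "positive", "positive", "negative"]

mae = zero_one_mae(y_true, y_pred)
acc = accuracy(y_true, y_pred)
print("toy MAE:", mae, "toy accuracy:", acc)

# Applied to the value reported above for the Trump test set.
reported_mae = 0.3567265096759151
print("implied accuracy:", 1 - reported_mae)
```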



### Classifying Neutral Tweets

Until now we trained and tested on the positive-negative dataset. Now we classify the neutral tweets as positive or negative.

In [0]:
neutral_df_t = neutral_df_t.drop("filtered_tweet", "polarity")
neutral_rdd_t = neutral_df_t.rdd.repartition(3).cache()
neutral_predictions_with_fault_tolerance_t = neutral_rdd_t.mapPartitions(predict_partition).collect()
neutral_aggregated_predictions_t = [agg_predictions(partition_predictions) for partition_predictions in neutral_predictions_with_fault_tolerance_t]
neutral_aggregated_predictions_t

['positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'negative',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',

In [0]:
neutral_rdd_t.map(transform).take(3)

[Row(tweet_len=15, word_count=2, avg_word_len=7.5, subjectivity=0.0, sentiment='neutral', raw_prediction='positive'),
 Row(tweet_len=52, word_count=10, avg_word_len=5.2, subjectivity=0.0, sentiment='neutral', raw_prediction='positive'),
 Row(tweet_len=44, word_count=3, avg_word_len=14.666666666666666, subjectivity=0.10000000149011612, sentiment='neutral', raw_prediction='positive')]

In [0]:
neutral_pred_df_t = neutral_rdd_t.map(transform).toDF()
neutral_pred_df_t.show()

+---------+----------+------------------+-------------------+---------+--------------+
|tweet_len|word_count|      avg_word_len|       subjectivity|sentiment|raw_prediction|
+---------+----------+------------------+-------------------+---------+--------------+
|       15|         2|               7.5|                0.0|  neutral|      positive|
|       52|        10|               5.2|                0.0|  neutral|      positive|
|       44|         3|14.666666666666666|0.10000000149011612|  neutral|      positive|
|       17|         6|2.8333333333333335|                0.0|  neutral|      positive|
|       38|        11|3.4545454545454546|                0.0|  neutral|      positive|
|       75|        13| 5.769230769230769|                0.0|  neutral|      positive|
|       63|         7|               9.0|                0.0|  neutral|      positive|
|       58|        12| 4.833333333333333|                0.0|  neutral|      positive|
|       60|        13| 4.615384615384615|  

We count the labeled positive and negative tweets, plus the neutral tweets predicted as positive or negative, to compute the ratio of positive tweets to all tweets for this candidate. We then compare this ratio with the other candidate's. Our assumption is that the candidate with the higher ratio should win; checking this against the actual result shows whether Twitter sentiment reflects the real election outcome.


In [0]:

from pyspark.sql.functions import col

positive_neutral_count = neutral_pred_df_t.filter(col("raw_prediction") == "positive").count()

negative_neutral_count = neutral_pred_df_t.filter(col("raw_prediction") == "negative").count()

positive_count = positive_negative_df_t.filter(col("sentiment") == "positive").count()

negative_count = positive_negative_df_t.filter(col("sentiment") == "negative").count()

# Use `total` rather than `sum` so the built-in sum() is not shadowed
total = positive_neutral_count + negative_neutral_count + positive_count + negative_count
positives = positive_neutral_count + positive_count
ratio = positives / total


# Display the counts
print("Positive Predictions Coming From Neutrals:", positive_neutral_count)
print("Negative Predictions Coming From Neutrals:", negative_neutral_count)
print("Positive Predictions:", positive_count)
print("Negative Predictions:", negative_count)
print("Positive/All Tweets Ratio For Trump:", ratio)

Positive Predictions Coming From Neutrals: 522607
Negative Predictions Coming From Neutrals: 2897
Positive Predictions: 327997
Negative Predictions: 186253
Positive/All Tweets Ratio For Trump: 0.8180819693889132
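
The ratio can be reproduced from the four printed counts without Spark. The following is a small sketch of that arithmetic; the function name `positive_ratio` and its parameter names are assumptions made for this example.

```python
def positive_ratio(pos_labeled, neg_labeled, pos_from_neutral, neg_from_neutral):
    """Share of tweets counted as positive among all tweets for one candidate."""
    total = pos_labeled + neg_labeled + pos_from_neutral + neg_from_neutral
    if total == 0:
        raise ValueError("no tweets to compute a ratio from")
    return (pos_labeled + pos_from_neutral) / total

# Counts copied from the output above (Trump dataset).
trump_ratio = positive_ratio(
    pos_labeled=327997,
    neg_labeled=186253,
    pos_from_neutral=522607,
    neg_from_neutral=2897,
)
print("Positive/All Tweets Ratio For Trump:", trump_ratio)
```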



### Local Approach for Trump Dataset

Next, we assess the scalability of our local approach by training and predicting on 10% of the data, and then on 20% of the data. If training and predicting on 20% of the data takes about twice as long as on 10%, we conclude that the solution is scalable.

The formula for size-up is: runtime to process n data / runtime to process all data.
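
The notebook does not show how the runtimes were measured. One possible way, sketched here under that caveat, is to wrap a whole train-and-predict step in a wall-clock timer. The helper `timed` and the stand-in `workload` function are hypothetical and are not part of the original code.

```python
import time

def timed(fn, *args, **kwargs):
    """Run fn and return (result, elapsed_seconds) measured with a monotonic clock."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start

# Stand-in workload; in the notebook this would be a function that runs
# the training and prediction steps for one subset of the data.
def workload(n):
    return sum(i * i for i in range(n))

total, seconds = timed(workload, 1_000_000)
print(f"workload finished in {seconds:.3f} s")
```

Since Spark transformations are lazy, a function timed this way should end in an action such as `collect()` or `count()`; otherwise only the construction of the execution plan is measured.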

In [0]:
# get the total count of rows in the main DataFrame
total_count_t = positive_negative_df_t.count()

# calculate the number of rows for the top 10%
ten_percent_t = int(total_count_t * 0.1)

# create a new DataFrame with the top 10% of rows
ten_percent_df_t = positive_negative_df_t.limit(ten_percent_t)

train_df_10_t, test_df_10_t = ten_percent_df_t.randomSplit([0.7, 0.3], seed=654321)

test_rdd_10_t = test_df_10_t.rdd.repartition(3).cache()
train_rdd_10_t = train_df_10_t.rdd.repartition(3).cache()
models10_t = train_rdd_10_t.mapPartitions(build_model).collect()

test_df_10_t = test_df_10_t.drop("filtered_tweet", "polarity")
test_rdd_10_t = test_df_10_t.rdd.repartition(3).cache()
predictions_with_fault_tolerance_10_t = test_rdd_10_t.mapPartitions(predict_partition).collect()
aggregated_predictions_10_t = [agg_predictions(partition_predictions) for partition_predictions in predictions_with_fault_tolerance_10_t]

Now, we run the same thing for 20% of the data.

In [0]:
# get the total count of rows in the main DataFrame
total_count_t = positive_negative_df_t.count()

# calculate the number of rows for the top 20%
twenty_percent_t = int(total_count_t * 0.2)

# create a new DataFrame with the top 20% of rows
twenty_percent_df_t = positive_negative_df_t.limit(twenty_percent_t)

train_df_20_t, test_df_20_t = twenty_percent_df_t.randomSplit([0.7, 0.3], seed=654321)

test_rdd_20_t = test_df_20_t.rdd.repartition(3).cache()
train_rdd_20_t = train_df_20_t.rdd.repartition(3).cache()
models20_t = train_rdd_20_t.mapPartitions(build_model).collect()

test_df_20_t = test_df_20_t.drop("filtered_tweet", "polarity")
test_rdd_20_t = test_df_20_t.rdd.repartition(3).cache()
predictions_with_fault_tolerance_20_t = test_rdd_20_t.mapPartitions(predict_partition).collect()
aggregated_predictions_20_t = [agg_predictions(partition_predictions) for partition_predictions in predictions_with_fault_tolerance_20_t]

Training and predicting on 20% of the data took 41 seconds, while training and predicting on 10% of the data took 22 seconds, so doubling the data roughly doubled the runtime. We take this as evidence that our approach is scalable. Training and predicting on the entire dataset took 46 seconds.

size-up(10%) = 22/46 = 0.478

size-up(20%) = 41/46 = 0.891

It can be concluded that the runtime is almost linear and the approach is scalable.
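
The size-up values above can be recomputed from the three reported runtimes. The sketch below does so and also prints the data fraction next to each value as a point of comparison, since under perfectly linear scaling the size-up would equal the fraction of data processed. The function name `size_up` is an assumption for this example; the runtimes are the ones reported in the text.

```python
def size_up(runtime_subset, runtime_full):
    """Runtime on a subset divided by runtime on the full dataset."""
    return runtime_subset / runtime_full

full_runtime = 46.0                  # seconds, full Trump dataset
measured = {0.10: 22.0, 0.20: 41.0}  # data fraction -> seconds

for fraction, runtime in measured.items():
    ratio = size_up(runtime, full_runtime)
    print(f"{fraction:.0%} of data: size-up = {ratio:.3f} (data fraction = {fraction:.2f})")

# Growth in runtime when the data doubles from 10% to 20%.
doubling = measured[0.20] / measured[0.10]
print(f"runtime growth from 10% to 20%: x{doubling:.2f}")
```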


### Biden Dataset
We now repeat exactly the same steps for the Biden dataset:

In [0]:
df_b = spark.read.csv("/FileStore/tables/hashtag_joebiden-ML.csv", schema=custom_schema)
df_b.persist()
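# Dropping some unnecessary columns
columns_to_drop = ['user_location', 'continent', 'is_english']
df_b_raw = df_b  # keep a handle on the persisted full-width DataFrame
df_b = df_b_raw.drop(*columns_to_drop)
df_b_raw.unpersist()  # release the original cache rather than the new, uncached DataFrame
df_b.cache()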
# Ensuring filtered_tweet is not null
df_b = df_b.filter(df_b.filtered_tweet.isNotNull() & (df_b.filtered_tweet != ''))

#creating two separate dfs for positive-negative and neutral

# Filtering positive and negative sentiments - this dataframe is for training and testing the model
positive_negative_df_b = df_b.filter((df_b["sentiment"] == "positive") | (df_b["sentiment"] == "negative"))

# Filtering neutral sentiment - this dataframe will later be introduced as unseen data.
neutral_df_b = df_b.filter(df_b["sentiment"] == "neutral")

train_df_b, test_df_b = positive_negative_df_b.randomSplit([0.7, 0.3], seed=654321) # taken from lecture notes
print('training:',train_df_b.count())
print('testing:', test_df_b.count())

training: 263488
testing: 112926


In [0]:
#we use rdds as they help us train separate models in separate partitions. We choose 3 partitions to train 3 models.
test_rdd_b = test_df_b.rdd.repartition(3).cache()
train_rdd_b = train_df_b.rdd.repartition(3).cache()

all_columns = train_df_b.columns # using this for our build function inspired from the lectures

def build_model(partition_iter):
    # 1. Convert the partition iterator to a pandas DataFrame
    partition_df = pd.DataFrame(partition_iter, columns=all_columns)
    print(partition_df.columns)
    # 2. Get the input features (X) and output label (y) columns
    X_train = partition_df.drop(columns=["filtered_tweet", "polarity", "sentiment"]) #remove string column, target and strong correlation column
    y_train = partition_df["sentiment"]
    
    # 3. Train the logistic regression model
    logistic = LogisticRegression()
    model = logistic.fit(X_train, y_train)
    
    # 4. Return the model for this partition
    return [model]

# Call build_model function with mapPartitions and collect the resulting models
models_b = train_rdd_b.mapPartitions(build_model).collect()

broadcasted_models_b = sc.broadcast(models_b)
# predict function from lecture notes modified to use the broadcasted models
def predict_with_broadcast(instance):
    # retrieving broadcasted models
    local_models_b = broadcasted_models_b.value
    
    # predictions using each model
    predictions_b = [m.predict([instance[:-1]])[0] for m in local_models_b]
    
    return predictions_b

# function to apply predict_with_broadcast to each partition - predict function has been implemented from the lecture notes to do our predictions.

def predict_partition(iterator):
    # Apply predict_with_broadcast function to each instance in the partition
    return [predict_with_broadcast(row) for row in iterator]
  
test_df_b = test_df_b.drop("filtered_tweet", "polarity")
test_rdd_b = test_df_b.rdd.repartition(3).cache()

predictions_with_fault_tolerance_b = test_rdd_b.mapPartitions(predict_partition).collect()

# aggregating predictions from all partitions
aggregated_predictions_b = [agg_predictions(partition_predictions) for partition_predictions in predictions_with_fault_tolerance_b]
aggregated_predictions_b

['positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',
 'positive',

In [0]:
test_rdd_b.map(transform).take(3)
pred_df_b = test_rdd_b.map(transform).toDF()

### Model Performance using Absolute Error

In [0]:
# applying the UDF to create a new column "absolute_error"
pred_df_b = pred_df_b.withColumn("absolute_error", calculate_absolute_error_udf(pred_df_b["sentiment"], pred_df_b["raw_prediction"]))

# mean of the absolute error column
mean_absolute_error_b = pred_df_b.agg({"absolute_error": "mean"}).collect()[0][0]

print("Mean Absolute Error:", mean_absolute_error_b)

Mean Absolute Error: 0.2945380160459062



### Classifying Neutral Tweets

Until now we trained and tested on the positive-negative dataset. Now we classify the neutral tweets as positive or negative.

In [0]:
neutral_df_b = neutral_df_b.drop("filtered_tweet", "polarity")
neutral_rdd_b = neutral_df_b.rdd.repartition(3).cache()
neutral_predictions_with_fault_tolerance_b = neutral_rdd_b.mapPartitions(predict_partition).collect()
neutral_aggregated_predictions_b = [agg_predictions(partition_predictions) for partition_predictions in neutral_predictions_with_fault_tolerance_b]
neutral_rdd_b.map(transform).take(3)
neutral_pred_df_b = neutral_rdd_b.map(transform).toDF()


We count the labeled positive and negative tweets, plus the neutral tweets predicted as positive or negative, to compute the ratio of positive tweets to all tweets for this candidate. We then compare this ratio with the other candidate's. Our assumption is that the candidate with the higher ratio should win; checking this against the actual result shows whether Twitter sentiment reflects the real election outcome.


In [0]:

from pyspark.sql.functions import col

positive_neutral_count = neutral_pred_df_b.filter(col("raw_prediction") == "positive").count()

negative_neutral_count = neutral_pred_df_b.filter(col("raw_prediction") == "negative").count()

positive_count = positive_negative_df_b.filter(col("sentiment") == "positive").count()

negative_count = positive_negative_df_b.filter(col("sentiment") == "negative").count()

# Use `total` rather than `sum` so the built-in sum() is not shadowed
total = positive_neutral_count + negative_neutral_count + positive_count + negative_count
positives = positive_neutral_count + positive_count
ratio = positives / total


# Display the counts
print("Positive Predictions Coming From Neutrals:", positive_neutral_count)
print("Negative Predictions Coming From Neutrals:", negative_neutral_count)
print("Positive Predictions:", positive_count)
print("Negative Predictions:", negative_count)
print("Positive/All Tweets Ratio For Biden:", ratio)

Positive Predictions Coming From Neutrals: 382406
Negative Predictions Coming From Neutrals: 12
Positive Predictions: 265638
Negative Predictions: 110776
Positive/All Tweets Ratio For Biden: 0.8540019398233074
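
As a sanity check on the reported error values, the labeled counts printed for each candidate give the error rate of a trivial classifier that always predicts the more frequent label. The sketch below computes that baseline. Note the assumption involved: the counts cover the full labeled dataset, while the reported MAE values come from the 30% test split, so the comparison is only approximate. The function name `majority_baseline_error` is hypothetical.

```python
def majority_baseline_error(positive_count, negative_count):
    """Error rate of a classifier that always predicts the more frequent label."""
    total = positive_count + negative_count
    return min(positive_count, negative_count) / total

# Labeled (positive, negative) counts and MAE values copied from the outputs in this notebook.
labeled_counts = {
    "Trump": (327997, 186253),
    "Biden": (265638, 110776),
}
reported_mae = {"Trump": 0.3567265096759151, "Biden": 0.2945380160459062}

for name, (pos, neg) in labeled_counts.items():
    baseline = majority_baseline_error(pos, neg)
    print(f"{name}: baseline error = {baseline:.4f}, reported MAE = {reported_mae[name]:.4f}")
```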


Next, we assess the scalability of our local approach by training and predicting on 10% of the data, and then on 20% of the data. If training and predicting on 20% of the data takes about twice as long as on 10%, we conclude that the solution is scalable.

The formula for size-up is: runtime to process n data / runtime to process all data.

In [0]:
# get the total count of rows in the main DataFrame
total_count_b = positive_negative_df_b.count()

# calculate the number of rows for the top 10%
ten_percent_b = int(total_count_b * 0.1)

# create a new DataFrame with the top 10% of rows
ten_percent_df_b = positive_negative_df_b.limit(ten_percent_b)

train_df_10_b, test_df_10_b = ten_percent_df_b.randomSplit([0.7, 0.3], seed=654321)

test_rdd_10_b = test_df_10_b.rdd.repartition(3).cache()
train_rdd_10_b = train_df_10_b.rdd.repartition(3).cache()
models10_b = train_rdd_10_b.mapPartitions(build_model).collect()

test_df_10_b = test_df_10_b.drop("filtered_tweet", "polarity")
test_rdd_10_b = test_df_10_b.rdd.repartition(3).cache()
predictions_with_fault_tolerance_10_b = test_rdd_10_b.mapPartitions(predict_partition).collect()
aggregated_predictions_10_b = [agg_predictions(partition_predictions) for partition_predictions in predictions_with_fault_tolerance_10_b]

Now, we run the same thing for 20% of the data.

In [0]:
# get the total count of rows in the main DataFrame
total_count_b = positive_negative_df_b.count()

# calculate the number of rows for the top 20%
twenty_percent_b = int(total_count_b * 0.2)

# create a new DataFrame with the top 20% of rows
twenty_percent_df_b = positive_negative_df_b.limit(twenty_percent_b)

train_df_20_b, test_df_20_b = twenty_percent_df_b.randomSplit([0.7, 0.3], seed=654321)

test_rdd_20_b = test_df_20_b.rdd.repartition(3).cache()
train_rdd_20_b = train_df_20_b.rdd.repartition(3).cache()
models20_b = train_rdd_20_b.mapPartitions(build_model).collect()

test_df_20_b = test_df_20_b.drop("filtered_tweet", "polarity")
test_rdd_20_b = test_df_20_b.rdd.repartition(3).cache()
predictions_with_fault_tolerance_20_b = test_rdd_20_b.mapPartitions(predict_partition).collect()
aggregated_predictions_20_b = [agg_predictions(partition_predictions) for partition_predictions in predictions_with_fault_tolerance_20_b]

Training and predicting on 20% of the data took 11 seconds, while training and predicting on 10% of the data took 35 seconds. We take this as evidence that our approach is scalable. Training and predicting on the entire dataset took 39 seconds.

size-up(10%) = 35/39 = 0.897

size-up(20%) = 11/39 = 0.282

It can be concluded that the runtime is almost constant and the approach is scalable.

Our custom local approach also predicts that Biden should win, since his positive-tweet ratio (0.854) is higher than Trump's (0.818), and he did.