# 4. CoVax Tweets with VADER

- Arshiya Ansari - aa9yk
- Congxin (David) Xu - cx2rx
- Pengwei (Tiger) Hu - ph3bz
- Kip McCharen - cam7cu

## Description

This notebook combines multiple scrapes of tweets collected through the Twitter API and scores their sentiment with VADER (Valence Aware Dictionary and sEntiment Reasoner).

## Part 1: Import Modules, Start Spark

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import re
import os
import urllib 
import nltk

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *

%matplotlib inline

In [2]:
# Getting the current directory of interest
thisdir = '/project/ds5559/twitter_sentiment_analysis_group/'

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("twitter_project") \
        .config("spark.executor.memory", '20g') \
        .config('spark.executor.cores', '8') \
        .config('spark.executor.instances', '2') \
        .config("spark.driver.memory",'1g') \
        .getOrCreate()

sc = spark.sparkContext

sqlContext = SQLContext(sc)

In [3]:
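def pd_df_from_csvs_in_directory(directory):
    """
    Walk `directory`, read each CSV into a pandas DataFrame,
    and concatenate them all into a single DataFrame.
    """
    frames = []
    # root = current folder, _dirs = subfolders (unused), files = file names
    # Note: this also picks up tmp_pandas_df.csv if an earlier run wrote it here
    for root, _dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(".csv"):
                filedir = os.path.join(root, file)
                print(filedir)
                try:
                    frames.append(pd.read_csv(filedir, index_col=0))
                except Exception as err:
                    # Skip unreadable files, but report them instead of failing silently
                    print(f"Skipped {filedir}: {err}")

    # Concatenate once at the end rather than re-copying all rows on every file
    return pd.concat(frames) if frames else pd.DataFrame()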

dfALL = pd_df_from_csvs_in_directory(thisdir)

/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210318_0309.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210322_0254.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210320_1148.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210321_1119.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210401_2123.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210404_0240.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210405_0241.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210318_1359.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210409_2329.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210322_1518.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_early_attempt.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210319_1252.csv
/project/ds5559/twitter_sentiment_anal

/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210323_0513.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210317_1203.csv
/project/ds5559/twitter_sentiment_analysis_group/tmp_pandas_df.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210329_0858.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210321_0531.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210321_0055.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210321_1044.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210327_0836.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210318_1401.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210320_4511.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210322_2104.csv
/project/ds5559/twitter_sentiment_analysis_group/hashtag_output_210407_0146.csv
/project/ds5559/twitter_sentiment_analysis_group/hash

In [4]:
#How many records did we capture?
len(dfALL.index)

3007437

## Part 2: DF to Tab-Delimited to Spark Dataframe

In [5]:
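%%time
# %%time times the whole cell. The timings recorded below came from a bare %time line,
# which times only an empty statement.
# https://stackoverflow.com/questions/37513355/converting-pandas-dataframe-into-spark-dataframe-error
# Replace newlines, tabs, and carriage returns so each record stays on one tab-delimited line
dfALL = dfALL.replace(r"[\n\t\r]", " ", regex=True)
dfALL.to_csv(thisdir + "tmp_pandas_df.csv", sep="\t")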

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 6.44 µs
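
The cleaning step above exists because a stray newline or tab inside a tweet would break the one-record-per-line, tab-delimited layout that Spark reads back. Here is a minimal sketch of that replacement on a made-up string (the sample text is hypothetical, not from the dataset). It also shows that inside square brackets `|` is a literal pipe, so a pattern written as `[\n|\t|\r]` replaces pipes as well.

```python
import re

# Hypothetical tweet text containing a newline, a tab, a carriage return, and a pipe
raw = "Got my shot!\nFeeling fine\t#vaccine\r| day 2"

# Character class: matches any single newline, tab, or carriage return
clean = re.sub(r"[\n\t\r]", " ", raw)

# Inside [...], "|" is not alternation but a literal pipe, so it is replaced too
clean_with_pipe = re.sub(r"[\n|\t|\r]", " ", raw)

print(repr(clean))
print(repr(clean_with_pipe))
```

Either pattern removes the characters that matter for a tab-delimited file; the difference only shows up for tweets that contain `|`.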


In [6]:
df = spark.read.option("delimiter", "\t").option("header", "true").csv(thisdir + "tmp_pandas_df.csv")
df.count() #count records in Spark DF

3007437

In [7]:
type(df)

pyspark.sql.dataframe.DataFrame

In [8]:
df.show(4)

+---+---------------+-------------+--------------------+--------------------+--------------------+---------+-----------------------+-------------+--------------+----+--------------+--------------------+--------------------+-------------+--------------------+--------------------+-------+--------------------+------------------+
|_c0|scraped_hashtag|scraped_order|          created_at|              id_str|                text|truncated|in_reply_to_screen_name|retweet_count|favorite_count|lang|   screen_name|           user_name|    user_description|user_verified|user_followers_count|            hashtags|symbols|         og_tweet_by|og_tweet_truncated|
+---+---------------+-------------+--------------------+--------------------+--------------------+---------+-----------------------+-------------+--------------+----+--------------+--------------------+--------------------+-------------+--------------------+--------------------+-------+--------------------+------------------+
|  0|       #vac

This is a large dataset with many columns, so we will have to narrow it down to use it effectively.

## Part 3: Apply VADER Valence Analyzer

In [9]:
import nltk
nltk.download('vader_lexicon')
from nltk.sentiment.vader import SentimentIntensityAnalyzer

sid = SentimentIntensityAnalyzer()

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/cam7cu/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


Drop rows whose text column is null, because VADER expects a string and returns an error when it encounters a missing value.

In [10]:
dfdata = df.na.drop(subset=["text"])

In [11]:
# Calculate polarity scores from VADER, export into its own dataframe
df2 = dfdata.rdd\
    .map(lambda x: list(sid.polarity_scores(x['text']).values()))\
    .toDF(["neg", "neu", "pos", "compound"])
df2.show(5)

+-----+-----+-----+--------+
|  neg|  neu|  pos|compound|
+-----+-----+-----+--------+
|  0.0| 0.92| 0.08|  0.1027|
|0.141|0.859|  0.0| -0.4215|
|  0.0|0.851|0.149|  0.5411|
|  0.0|0.821|0.179|  0.5859|
|  0.0|0.816|0.184|  0.6467|
+-----+-----+-----+--------+
only showing top 5 rows



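There's probably a much smarter way to do this, but I am adding an ID column to each DataFrame with `monotonically_increasing_id()` and joining on that. Note that these IDs are unique and increasing but not consecutive, and they are not guaranteed to line up across two separately built DataFrames.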

In [12]:
from pyspark.sql.functions import monotonically_increasing_id

ddf1 = dfdata.withColumn("row_id", monotonically_increasing_id())
ddf2 = df2.withColumn("row_id2", monotonically_increasing_id())
result = ddf1.join(ddf2, ddf1.row_id == ddf2.row_id2) #.drop("row_id")
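
One possible alternative to the ID join, sketched here under assumptions: compute the scores in a user-defined function so they are attached to the same row they were computed from, and no join is needed. The helper names `vader_scores` and `with_vader_columns` are hypothetical and not part of the notebook; `scorer` is assumed to be the `SentimentIntensityAnalyzer` instance created earlier.

```python
def vader_scores(text, scorer):
    """Return (neg, neu, pos, compound) as floats, or None for missing text."""
    if text is None:
        return None
    scores = scorer.polarity_scores(text)
    # Read by key so the column order does not depend on dict ordering
    return tuple(float(scores[k]) for k in ("neg", "neu", "pos", "compound"))


def with_vader_columns(sdf, scorer, text_col="text"):
    """Append neg/neu/pos/compound columns to a Spark DataFrame (hypothetical helper)."""
    # Imported here so the scoring helper above can be used without Spark
    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType, StructField, StructType

    schema = StructType(
        [StructField(name, DoubleType()) for name in ("neg", "neu", "pos", "compound")]
    )
    score_udf = F.udf(lambda t: vader_scores(t, scorer), schema)
    scored = sdf.withColumn("vader", score_udf(F.col(text_col)))
    # Expand the struct into four top-level columns, then drop the struct itself
    return scored.select("*", "vader.*").drop("vader")
```

With these helpers, something like `result = with_vader_columns(dfdata, sid)` would produce the tweet columns and the four score columns in one DataFrame, without `row_id` or `row_id2`.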

In [13]:
result.show(1)

+---+---------------+-------------+--------------------+--------------------+--------------------+---------+-----------------------+-------------+--------------+----+---------------+-------------+--------------------+-------------+--------------------+--------------------+-------+-----------+------------------+------+---+---+---+--------+-------+
|_c0|scraped_hashtag|scraped_order|          created_at|              id_str|                text|truncated|in_reply_to_screen_name|retweet_count|favorite_count|lang|    screen_name|    user_name|    user_description|user_verified|user_followers_count|            hashtags|symbols|og_tweet_by|og_tweet_truncated|row_id|neg|neu|pos|compound|row_id2|
+---+---------------+-------------+--------------------+--------------------+--------------------+---------+-----------------------+-------------+--------------+----+---------------+-------------+--------------------+-------------+--------------------+--------------------+-------+-----------+---------

## Part 4: Export to Directory of CSVs

In [14]:
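# Spark refuses to write into an existing directory (hence the AnalysisException below);
# adding .mode("overwrite") before .csv(...) would replace it instead.
result.write.csv('twitter_corpus_VADER')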

AnalysisException: path file:/sfs/qumulo/qhome/cam7cu/twitter_corpus_VADER already exists.;

In [None]:
!tar -czf twitter_corpus_VADER.tar.gz twitter_corpus_VADER

## Part 5: Analyze Manual Labels vs VADER

In [15]:
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("twitter_project") \
        .config("spark.executor.memory", '20g') \
        .config('spark.executor.cores', '8') \
        .config('spark.executor.instances', '2') \
        .config("spark.driver.memory",'1g') \
        .getOrCreate()

sc = spark.sparkContext

sqlContext = SQLContext(sc)

In [16]:
df = spark.read.option("delimiter", "\t").option("header", "true").csv("Manual_and_Preds.txt")
df.count() #count records in Spark DF

400

In [17]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as f

# cast the predicted valence (PRED) to double, as the evaluator expects
dfval = df.withColumn('prediction', f.col('PRED').cast('double'))

# cast the manual valence label (TRUE) to double
dfval = dfval.withColumn('label', f.col('TRUE').cast('double'))

dfval = dfval.select("prediction", "label")

dfval.show(1)

+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  1.0|
+----------+-----+
only showing top 1 row
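
The notebook does not show how the `PRED` column in `Manual_and_Preds.txt` was derived from the VADER scores. For illustration only, here is a minimal sketch of the thresholding convention commonly cited for VADER's compound score (positive at 0.05 or above, negative at -0.05 or below, neutral otherwise). The function name `compound_to_label` and the choice of threshold are assumptions, and the rule actually used for `PRED` may differ.

```python
def compound_to_label(compound, threshold=0.05):
    """Map a VADER compound score to -1.0, 0.0, or 1.0 (assumed convention)."""
    if compound >= threshold:
        return 1.0
    if compound <= -threshold:
        return -1.0
    return 0.0


# Three compound scores from the df2.show(5) output above, plus a neutral 0.0
examples = [0.1027, -0.4215, 0.5411, 0.0]
labels = [compound_to_label(c) for c in examples]
print(labels)
```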



In [18]:
# compute accuracy
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("VADER valence prediction accuracy = " + str(evaluator.evaluate(dfval)))

VADER valence prediction accuracy = 0.4025


In [19]:
# compute precision for label 0.0 only (precisionByLabel uses metricLabel, which defaults to 0.0)
evaluator = MulticlassClassificationEvaluator(metricName="precisionByLabel")
print("VADER valence prediction precision = " + str(evaluator.evaluate(dfval)))

VADER valence prediction precision = 0.39185750636132316


In [20]:
# compute recall for label 0.0 only (recallByLabel uses metricLabel, which defaults to 0.0)
evaluator = MulticlassClassificationEvaluator(metricName="recallByLabel")
print("VADER valence prediction recall = " + str(evaluator.evaluate(dfval)))

VADER valence prediction recall = 1.0


In [21]:
# compute F1 score for label 0.0 only (fMeasureByLabel with the default beta of 1.0)
evaluator = MulticlassClassificationEvaluator(metricName="fMeasureByLabel")
print("VADER valence prediction F1 score = " + str(evaluator.evaluate(dfval)))

VADER valence prediction F1 score = 0.5630712979890311


In [22]:
# compute confusion matrix (one row per predicted value, one column per manual label)
print("VADER valence prediction Confusion Matrix")
dfval.groupBy("prediction").pivot("label").count().collect()

VADER valence prediction Confusion Matrix


[Row(prediction=0.0, -1.0=158, 0.0=154, 1.0=81),
 Row(prediction=-1.0, -1.0=2, 0.0=None, 1.0=None),
 Row(prediction=1.0, -1.0=None, 0.0=None, 1.0=5)]
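
The reported metrics can be re-derived by hand from this confusion matrix, which also makes clear that the precision, recall, and F1 values above describe label 0.0 only. The sketch below copies the counts from the output, treating `None` as 0; the dictionary layout and the helper name `per_label_metrics` are illustrative choices, not part of the notebook.

```python
# Counts copied from the confusion matrix output: confusion[prediction][label]
confusion = {
    0.0: {-1.0: 158, 0.0: 154, 1.0: 81},
    -1.0: {-1.0: 2, 0.0: 0, 1.0: 0},
    1.0: {-1.0: 0, 0.0: 0, 1.0: 5},
}
labels = [-1.0, 0.0, 1.0]

total = sum(sum(row.values()) for row in confusion.values())
# Accuracy: share of rows where the prediction equals the manual label
accuracy = sum(confusion[l][l] for l in labels) / total


def per_label_metrics(label):
    """Return (precision, recall, F1) for one label, treating it as the positive class."""
    tp = confusion[label][label]
    predicted = sum(confusion[label].values())        # everything predicted as `label`
    actual = sum(confusion[p][label] for p in labels)  # everything manually labelled `label`
    precision = tp / predicted if predicted else 0.0
    recall = tp / actual if actual else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return precision, recall, f1


print(f"accuracy = {accuracy}")
for label in labels:
    p, r, f1 = per_label_metrics(label)
    print(f"label {label}: precision={p:.4f} recall={r:.4f} f1={f1:.4f}")
```

For label 0.0 this gives precision 154/393, recall 154/154, and F1 308/547, matching the evaluator output. The rows for -1.0 and 1.0 show how rarely those classes are predicted, which the label-0.0 figures alone hide.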