<h1 style="text-align: center;"><span style="color: #000000;"> CIS 731 Final Project: Sentiment Analysis Using PySpark on Fans' Tweets Following the NFL Draft
</span></h1>
<h3 style="text-align: center;"><span style="color: #000000;"> Created by: Thomas Mallinson
</span></h3>

![combo.png](attachment:combo.png)

# 1. Introduction

### 1.1. Procedure

This project will apply descriptive analytics and content analytics to tweets about the 2020 NFL Draft, selected by keyword. For the first time ever, the draft was held virtually due to the COVID-19 pandemic, and I want to see how fans reacted to the new structure. I will scrape tweets using Tweepy and collect them on the cloud server PythonAnywhere. The tweet data will be analyzed by applying descriptive, content, and network analytics techniques using Python and PySpark. I will use sentiment analysis to record the feeling towards the results of the draft and a Naive Bayes classification algorithm to classify the text.

### 1.2. Procedure Definitions

**Cloud Server/Cloud Computing:** A cloud server is a logical server that is built, hosted and delivered through a cloud computing platform over the Internet. Cloud servers possess and exhibit similar capabilities and functionality to a typical server but are accessed remotely from a cloud service provider. PythonAnywhere is an online integrated development environment and web hosting service based on the Python programming language.

**PySpark:** Apache Spark is an analytics engine and parallel computation framework with Scala, Python, Java, and R interfaces. Spark can load data directly from disk, memory, and other data storage technologies such as Amazon S3, Hadoop Distributed File System (HDFS), Cassandra, and others. PySpark is the Python API for Apache Spark; Python itself is a general-purpose, high-level programming language.

**Tweepy:** Tweepy is a Python package that provides a convenient way to use the Twitter API. The Twitter API gives developers access to most of Twitter's functionality, such as reading and writing information related to tweets, users, and trends.

### 1.3. Sentiment Analysis Pipeline

![sentiment_analysis_pipeline.png](attachment:sentiment_analysis_pipeline.png)

# 2. Twitter Data Collection

### 2.1. Streaming Twitter Data and Saving in .csv Files

The following code was run on a PythonAnywhere cloud server for hashtags, or keywords, associated with the NFL Draft. In order to access Twitter's API, I created an account on Algorithmia.com and then a Twitter developer account and went through the necessary steps to obtain access. I used multiple API calls in order to collect 50,000 total tweets over a span of five days, ranging from the first round of the draft (Day One) to two days following the third and final draft day. Due to the quantity of tweets using the particular hashtags I chose as search keys, I had to scrape in separate .py files and then combine the results before preprocessing here.

# Imports used by the scraping script
import os
import time
import pandas as pd
import tweepy
from tweepy import OAuthHandler

# Twitter credentials, read from environment variables (placeholder names) instead of hard-coding secrets
consumer_key = os.environ['TWITTER_CONSUMER_KEY']
consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
access_key = os.environ['TWITTER_ACCESS_KEY']
access_secret = os.environ['TWITTER_ACCESS_SECRET']

# Pass your twitter credentials to tweepy via its OAuthHandler
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth)

# Calls API every 15 minutes to prevent overcalling

def scrapetweets(search_words, date_since, date_until, numTweets, numRuns):

    ## Arguments:
    # search_words -> a string of keywords (hashtags) to search for
    # date_since -> the date from which to start extracting tweets
    # date_until -> the date at which to stop extracting tweets
    # numTweets -> number of tweets to extract per run
    # numRuns -> number of runs to perform; API calls are limited to once every 15 mins, so runs are 15 mins apart
    
    # Define a pandas dataframe to store the data:
    db_tweets = pd.DataFrame(columns = ['username', 'acctdesc', 'location', 'following',
                                        'followers', 'totaltweets', 'usercreatedts', 'tweetcreatedts',
                                        'retweetcount', 'text', 'hashtags']
                                )
    # Define a for-loop to generate tweets at regular intervals
    for i in range(0, numRuns):
        # We will time how long it takes to scrape tweets for each run:
        start_run = time.time()
        
        # Collect tweets using the Cursor object
        # .Cursor() returns an object that you can iterate or loop over to access the data collected.
        # Each item in the iterator has various attributes that you can access to get information about each tweet
        tweets = tweepy.Cursor(api.search, q=search_words, lang="en", since=date_since, until=date_until, tweet_mode='extended').items(numTweets)

        # Store these tweets into a python list
        tweet_list = [tweet for tweet in tweets]

        # Begin scraping the tweets individually:
        noTweets = 0

        for tweet in tweet_list:

            # Pull the values
            username = tweet.user.screen_name
            acctdesc = tweet.user.description
            location = tweet.user.location
            following = tweet.user.friends_count
            followers = tweet.user.followers_count
            totaltweets = tweet.user.statuses_count
            usercreatedts = tweet.user.created_at
            tweetcreatedts = tweet.created_at
            retweetcount = tweet.retweet_count
            hashtags = tweet.entities['hashtags']

            try:
                text = tweet.retweeted_status.full_text
            except AttributeError:  # Not a Retweet
                text = tweet.full_text

            # Add the 11 variables to the empty list - ith_tweet:
            ith_tweet = [username, acctdesc, location, following, followers, totaltweets,
                         usercreatedts, tweetcreatedts, retweetcount, text, hashtags]

            # Append to dataframe - db_tweets
            db_tweets.loc[len(db_tweets)] = ith_tweet

            # increase counter - noTweets  
            noTweets += 1
        
        # Run ended:
        end_run = time.time()
        duration_run = round(end_run-start_run, 2)
        
        print('no. of tweets scraped for run {} is {}'.format(i, noTweets))
        print('time taken for run {} to complete is {}'.format(i, duration_run))
        
        time.sleep(900) #15 minute sleep time

        
    # Once all runs have completed, save them to a single csv file:    
    # Obtain timestamp in a readable format:
    from datetime import datetime
    to_csv_timestamp = datetime.today().strftime('%Y%m%d_%H%M%S')

    # Define working path and filename
    path = os.getcwd()
    filename = path + '/data/' + to_csv_timestamp + 'nfldraftsample.csv'

    # Store dataframe in csv with creation date timestamp
    db_tweets.to_csv(filename, index = False)
    
    print('Scraping has completed!')

# Initialise these variables:
search_words = "#nfldraft OR #nfldraft2020 OR #nfldraftday OR #draft OR #draft2020"
date_since = "2020-4-23"
date_until = "2020-4-27"
numTweets = 2500
numRuns = 20
# Call the function scrapetweets
scrapetweets(search_words, date_since, date_until, numTweets, numRuns)
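
The separate per-script CSV files mentioned above have to be merged before preprocessing. The following is a minimal sketch of one way to do that with the standard library, assuming every file shares the same header row; the function name `combine_csv_files` and the file paths are hypothetical and are not part of the original scraping scripts.

```python
import csv


def combine_csv_files(paths, out_path):
    """Concatenate CSV files that share a header, skipping exact duplicate rows."""
    header = None
    seen = set()
    rows = []
    for path in paths:
        # newline="" lets the csv module handle line breaks inside quoted fields
        with open(path, newline="", encoding="utf-8") as f:
            reader = csv.reader(f)
            file_header = next(reader, None)
            if file_header is None:
                continue  # empty file
            if header is None:
                header = file_header
            elif file_header != header:
                raise ValueError(f"{path} has a different header")
            for row in reader:
                key = tuple(row)
                if key not in seen:
                    seen.add(key)
                    rows.append(row)
    with open(out_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if header is not None:
            writer.writerow(header)
        writer.writerows(rows)
    return len(rows)
```

Dropping exact duplicates at this stage is a design choice: overlapping search windows across scripts can return the same tweet more than once, and removing those rows early keeps the later counts honest.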

### 2.2. Key Attributes in Tweet .csv

**The following information was collected:**
- user.screen_name - twitter handle
- user.description - description of account
- user.location - where the user is tweeting from
- user.friends_count - no. of other users that user is following (following)
- user.followers_count - no. of other users who are following this user (followers)
- user.statuses_count - total tweets by user
- user.created_at - when the user account was created
- created_at - when the tweet was created
- retweet_count - no. of retweets
- (deprecated) user.favourites_count - total no. of tweets favourited by the user (not stored by the script above)
- retweeted_status.full_text - full text of the tweet
- tweet.entities['hashtags'] - hashtags in the tweet

# 3. Loading Tweet Data Into PySpark DataFrame

I have loaded the necessary Spark files into my computer and set paths in order to be able to spin up a Spark cluster on my local machine.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark


In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import findspark
findspark.init()

from pyspark import SparkContext

def create_spark_context():
  return SparkContext.getOrCreate()

In [0]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession

# Setting spark.driver.memory due to issues faced with pyspark.ml processing
SparkContext.setSystemProperty('spark.driver.memory', '16g')

# Create Spark Contexts
sc = pyspark.SparkContext()
sc.getConf()
conf = pyspark.SparkConf().setAll([('spark.executor.memory', '16g'), ('spark.executor.cores', '3'), ('spark.cores.max', '3'), ('spark.driver.memory','8g'), ("spark.memory.offHeap.size","16g"),('spark.memory.offHeap.enabled','true')])
sc.stop()
sc = pyspark.SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [4]:
sc._conf.getAll()

[('spark.app.id', 'local-1589576794460'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.port', '44029'),
 ('spark.memory.offHeap.enabled', 'true'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.cores.max', '3'),
 ('spark.executor.memory', '16g'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '8g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.cores', '3'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.memory.offHeap.size', '16g'),
 ('spark.driver.host', 'b16b9fa719b0')]

In [7]:
# Import NFL Draft Tweets data
from google.colab import files
uploaded = files.upload()

Saving NFLDraftTweets.csv to NFLDraftTweets.csv


In [0]:
tweetdf = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('NFLDraftTweets.csv')

In [0]:
# Use > rather than >> so that re-running the cell does not append duplicate rows
!head -1000 NFLDraftTweets.csv > NFLDraftTweets_small.csv

In [11]:
type(tweetdf)

pyspark.sql.dataframe.DataFrame

Above, we can see that the data is held in a PySpark DataFrame. DataFrames in Spark are immutable, distributed, and designed to process structured data. To help Apache Spark understand the schema, the data is organized under named columns. This helps Spark optimize execution plans and handle petabytes of data.

In [12]:
#Display first five rows of data
tweetdf.show(5)

+--------------------+--------------------+---------------+---------+---------+---------------+----------------+---------------+--------------------+--------------------+--------------------+
|            username|            acctdesc|       location|following|followers|    totaltweets|   usercreatedts| tweetcreatedts|        retweetcount|                text|            hashtags|
+--------------------+--------------------+---------------+---------+---------+---------------+----------------+---------------+--------------------+--------------------+--------------------+
|     Spectpooheagles|Optimuspooh, Mixe...|Philadelphia,pa|     1948|     1337|         206944|  4/26/2011 2:05|4/27/2020 23:59|                  52|#NFL #NFLDraft Te...|[{'text': 'NFL', ...|
|         sammydabber|st rose class of ...|           null|     null|     null|           null|            null|           null|                null|                null|                null|
|I like carmella m...|       United Stat

In [13]:
tweetdf.printSchema()

root
 |-- username: string (nullable = true)
 |-- acctdesc: string (nullable = true)
 |-- location: string (nullable = true)
 |-- following: string (nullable = true)
 |-- followers: string (nullable = true)
 |-- totaltweets: string (nullable = true)
 |-- usercreatedts: string (nullable = true)
 |-- tweetcreatedts: string (nullable = true)
 |-- retweetcount: string (nullable = true)
 |-- text: string (nullable = true)
 |-- hashtags: string (nullable = true)
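
The null-filled row in the preview above, together with every column being inferred as a string, is consistent with (though not proof of) records being split at line breaks that sit inside quoted fields such as tweet text or account descriptions. The sketch below shows the effect with the standard-library `csv` module on a hypothetical two-record sample. Spark's CSV reader offers a `multiLine` option for the same situation, which was not used in the cell above.

```python
import csv
import io

# Hypothetical two-record sample; the second description contains a line break.
sample = 'username,acctdesc\nuser_a,plain bio\nuser_b,"first line\nsecond line"\n'

# Naive approach: every physical line is treated as a record.
naive_rows = [line.split(",") for line in sample.splitlines()[1:]]

# Quote-aware approach: a quoted field may span several physical lines.
parsed_rows = list(csv.reader(io.StringIO(sample, newline="")))[1:]

print(len(naive_rows))   # 3 fragments
print(len(parsed_rows))  # 2 records
```

The naive split produces a fragment that holds only the tail of a description, which a column-based reader would then pad with nulls.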



# 4. Text Preprocessing

### 4.1. Creating New DataFrame

Before performing sentiment analysis or applying a classification algorithm, we need to preprocess the data. First, I will create a new DataFrame containing only the columns that matter for this project (text and hashtags). Then, I will perform custom transformations by creating Spark User-Defined Functions (UDFs) for several common preprocessing techniques. I will be using the Natural Language Toolkit (NLTK) Python package to do some of the preprocessing.

In [14]:
#Create newtweetdf with text and hashtag columns
newtweetdf = tweetdf.drop('username', 'acctdesc', 'location', 'following', 'followers', 'totaltweets', 'usercreatedts', 'tweetcreatedts', 'retweetcount')

#Drop duplicate rows
newtweetdf = newtweetdf.dropDuplicates()

#Drop rows with only null values
newtweetdf = newtweetdf.dropna(how='all')
newtweetdf.show(5)

+--------------------+--------+
|                text|hashtags|
+--------------------+--------+
|How did you @Bill...|      []|
|RD 2 | Pick 41 - ...|    null|
|Nice local haul i...|    null|
|      Dear @Raiders,|    null|
|Garrett Taylor (@...|    null|
+--------------------+--------+
only showing top 5 rows



### 4.2. Remove Non-ASCII Characters

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

import nltk
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords
nltk.download('stopwords')
set(stopwords.words("english"))
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
from nltk import pos_tag
import string
import re

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.


In [0]:
def strip_non_ascii(data_str):
    ''' Returns the string without non ASCII characters'''
    stripped = (c for c in data_str if 0 < ord(c) < 127)
    return ''.join(stripped)

# Setup PySpark UDF Function
strip_non_ascii_udf = udf(strip_non_ascii, StringType())

In [17]:
nonascii_df = newtweetdf.withColumn('text_non_asci',strip_non_ascii_udf(newtweetdf['text']))
nonascii_df.show(5)

+--------------------+--------+--------------------+
|                text|hashtags|       text_non_asci|
+--------------------+--------+--------------------+
|How did you @Bill...|      []|How did you @Bill...|
|RD 2 | Pick 41 - ...|    null|RD 2 | Pick 41 - ...|
|Nice local haul i...|    null|Nice local haul i...|
|      Dear @Raiders,|    null|      Dear @Raiders,|
|Garrett Taylor (@...|    null|Garrett Taylor (@...|
+--------------------+--------+--------------------+
only showing top 5 rows



### 4.3. Fix Abbreviations

In [0]:
def fix_abbreviation(data_str):
    data_str = data_str.lower()
    data_str = re.sub(r'\bthats\b', 'that is', data_str)
    data_str = re.sub(r'\bive\b', 'i have', data_str)
    data_str = re.sub(r'\bim\b', 'i am', data_str)
    data_str = re.sub(r'\bya\b', 'yeah', data_str)
    data_str = re.sub(r'\bcant\b', 'can not', data_str)
    data_str = re.sub(r'\bdont\b', 'do not', data_str)
    data_str = re.sub(r'\bwont\b', 'will not', data_str)
    data_str = re.sub(r'\bid\b', 'i would', data_str)
    data_str = re.sub(r'\bwth\b', 'what the hell', data_str)
    data_str = re.sub(r'\br\b', 'are', data_str)
    data_str = re.sub(r'\bu\b', 'you', data_str)
    data_str = re.sub(r'\bk\b', 'OK', data_str)
    data_str = re.sub(r'\bsux\b', 'sucks', data_str)
    data_str = re.sub(r'\bno+\b', 'no', data_str)
    data_str = re.sub(r'\bcoo+\b', 'cool', data_str)
    data_str = re.sub(r'\brt\b', '', data_str)  # remove only the standalone retweet marker
    data_str = data_str.strip()
    return data_str

# Setup PySpark UDF Function
fix_abbreviation_udf = udf(fix_abbreviation, StringType())

In [19]:
fixabb_df = nonascii_df.withColumn('fixed_abbrev',fix_abbreviation_udf(nonascii_df['text_non_asci']))
fixabb_df.show(5)

+--------------------+--------+--------------------+--------------------+
|                text|hashtags|       text_non_asci|        fixed_abbrev|
+--------------------+--------+--------------------+--------------------+
|How did you @Bill...|      []|How did you @Bill...|how did you @bill...|
|RD 2 | Pick 41 - ...|    null|RD 2 | Pick 41 - ...|rd 2 | pick 41 - ...|
|Nice local haul i...|    null|Nice local haul i...|nice local haul i...|
|      Dear @Raiders,|    null|      Dear @Raiders,|      dear @raiders,|
|Garrett Taylor (@...|    null|Garrett Taylor (@...|garrett taylor (@...|
+--------------------+--------+--------------------+--------------------+
only showing top 5 rows
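
Word boundaries matter in substitutions like these. As a small illustration on a made-up string, a pattern anchored only on the right, such as `rt\b`, also strips the ending of longer words, while a pattern anchored on both sides removes just the standalone token.

```python
import re

text = "rt smart start rt"

# Boundary only on the right: also trims the "rt" ending of longer words.
right_only = re.sub(r'rt\b', '', text)

# Boundaries on both sides: only the standalone token "rt" is removed.
both_sides = re.sub(r'\brt\b', '', text)

print(right_only.split())  # ['sma', 'sta']
print(both_sides.split())  # ['smart', 'start']
```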



### 4.4. Remove Stop Words

Stop words are some of the most common words in English like "a", "the", "is", etc. They are generally removed from text because they do not carry any sentiment. 

In [0]:
def remove_stops(data_str):
    # expects a string
    # stopwords.words() with no argument loads the lists for every language NLTK ships;
    # pass 'english' to restrict it. A set makes the membership test fast.
    stops = set(stopwords.words())
    # keep only the words that are not stop words
    return ' '.join(word for word in data_str.split() if word not in stops)

# Setup PySpark UDF function
remove_stops_udf = udf(remove_stops, StringType())

#Get raw columns
raw_cols = fixabb_df.columns

In [21]:
stopwordsdf = fixabb_df.select(raw_cols).withColumn("stopword_text", remove_stops_udf(fixabb_df["fixed_abbrev"]))
stopwordsdf.show(5)

+--------------------+--------+--------------------+--------------------+--------------------+
|                text|hashtags|       text_non_asci|        fixed_abbrev|       stopword_text|
+--------------------+--------+--------------------+--------------------+--------------------+
|How did you @Bill...|      []|How did you @Bill...|how did you @bill...|@billgates go dev...|
|RD 2 | Pick 41 - ...|    null|RD 2 | Pick 41 - ...|rd 2 | pick 41 - ...|rd 2 | pick 41 - ...|
|Nice local haul i...|    null|Nice local haul i...|nice local haul i...|nice local haul #...|
|      Dear @Raiders,|    null|      Dear @Raiders,|      dear @raiders,|      dear @raiders,|
|Garrett Taylor (@...|    null|Garrett Taylor (@...|garrett taylor (@...|garrett taylor (@...|
+--------------------+--------+--------------------+--------------------+--------------------+
only showing top 5 rows
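
To make the filtering step concrete without depending on the NLTK download, here is a minimal sketch using a deliberately tiny, hypothetical stop-word set. The real NLTK English list is much longer, and the helper name `drop_stop_words` is an assumption for illustration.

```python
# Hypothetical, deliberately tiny stop-word set; NLTK's English list is much longer.
STOP_WORDS = {"a", "the", "is", "in", "how", "did", "you"}


def drop_stop_words(text, stop_words=STOP_WORDS):
    """Return text with every whitespace-separated token in stop_words removed."""
    return " ".join(word for word in text.split() if word not in stop_words)


print(drop_stop_words("the draft is in a new format"))  # draft new format
```

Because the match is on whole lowercase tokens, this only works as intended after the text has been lowercased, which the abbreviation step already does.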



### 4.5. Remove Irrelevant Features

Here, I'll remove various features from the text, such as hyperlinks, mentions, numbers, punctuation, and short words.

In [0]:
def remove_features(data_str):
    # Compile regex (raw strings keep the backslash escapes intact)
    url_re = re.compile(r'https?://(www\.)?\w+\.\w+(/\w+)*/?')
    punc_re = re.compile('[%s]' % re.escape(string.punctuation))
    num_re = re.compile(r'(\d+)')
    mention_re = re.compile(r'@(\w+)')
    alpha_num_re = re.compile(r"^[a-z0-9_.]+$")
    # Remove hyperlinks
    data_str = url_re.sub(' ', data_str)
    # Remove @mentions
    data_str = mention_re.sub(' ', data_str)
    # Remove punctuation
    data_str = punc_re.sub(' ', data_str)
    # Remove numeric 'words'
    data_str = num_re.sub(' ', data_str)
    # Remove non a-z 0-9 characters and words shorter than 3 characters
    list_pos = 0
    cleaned_str = ''
    for word in data_str.split():
        if list_pos == 0:
            if alpha_num_re.match(word) and len(word) > 2:
                cleaned_str = word
            else:
                cleaned_str = ' '
        else:
            if alpha_num_re.match(word) and len(word) > 2:
                cleaned_str = cleaned_str + ' ' + word
            else:
                cleaned_str += ' '
        list_pos += 1
    return cleaned_str

#Setup PySpark UDF Function
remove_features_udf = udf(remove_features, StringType())

In [23]:
rem_features_df = stopwordsdf.select(raw_cols+["stopword_text"]).withColumn("rem_feat_text", remove_features_udf(stopwordsdf["stopword_text"]))
rem_features_df.show(5)

+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|                text|hashtags|       text_non_asci|        fixed_abbrev|       stopword_text|       rem_feat_text|
+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|How did you @Bill...|      []|How did you @Bill...|how did you @bill...|@billgates go dev...|  developing code...|
|RD 2 | Pick 41 - ...|    null|RD 2 | Pick 41 - ...|rd 2 | pick 41 - ...|rd 2 | pick 41 - ...|  pick jonathan t...|
|Nice local haul i...|    null|Nice local haul i...|nice local haul i...|nice local haul #...|nice local haul n...|
|      Dear @Raiders,|    null|      Dear @Raiders,|      dear @raiders,|      dear @raiders,|                dear|
|Garrett Taylor (@...|    null|Garrett Taylor (@...|garrett taylor (@...|garrett taylor (@...|garrett taylor he...|
+--------------------+--------+--------------------+--------------------
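
The order of the substitutions matters: links and mentions have to go before punctuation, otherwise stripping `@`, `:` and `/` first would leave their fragments behind as ordinary words. The sketch below is a simplified stand-in for the function above, not a copy of it. It uses a looser URL pattern (`https?://\S+`), and the name `clean_tweet` and the sample tweet are made up for illustration.

```python
import re
import string

URL_RE = re.compile(r'https?://\S+')       # simplified URL pattern (assumption)
MENTION_RE = re.compile(r'@\w+')
PUNCT_RE = re.compile('[%s]' % re.escape(string.punctuation))
NUMBER_RE = re.compile(r'\d+')


def clean_tweet(text, min_len=3):
    """Strip links, mentions, punctuation and digits, then drop short tokens."""
    for pattern in (URL_RE, MENTION_RE, PUNCT_RE, NUMBER_RE):
        text = pattern.sub(' ', text)
    return ' '.join(word for word in text.split() if len(word) >= min_len)


print(clean_tweet("dear @raiders, rd 2 pick 41 is great! https://t.co/abc123 #nfldraft"))
# dear pick great nfldraft
```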

### 4.6. Lemmatization

The goal of lemmatization is to remove inflections and map a word to its root form.

In [0]:
def lemmatize(data_str):
    # expects a string
    list_pos = 0
    cleaned_str = ''
    lmtzr = WordNetLemmatizer()
    text = data_str.split()
    tagged_words = pos_tag(text)
    for word in tagged_words:
        if 'v' in word[1].lower():
            lemma = lmtzr.lemmatize(word[0], pos='v')
        else:
            lemma = lmtzr.lemmatize(word[0], pos='n')
        if list_pos == 0:
            cleaned_str = lemma
        else:
            cleaned_str = cleaned_str + ' ' + lemma
        list_pos += 1
    return cleaned_str

# Setup PySpark UDF Function
lemmatize_udf = udf(lemmatize, StringType())

In [25]:
lemm_df = rem_features_df.select(raw_cols+["rem_feat_text"]).withColumn("cleaned_text", lemmatize_udf(rem_features_df["rem_feat_text"]))
lemm_df.show(5)

+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|                text|hashtags|       text_non_asci|        fixed_abbrev|       rem_feat_text|        cleaned_text|
+--------------------+--------+--------------------+--------------------+--------------------+--------------------+
|How did you @Bill...|      []|How did you @Bill...|how did you @bill...|  developing code...|develop code comp...|
|RD 2 | Pick 41 - ...|    null|RD 2 | Pick 41 - ...|rd 2 | pick 41 - ...|  pick jonathan t...|pick jonathan tay...|
|Nice local haul i...|    null|Nice local haul i...|nice local haul i...|nice local haul n...|nice local haul n...|
|      Dear @Raiders,|    null|      Dear @Raiders,|      dear @raiders,|                dear|                dear|
|Garrett Taylor (@...|    null|Garrett Taylor (@...|garrett taylor (@...|garrett taylor he...|garrett taylor he...|
+--------------------+--------+--------------------+--------------------
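
The lemmatizer above treats every word as either a verb or a noun. A possible refinement, sketched here as an assumption rather than something this notebook ran, is to map Penn Treebank tags to all four WordNet part-of-speech letters so that adjectives and adverbs are lemmatized as such. The helper name `penn_to_wordnet` is hypothetical.

```python
def penn_to_wordnet(tag):
    """Map a Penn Treebank POS tag to a WordNet POS letter."""
    if tag.startswith('VB'):
        return 'v'  # verbs: VB, VBD, VBG, VBN, VBP, VBZ
    if tag.startswith('JJ'):
        return 'a'  # adjectives: JJ, JJR, JJS
    if tag.startswith('RB'):
        return 'r'  # adverbs: RB, RBR, RBS
    return 'n'      # nouns and everything else


for tag in ('VBG', 'JJ', 'RB', 'NNS'):
    print(tag, penn_to_wordnet(tag))
```

The returned letter could be passed as the `pos` argument of `WordNetLemmatizer.lemmatize` in place of the two-way verb/noun test.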

### 4.7. Create Cleaned DataFrame

In [26]:
data = lemm_df.select('cleaned_text','hashtags')
data.show(5)

+--------------------+--------+
|        cleaned_text|hashtags|
+--------------------+--------+
|develop code comp...|      []|
|pick jonathan tay...|    null|
|nice local haul n...|    null|
|                dear|    null|
|garrett taylor he...|    null|
+--------------------+--------+
only showing top 5 rows



# 5. Sentiment Analysis

Sentiment Analysis is the process of 'computationally' determining whether a piece of writing is positive, negative, or neutral in order to try and determine the attitude of the writer. It's widely used by businesses for analyzing customer reviews, social media, and survey responses.

### 5.1. Creating the Sentiment Analysis Function

In [0]:
from pyspark.sql.types import FloatType
from textblob import TextBlob

def sentiment_analysis(text):
    return TextBlob(text).sentiment.polarity

# Setup PySpark UDF Function
sentiment_analysis_udf = udf(sentiment_analysis , FloatType())

In [28]:
data = data.withColumn("sentiment_score", sentiment_analysis_udf(data['cleaned_text']))
data.show(5)

+--------------------+--------+---------------+
|        cleaned_text|hashtags|sentiment_score|
+--------------------+--------+---------------+
|develop code comp...|      []|            0.0|
|pick jonathan tay...|    null|            0.0|
|nice local haul n...|    null|            0.3|
|                dear|    null|            0.0|
|garrett taylor he...|    null|            0.0|
+--------------------+--------+---------------+
only showing top 5 rows



### 5.2. Sentiment Analysis

In [0]:
def condition(r):
    if (r >=0.1):
        label = "positive"
    elif(r <= -0.1):
        label = "negative"
    else:
        label = "neutral"
    return label

# Setup PySpark UDF Function
sentiment_udf = udf(lambda x: condition(x), StringType())

In [30]:
data = data.withColumn("sentiment", sentiment_udf(data['sentiment_score']))
data.show(5)

+--------------------+--------+---------------+---------+
|        cleaned_text|hashtags|sentiment_score|sentiment|
+--------------------+--------+---------------+---------+
|develop code comp...|      []|            0.0|  neutral|
|pick jonathan tay...|    null|            0.0|  neutral|
|nice local haul n...|    null|            0.3| positive|
|                dear|    null|            0.0|  neutral|
|garrett taylor he...|    null|            0.0|  neutral|
+--------------------+--------+---------------+---------+
only showing top 5 rows



In [31]:
data.groupBy("sentiment").count().orderBy("sentiment").show()

+---------+-----+
|sentiment|count|
+---------+-----+
| negative|  171|
|  neutral| 1238|
| positive|  748|
+---------+-----+



In [32]:
# Total cleaned dataset length
print(data.count())

# Percentages
print(171/2157)
print(1238/2157)
print(748/2157)

2157
0.07927677329624479
0.573945294390357
0.34677793231339826


In [0]:
data.write.csv('CleanedTweets.csv')

In [43]:
!head -1000 data >> CleanedTweets

head: cannot open 'data' for reading: No such file or directory


**Results:** The first impression of these results is that the data cleaning process discarded most of the initially scraped dataset: only 2,157 rows remain of the roughly 50,000 tweets collected. This could be due to a large number of null values, duplicates, or incorrectly formatted tweet data produced during the scraping process. The percentage breakdown for each of the sentiment categories within the dataset is as follows:

- **Negative: 7.93%**
- **Neutral: 57.39%**
- **Positive: 34.68%**

There's a heavy skew towards neutral sentiment; however, the positive tweets outnumber the negative tweets by more than four to one. Within this sample, the NFL fanbase's sentiment towards this uncommon draft structure appears to have been positive overall.

### 5.4. Analysis by Team

While using only the team hashtags contained in this dataset may not be the ideal way to view each individual team's fan sentiment, I'm curious to see the results in ours. I will be viewing sentiment towards the Green Bay Packers, Arizona Cardinals, and Kansas City Chiefs. The Packers had the worst draft in terms of consensus by analysts around the industry, while the Cardinals had one of the better drafts. The Chiefs are the reigning Super Bowl champions and have a relatively neutral consensus draft grade amongst analysts.

In [0]:
# Finding distinct values in hashtag column (returns a large amount)
# hashtags = [i.hashtags for i in data.select('hashtags').distinct().collect()]

#### 5.4.1. Green Bay Packers

In [33]:
# Packers sentiment
data.filter(data.hashtags.contains('Packers')).groupBy("sentiment").count().orderBy("sentiment").show()

+---------+-----+
|sentiment|count|
+---------+-----+
| negative|    4|
|  neutral|    8|
| positive|    4|
+---------+-----+



#### 5.4.2. Arizona Cardinals

In [34]:
# Cardinals sentiment
data.filter(data.hashtags.contains('Cardinals')).groupBy("sentiment").count().orderBy("sentiment").show()

+---------+-----+
|sentiment|count|
+---------+-----+
|  neutral|    7|
| positive|    2|
+---------+-----+



#### 5.4.3. Kansas City Chiefs

In [35]:
# Chiefs sentiment
data.filter(data.hashtags.contains('Chiefs')).groupBy("sentiment").count().orderBy("sentiment").show()

+---------+-----+
|sentiment|count|
+---------+-----+
| negative|    2|
|  neutral|    4|
| positive|    4|
+---------+-----+



Again, the analysis suffers from a small sample size. An analysis for each team would likely be stronger if you scraped only keywords or hashtags that are team-specific.
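
Part of the small sample may also come from how the filter works: `contains('Packers')` is a case-sensitive substring match on the raw string stored in the `hashtags` column. A sketch of an alternative is to parse that string back into a list and compare lowercase hashtag texts. The helper name `hashtag_texts` and the sample values are assumptions. The `text` key follows the structure visible in the data preview earlier.

```python
import ast


def hashtag_texts(raw):
    """Return lowercase hashtag texts from a stringified list of hashtag entities."""
    if not raw:
        return []
    try:
        entities = ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        return []  # malformed or truncated cell
    if not isinstance(entities, list):
        return []
    return [e["text"].lower() for e in entities
            if isinstance(e, dict) and isinstance(e.get("text"), str)]


sample = "[{'text': 'NFL', 'indices': [0, 4]}, {'text': 'GoPackGo', 'indices': [5, 14]}]"
print(hashtag_texts(sample))  # ['nfl', 'gopackgo']
```

Wrapped in a UDF that returns an array of strings, a function like this would allow exact, case-insensitive matching against a list of team hashtags.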

# 6. Naive Bayes Text Classification

Naive Bayes classifiers are a collection of classification algorithms based on **Bayes' Theorem**. They form a family of algorithms that share a common assumption: every pair of features is independent of each other, given the class. PySpark MLlib supports multinomial naive Bayes and Bernoulli naive Bayes. These models are typically used for document classification.

In probability theory and statistics, **Bayes' Theorem** describes the probability of an event based on prior knowledge of conditions that might be related to the event. Bayes' Theorem is mathematically stated as the equation below.

![bayes.jpg](attachment:bayes.jpg)

Where A and B are events and P(B) ≠ 0.
- P(A|B) is the posterior probability: the probability of event A given that event B is true. Event B is also termed the evidence.
- P(A) is the prior probability of A, before the evidence is seen. The evidence is an attribute value of an unknown instance (here, event B).
- P(B|A) is the likelihood: the probability of observing the evidence B given that A is true.
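
To show how the theorem turns into a text classifier, here is a minimal from-scratch sketch of multinomial naive Bayes with Laplace smoothing. It is an illustration under stated assumptions, not the PySpark MLlib implementation: the function names and the four toy documents are made up, and words that never appeared in training are simply ignored.

```python
import math
from collections import Counter, defaultdict


def train_naive_bayes(docs):
    """docs: list of (tokens, label). Returns class priors, per-class word counts, vocabulary."""
    class_counts = Counter(label for _, label in docs)
    word_counts = defaultdict(Counter)
    vocab = set()
    for tokens, label in docs:
        word_counts[label].update(tokens)
        vocab.update(tokens)
    priors = {c: n / len(docs) for c, n in class_counts.items()}
    return priors, word_counts, vocab


def predict(tokens, priors, word_counts, vocab, alpha=1.0):
    """Pick the class maximizing log P(class) + sum of log P(word | class)."""
    best_label, best_score = None, -math.inf
    for label, prior in priors.items():
        total = sum(word_counts[label].values())
        score = math.log(prior)
        for word in tokens:
            if word not in vocab:
                continue  # ignore words never seen in training
            # Laplace (add-alpha) smoothing avoids zero probabilities.
            score += math.log((word_counts[label][word] + alpha)
                              / (total + alpha * len(vocab)))
        if score > best_score:
            best_label, best_score = label, score
    return best_label


# Hypothetical toy training data, not taken from the tweet dataset.
docs = [
    (["great", "pick"], "positive"),
    (["love", "great", "draft"], "positive"),
    (["terrible", "pick"], "negative"),
    (["awful", "reach"], "negative"),
]
model = train_naive_bayes(docs)
print(predict(["great", "draft"], *model))  # positive
print(predict(["awful", "pick"], *model))   # negative
```

Working in log space turns the product of per-word probabilities into a sum, which avoids numerical underflow on longer documents.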

### 6.1. Create DataFrame for Naive Bayes Classification

Due to some Java Heap Memory error complications, I will be performing this on a subset of the overall data (1000 rows).

In [0]:
tweetsubdf = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('NFLDraftTweets_small.csv')

In [53]:
# Create NBdf with text and hashtag columns
NBdf = tweetsubdf.drop('username', 'acctdesc', 'location', 'following', 'followers', 'totaltweets', 'usercreatedts', 'tweetcreatedts', 'retweetcount')

# Drop duplicate rows
NBdf = NBdf.dropDuplicates()

# Drop rows with only null values
NBdf = NBdf.dropna(how='all')

# Remove non-ASCII characters
NBdf = NBdf.withColumn('text_non_asci',strip_non_ascii_udf(NBdf['text']))

# Fix abbreviations
NBdf = NBdf.withColumn('fixed_abbrev',fix_abbreviation_udf(NBdf['text_non_asci']))

#Get raw columns
raw_cols = NBdf.columns

# Remove stopwords
NBdf = NBdf.select(raw_cols).withColumn("stopword_text", remove_stops_udf(NBdf["fixed_abbrev"]))

# Remove irrelevant features
NBdf = NBdf.select(raw_cols+["stopword_text"]).withColumn("rem_feat_text", remove_features_udf(NBdf["stopword_text"]))

# Lemmatization
NBdf = NBdf.select(raw_cols+["rem_feat_text"]).withColumn("cleaned_text", lemmatize_udf(NBdf["rem_feat_text"]))

# DF for Sentiment analysis
NBdf = NBdf.select('cleaned_text','hashtags')
NBdf.show(1)

+--------------------+--------+
|        cleaned_text|hashtags|
+--------------------+--------+
|post nfldraft rav...|    null|
+--------------------+--------+
only showing top 1 row



In [56]:
# Sentiment analysis on 1000 tweet dataset
NBdf = NBdf.withColumn("sentiment_score", sentiment_analysis_udf(NBdf['cleaned_text']))
NBdf.show(1)

+--------------------+--------+---------------+
|        cleaned_text|hashtags|sentiment_score|
+--------------------+--------+---------------+
|post nfldraft rav...|    null|            0.0|
+--------------------+--------+---------------+
only showing top 1 row



In [57]:
# Create DataFrame for Naive Bayes
NBdf = NBdf.selectExpr("cleaned_text as text", "sentiment_score as label")
NBdf.show(1)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|post nfldraft rav...|  0.0|
+--------------------+-----+
only showing top 1 row



In [58]:
# Add unique ID
from pyspark.sql.functions import monotonically_increasing_id

NBdf = NBdf.withColumn("uid", monotonically_increasing_id())
NBdf = NBdf.select('uid', 'text', 'label')
NBdf.show(5)

+-----------+--------------------+-----+
|        uid|                text|label|
+-----------+--------------------+-----+
| 8589934592|post nfldraft rav...|  0.0|
| 8589934593|check ezekiel ell...|  0.0|
|17179869184|          tiger king|  0.0|
|17179869185|cowboy nfldraft g...|  0.2|
|17179869186|want make life ea...|  0.0|
+-----------+--------------------+-----+
only showing top 5 rows
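
The large jumps between the uid values above are expected. According to the Spark documentation, monotonically_increasing_id puts the partition ID in the upper 31 bits and the record number within that partition in the lower 33 bits, so the IDs are unique and increasing but not consecutive. A minimal sketch in plain Python that unpacks the values shown (the helper name decode_uid is hypothetical):

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within a partition

def decode_uid(uid):
    """Split a monotonically_increasing_id value into (partition_id, record_number)."""
    partition_id = uid >> RECORD_BITS
    record_number = uid & ((1 << RECORD_BITS) - 1)
    return partition_id, record_number

# uid values copied from the output above
for uid in (8589934592, 8589934593, 17179869184, 17179869186):
    print(uid, decode_uid(uid))
```

Under this layout, 8589934592 is the first record of partition 1 and 17179869184 is the first record of partition 2.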



### 6.2. Splitting Data Into Training and Test Sets

In [0]:
# Split the data into training and test sets (60% / 40% split)
(trainingData, testData) = NBdf.randomSplit([0.6, 0.4])

### 6.3. Training Model

In [0]:
# Import PySpark Machine Learning Packages
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.feature import CountVectorizer

In [0]:
# Configure an ML pipeline, which consists of four stages: tokenizer, hashingTF, idf, and nb.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")

# vectorizer = CountVectorizer(inputCol= "words", outputCol="rawFeatures")
idf = IDF(minDocFreq=3, inputCol="rawFeatures", outputCol="features")

# Naive Bayes model
nb = NaiveBayes()

# Pipeline Architecture
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, nb])

# Train the model. This fits the IDF and Naive Bayes stages on the training data.
model = pipeline.fit(trainingData)
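
To illustrate what the hashingTF and idf stages compute, here is a rough sketch in plain Python. It is an approximation under stated assumptions: Spark's HashingTF uses MurmurHash3, whereas zlib.crc32 is used here only as a deterministic stand-in, and the function names and toy documents are hypothetical. The IDF weight follows the formula given in the Spark documentation, log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term; terms below minDocFreq receive a weight of 0.

```python
import math
import zlib
from collections import Counter

def hashing_tf(tokens, num_features=2**16):
    """Map tokens to bucket indices with a hash and count occurrences per bucket."""
    counts = Counter()
    for tok in tokens:
        idx = zlib.crc32(tok.encode("utf-8")) % num_features  # stand-in hash
        counts[idx] += 1
    return dict(counts)

def idf_weights(tf_vectors, min_doc_freq=0):
    """Compute log((m + 1) / (df + 1)) per bucket; rare buckets get weight 0."""
    m = len(tf_vectors)
    doc_freq = Counter()
    for vec in tf_vectors:
        doc_freq.update(vec.keys())
    return {
        idx: (math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0)
        for idx, df in doc_freq.items()
    }

# Hypothetical toy documents
toy_docs = [["tiger", "king"], ["tiger", "nfldraft"], ["cowboy", "nfldraft"]]
tf_vectors = [hashing_tf(doc) for doc in toy_docs]
weights = idf_weights(tf_vectors)
tfidf = [{idx: tf * weights[idx] for idx, tf in vec.items()} for vec in tf_vectors]
print(tfidf[0])
```

One consequence of hashing is that distinct words can collide in the same bucket, which is why a large numFeatures value is usually preferred.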

### 6.4. Make Predictions

In [62]:
# Make predictions
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("text", "label", "prediction").show(5)

+--------------------+-----+----------+
|                text|label|prediction|
+--------------------+-----+----------+
|post nfldraft rav...|  0.0|      12.0|
|          tiger king|  0.0|      12.0|
|cowboy nfldraft g...|  0.2|      12.0|
|total sec nfldraf...|-0.05|      12.0|
|congrats caricspo...|0.275|      12.0|
+--------------------+-----+----------+
only showing top 5 rows



### 6.5. Model Evaluation

In [63]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.0

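The evaluator returned a score of 0.0. Note that MulticlassClassificationEvaluator reports F1 by default rather than accuracy. Every displayed prediction is class 12.0 while the labels are continuous sentiment scores (0.0, 0.2, -0.05, ...), so the likely cause is that the label column holds raw scores rather than class indices, and no prediction can match a label. Let's try a different model.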
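
One possible remedy, assuming the labels are continuous polarity scores as the sample rows suggest (0.0, 0.2, -0.05, 0.275), would be to bucket them into a small set of class indices before training, since Spark ML classifiers generally expect labels of the form 0.0, 1.0, 2.0, and so on. The sketch below is plain Python; the function name and the neutral band of 0.05 are assumptions, and in the notebook it would need to be wrapped in a UDF like the earlier cleaning functions.

```python
def polarity_to_class(score, neutral_band=0.05):
    """Map a polarity score to a class index: 0.0 negative, 1.0 neutral, 2.0 positive."""
    if score > neutral_band:
        return 2.0
    if score < -neutral_band:
        return 0.0
    return 1.0

# Label values copied from the prediction output above
for score in (0.0, 0.2, -0.05, 0.275):
    print(score, polarity_to_class(score))
```

The width of the neutral band is a modelling choice and would affect the class balance.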

# 7. HashingTF + IDF + Logistic Regression Model

Here, I will try to implement a TF-IDF model with logistic regression.

### 7.1. Creating DataFrame for Classification

In [65]:
modelData = NBdf.selectExpr("text as text", "label as target")
modelData.show(1)

+--------------------+------+
|                text|target|
+--------------------+------+
|post nfldraft rav...|   0.0|
+--------------------+------+
only showing top 1 row



In [66]:
# Add unique ID
from pyspark.sql.functions import monotonically_increasing_id

modelData = modelData.withColumn("uid", monotonically_increasing_id())
modelData = modelData.select('uid', 'text', 'target')
modelData.show(1)

+----------+--------------------+------+
|       uid|                text|target|
+----------+--------------------+------+
|8589934592|post nfldraft rav...|   0.0|
+----------+--------------------+------+
only showing top 1 row



### 7.2. Splitting Data Into Training and Test Sets

In [0]:
# Split into training, validation, and test sets (98% / 1% / 1%), seeded for reproducibility
(train_set, val_set, test_set) = modelData.randomSplit([0.98, 0.01, 0.01], seed = 2000)

### 7.3. Training Model

In [0]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [69]:
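# Tokenize the text, hash tokens into a 2**16-dimensional term-frequency vector, then apply IDF weighting
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=3) # minDocFreq: ignore terms that appear in fewer than 3 documents
# Map each distinct target value to a numeric class index
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])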

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+-----------+--------------------+------+--------------------+--------------------+--------------------+-----+
|        uid|                text|target|               words|                  tf|            features|label|
+-----------+--------------------+------+--------------------+--------------------+--------------------+-----+
| 8589934592|post nfldraft rav...|   0.0|[post, nfldraft, ...|(65536,[12929,232...|(65536,[12929,232...|  0.0|
| 8589934593|check ezekiel ell...|   0.0|[check, ezekiel, ...|(65536,[235,5630,...|(65536,[235,5630,...|  0.0|
|17179869184|          tiger king|   0.0|       [tiger, king]|(65536,[10302,326...|(65536,[10302,326...|  0.0|
|17179869185|cowboy nfldraft g...|   0.2|[cowboy, nfldraft...|(65536,[4211,8062...|(65536,[4211,8062...|  6.0|
|17179869186|want make life ea...|   0.0|[want, make, life...|(65536,[4882,6052...|(65536,[4882,6052...|  0.0|
+-----------+--------------------+------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows
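
The label column in the output above differs from target (0.2 becomes 6.0) because StringIndexer, by default, assigns indices in order of descending frequency, so the most common value receives index 0.0. A minimal sketch of that behaviour in plain Python follows. The function name and the toy values are hypothetical, and breaking ties by the value itself is an assumption made here for determinism.

```python
from collections import Counter

def frequency_index(values):
    """Assign 0.0 to the most frequent value, 1.0 to the next, and so on."""
    counts = Counter(values)
    # Sort by descending count; ties are broken by the value itself (assumption)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}

# Hypothetical target values, treated as strings like StringIndexer does
toy_targets = ["0.0", "0.0", "0.2", "0.0", "0.5", "0.2"]
mapping = frequency_index(toy_targets)
print(mapping)
```

The resulting index therefore says nothing about the size or sign of the sentiment score; it only reflects how often each distinct score occurs.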

In [70]:
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

KeyboardInterrupt: ignored

### 7.4. Model Evaluation

In [71]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

Py4JJavaError: ignored

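Spark's BinaryClassificationEvaluator does not offer accuracy as a metric; it reports the area under the ROC or precision-recall curve. Note also that the indexed label column contains more than two classes (for example, 6.0 in the output above), so a binary evaluator is not a natural fit here. I can calculate accuracy by counting the number of predictions matching the label and dividing it by the total number of entries.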

In [0]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
accuracy
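
The same calculation can be sketched in plain Python on lists of labels and predictions. The helper name compute_accuracy and the toy values are hypothetical. The sketch also guards against an empty validation set, which is worth considering here because the validation split is only about 1% of roughly 1,000 rows.

```python
import math

def compute_accuracy(labels, predictions):
    """Fraction of predictions that exactly match their labels."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        return math.nan  # no rows to score
    matches = sum(1 for y, p in zip(labels, predictions) if y == p)
    return matches / len(labels)

# Hypothetical indexed labels and predictions
print(compute_accuracy([0.0, 6.0, 0.0, 1.0], [0.0, 0.0, 0.0, 1.0]))
```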

# References

**Content Sources:**
- https://www.techopedia.com/definition/29019/cloud-server
- https://www.pythonanywhere.com
- http://docs.tweepy.org/en/latest/
- https://www.geeksforgeeks.org/twitter-sentiment-analysis-using-python/
- http://adilmoujahid.com/posts/2014/07/twitter-analytics/
- https://stackoverflow.com/questions/24214189/how-can-i-get-tweets-older-than-a-week-using-tweepy-or-other-python-libraries
- http://spark.apache.org/docs/latest/#launching-on-a-cluster
- https://docs.databricks.com/data/data-sources/read-csv.html
- https://towardsdatascience.com/creating-the-twitter-sentiment-analysis-program-in-python-with-naive-bayes-classification-672e5589a7ed
- https://towardsdatascience.com/sentiment-analysis-with-pyspark-bc8e83f80c35
- https://medium.com/@leowgriffin/scraping-tweets-with-tweepy-python-59413046e788
- https://www.kdnuggets.com/2019/04/text-preprocessing-nlp-machine-learning.html
- https://runawayhorse001.github.io/LearningApacheSpark/textmining.html
- https://www.nltk.org/
- https://classes.ischool.syr.edu/ist718/content/unit09/lab-sentiment_analysis/
- http://www.datasciencemadesimple.com/subset-or-filter-data-with-multiple-conditions-in-pyspark/
- https://spark.apache.org/docs/2.4.5/
- http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.NaiveBayes
- Prior notebooks created for homeworks, lecture notes, other classes' materials

**Photo sources:**
- http://spark.apache.org/
- https://realpython.com/twitter-bot-python-tweepy/
- https://oh42fifty.org/wp-content/uploads/2020/05/Winners_and_Losers_of_the_2020_NFL_Draft.jpg
- https://runawayhorse001.github.io/LearningApacheSpark/_images/sentiment_analysis_pipeline.png
- https://www.geeksforgeeks.org/wp-content/ql-cache/quicklatex.com-7777aa719ea14857115695676adc0914_l3.svg