#### Running the notebook
To run this notebook you must install the following:  
       -  pip install pyspark  
       -  pip install re  
       -  pip install os   
       -  pip install pyspellchecker  
       -  pip install nltk  
       
       
    

### Section 1: Process and Format New Tweet Data

The goal of this section is to process the data retrieved by scraping Twitter using their developer API, and match the formatting of the tweet dataset used in the paper.


#### 1.1: Formatting New DataFrames
There are 3 columns considered to be relevant for this analysis: airline_sentiment, airline and text. The raw csv files will be formatted to match these columns.


In [1]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import isnan, when, count, col, udf,lower, regexp_replace, split, concat, lit
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as f

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "15g") \
    .appName('my-cool-app') \
    .getOrCreate()

In [2]:
##import packages
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
import os


##Load Data and Create Dataframes
def load_csv(airline):
  #load csv
    cwd = os.getcwd()
    file_location = cwd+"/"+airline+"_labelled.csv"  
    df = spark.read.format("csv").option("header", False).option("multiLine", True).option("escape", "\"").load(file_location)

    #basic formatting of new pyspark dataframes
    df = df.drop('_c0')
    df = df.withColumnRenamed('_c1', 'raw_text')
    df = df.withColumnRenamed('_c2', 'airline_sentiment')
    return df

#create dataframes, adding new column with airline name
df_virgin_raw = load_csv("virgin").withColumn("airline",lit("Virgin America"))
df_delta_raw = load_csv("delta").withColumn("airline",lit("Delta"))
df_american_raw = load_csv("american_airline").withColumn("airline",lit("American"))
df_southwest_raw = load_csv("southwest").withColumn("airline",lit("Southwest"))
df_united_raw = load_csv("unitedairlines").withColumn("airline",lit("United"))
df_us_airways_raw = load_csv("us_airways").withColumn("airline",lit("US Airways"))

display(df_virgin_raw)

DataFrame[raw_text: string, airline_sentiment: string, airline: string]

#### 1.2: Removing UTF-encoding and html links from raw text

In [3]:
#use regex to clean up raw text
from pyspark.sql.functions import *
import re

##create a regex-based UDF
def regex_cleanup(text):
    #remove UTF-8 codes from text
    text = re.sub(r'\\x[0-9][0-9]',' ',text) 
    text = re.sub(r'\\x[a-z][a-z]',' ',text)
    text = re.sub(r'\\x[a-z][0-9]',' ',text)
    text = re.sub(r'\\x[0-9][a-z]',' ',text)

    text = re.sub(r"b'RT",' ',text) ##remove retweet(RT) characters
    text = re.sub(r'b"RT',' ',text) ##remove retweet(RT) characters
    text = re.sub(r"b'",' ',text) ##remove "b" bytestring characters
    text = re.sub(r'b"',' ',text) ##remove "b" bytestring characters
    text = re.sub(r"\\n+",' ',text) ##remove new line character
    text = re.sub(r"http\S+", "", text) #remove twitter https links
    return text

##convert this function to pyspark UDF
regex_cleanup_UDF = udf(lambda x: regex_cleanup(x))

##apply regex_cleanup UDF to dataframe
def cleanup_text(df):
    df = df.withColumn("text",regex_cleanup_UDF("raw_text"))
    df = df.drop("raw_text")
    return df.dropDuplicates((['text'])) #delete duplicate rows that may occur due to retweets

In [4]:
##Call cleanup function on all dataframes
df_virgin = cleanup_text(df_virgin_raw)
df_delta = cleanup_text(df_delta_raw)
df_american = cleanup_text(df_american_raw)
df_southwest = cleanup_text(df_southwest_raw)
df_united = cleanup_text(df_united_raw)
df_us_airways = cleanup_text(df_us_airways_raw)

##combine all airline dataframes into one
temp = df_virgin.union(df_delta)
temp = temp.union(df_american)
temp = temp.union(df_southwest)
temp = temp.union(df_united)
new_data_df = temp.union(df_us_airways) #final df

display(new_data_df) 

DataFrame[airline_sentiment: string, airline: string, text: string]

In [5]:
def remove_space(text):
    text = re.sub(r'\t','',text) 
    text = re.sub(r'\s+','',text) 
    return text

##convert this function to pyspark UDF
remove_space_UDF = udf(lambda x: remove_space(x))

##apply regex_cleanup UDF to dataframe
def removing_spaces(df):
    df = df.withColumn("airline_sentiment",remove_space_UDF("airline_sentiment"))
    return df

new_data_df = removing_spaces(new_data_df)
new_data_df.groupBy("airline_sentiment").count().show()

+-----------------+-----+
|airline_sentiment|count|
+-----------------+-----+
|         positive|  127|
|          neutral|  245|
|         negative|  225|
+-----------------+-----+



In [6]:
##Load tweet data and drop irrelevant columns
def load_tweets(file_location):
  #load csv
    df = spark.read.format("csv").option("header", True).option("multiLine", True).option("escape", "\"").load(file_location)

    #basic formatting of new pyspark dataframes
    drop_list = ['tweet_id','airline_sentiment_confidence', 'negativereason', 'negativereason_confidence','airline_sentiment_gold', 'name', 'negativereason_gold', 'retweet_count', 'tweet_coord','tweet_created', 'tweet_location', 'user_timezone' ]
    df = df.drop(*drop_list)

    return df

##load tweet dataset
df_tweets = load_tweets( "Tweets.csv")
display(df_tweets)

##Combine dataframe for new data with dataframe for old data to create 1 dataframe
df_final = df_tweets.union(new_data_df)
display(df_final)

DataFrame[airline_sentiment: string, airline: string, text: string]

DataFrame[airline_sentiment: string, airline: string, text: string]

In [7]:
df_final.show(10)

+-----------------+--------------+--------------------+
|airline_sentiment|       airline|                text|
+-----------------+--------------+--------------------+
|          neutral|Virgin America|@VirginAmerica Wh...|
|         positive|Virgin America|@VirginAmerica pl...|
|          neutral|Virgin America|@VirginAmerica I ...|
|         negative|Virgin America|@VirginAmerica it...|
|         negative|Virgin America|@VirginAmerica an...|
|         negative|Virgin America|@VirginAmerica se...|
|         positive|Virgin America|@VirginAmerica ye...|
|          neutral|Virgin America|@VirginAmerica Re...|
|         positive|Virgin America|@virginamerica We...|
|         positive|Virgin America|@VirginAmerica it...|
+-----------------+--------------+--------------------+
only showing top 10 rows



#### 1.3: Briefly Investigate our Data

In this section we will briefly explore our dataset. We want to investigate:
- How any original datapoints did we have?
- How many new datapoints did we add?
- How many total datapoints do we have now?
- How many of each sentiment do we have?

In [8]:
def sentimentCounter(df):
    negative = 0
    positive = 0
    neutral = 0

    for row in df.rdd.collect():
        if "negative" in row['airline_sentiment']: 
            negative = negative + 1
        elif "neutral" in row['airline_sentiment']: 
            neutral = neutral + 1
        elif "positive" in row['airline_sentiment']: 
            positive = positive + 1
  
    return negative, neutral, positive

In [9]:
#Check: How many data points did we have, how many did we add?
print("Number of new data points: ",new_data_df.count())
print("Number of original data points: ",df_tweets.count())
print("Total number of data points: ",df_final.count())
print("\n")

#Check: Count and percentage of each sentiment
negative, neutral, positive = sentimentCounter(df_final)
total = negative + positive + neutral
print("Number of positives: ",positive,"(%.2f"%(positive/total*100),"%)")
print("Number of negatives: ",negative,"(%.2f"%(negative/total*100),"%)")
print("Number of neutrals: ",neutral,"(%.2f"%(neutral/total*100),"%)")


Number of new data points:  597
Number of original data points:  14640
Total number of data points:  15237


Number of positives:  2490 (16.34 %)
Number of negatives:  9403 (61.71 %)
Number of neutrals:  3344 (21.95 %)


###Section 2: Preprocessing Data
The goal of this section is to clean the text by:
  1. Converting text to lower-case
  2. Removing any noise
  3. Tokenizing words
  4. Spell check words
  5. Removing stop words
  6. Stemming words

In [10]:
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
from pyspark.sql import Row

from pyspark.sql.functions import udf, lower, regexp_replace, col
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
import pyspark.sql.functions as f
from spellchecker import SpellChecker
from pyspark.sql.types import ArrayType, StringType

In [11]:
from pyspark.sql.functions import lower, regexp_replace, col

def clean_col(col): 
  #remove noise, any @s
  col = re.sub(r"@[A-z]*", "",col) 
  #remove noise, more than one space
  col= re.sub(r" {2,}", " ",col) 
  #remove noise, any numbers
  col = re.sub(r"\d", "",col)
  #remove punctuation
  col = re.sub(r"[!'#$%&'()*+,\-./:;<=>?@\[/\]^_{|}~\"]", "",col)
  #convert tweet to lower-case
  col = col.lower()
  
  return col

clean_col_UDF = udf(lambda x: clean_col(x))
df_original_regex = df_tweets.withColumn('preText1', clean_col_UDF(col('text')))
df_new_regex = new_data_df.withColumn('preText1', clean_col_UDF(col('text')))
df_final_regex = df_final.withColumn('preText1', clean_col_UDF(col('text')))

In [12]:
from pyspark.ml.feature import RegexTokenizer
from nltk.tokenize import word_tokenize

def token_df(df):
  token_udf = udf(lambda words: word_tokenize(words), ArrayType(StringType()))
  df = df.withColumn("words", token_udf(col("preText1")))
  return df

df_token = token_df(df_final_regex)
df_token_original = token_df(df_original_regex)
df_token_new = token_df(df_new_regex)
display(df_token)

DataFrame[airline_sentiment: string, airline: string, text: string, preText1: string, words: array<string>]

In [13]:
from pyspark.ml.feature import StopWordsRemover

def stop_words_remove_df(df):
    remover = StopWordsRemover()
    remover.setInputCol("words")
    remover.setOutputCol("no_stop")
    df = remover.transform(df)

    return df

df_stop = stop_words_remove_df(df_token)
df_new_stop = stop_words_remove_df(df_token_new)
df_original_stop = stop_words_remove_df(df_token_original)

display(df_stop)

DataFrame[airline_sentiment: string, airline: string, text: string, preText1: string, words: array<string>, no_stop: array<string>]

In [14]:
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.types import ArrayType, StringType

def stem_words(sentence):
    stemmer = SnowballStemmer("english")
    reArr = []
    for word in sentence:
        stem_word = stemmer.stem(word)
        if len(stem_word) > 2:
            reArr.append(stem_word)

    return reArr

def stem_col(df):
    stemmer_udf = udf(lambda words: stem_words(words), ArrayType(StringType()))
    col_stem = stemmer_udf(col("no_stop"))
    df = df.withColumn("text_final", col_stem.cast(ArrayType(StringType())))

    return df

df_final_pre = stem_col(df_stop)
df_new_pre = stem_col(df_new_stop)
df_original_pre = stem_col(df_original_stop)

In [15]:
df_final_pre = df_final_pre.select("airline_sentiment", "airline", "text_final")
df_new_pre = df_new_pre.select("airline_sentiment", "airline", "text_final")
df_original_pre = df_original_pre.select("airline_sentiment", "airline", "text_final")



In [16]:
df_final_pre.show(10)

+-----------------+--------------+--------------------+
|airline_sentiment|       airline|          text_final|
+-----------------+--------------+--------------------+
|          neutral|Virgin America|              [said]|
|         positive|Virgin America|[plus, youv, comm...|
|          neutral|Virgin America|[didnt, today, mu...|
|         negative|Virgin America|[realli, aggress,...|
|         negative|Virgin America|[realli, big, bad...|
|         negative|Virgin America|[serious, pay, fl...|
|         positive|Virgin America|[yes, near, everi...|
|          neutral|Virgin America|[realli, miss, pr...|
|         positive|Virgin America|   [well, didnt…but]|
|         positive|Virgin America|[amaz, arriv, hou...|
+-----------------+--------------+--------------------+
only showing top 10 rows



## Section 3
The goal of this section is to apply machine learning to get the tweet sentiment by:  
1. Using HashingTF to create the features  
2. Applyiing idf to the features
3. Creating a pipeline  
4. Creating a parameter grid to tune the hyperparameters
5. Training the data
6. Applying Logistic regression and Random Forest
7. Comparing our scores

In [17]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [18]:
#using HashingTF to create features
#apply IDF to features
#pipeline used to execute all these steps at once
def create_IDF_vector(df):
    hashtf = HashingTF(numFeatures=1000, inputCol="text_final", outputCol='tf')
    idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) 
    label_stringIdx = StringIndexer(inputCol = "airline_sentiment", outputCol = "label")
    
    pipeline = Pipeline(stages=[hashtf, idf, label_stringIdx])
    pipeline_model = pipeline.fit(df)
    pipeline_df = pipeline_model.transform(df)
    
    return pipeline_df

In [19]:
#train model and return predictions
def train_model(df,model):
    (train_set,test_set) = df.randomSplit([0.80, 0.20],seed = 2000)
    trained_model = model.fit(train_set)
    predictions = trained_model.transform(test_set)

    return predictions

In [20]:
#create instances of ML models
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
lr = LogisticRegression(featuresCol="features", labelCol="label")

#create vectors
vector_df = create_IDF_vector(df_final_pre)
vector_df_new = create_IDF_vector(df_new_pre)
vector_df_original = create_IDF_vector(df_original_pre)

#get predictions
rf_predictions = train_model(vector_df,rf)
lr_predictions = train_model(vector_df,lr)
rf_predictions_new = train_model(vector_df_new, rf)
lr_predictions_new = train_model(vector_df_new, lr)
rf_predictions_original = train_model(vector_df_original, rf)
lr_predictions_original = train_model(vector_df_original, lr)


In [21]:
def get_classification_metrics(predictions):
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
    f1_score = evaluator.evaluate(predictions)
    
    print("f1 score: ",f1_score)

    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
    precision = evaluator.evaluate(predictions)
    print("Precision: ",precision)

    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
    recall = evaluator.evaluate(predictions)
    print("Recall: ",recall)

In [22]:
print("***** Logistic Regression Metrics For All Data*****")
get_classification_metrics(lr_predictions)
print("\n\n")
print("***** Random Forest Metrics For all Data*****")
get_classification_metrics(rf_predictions)

***** Logistic Regression Metrics For All Data*****
f1 score:  0.7210887831987325
Precision:  0.7178907590834822
Recall:  0.7256874580818243



***** Random Forest Metrics For all Data*****
f1 score:  0.47795888785908264
Precision:  0.38780064693999006
Recall:  0.6227364185110664


In [23]:
print("***** Logistic Regression Metrics For New Data*****")
get_classification_metrics(lr_predictions_new)
print("\n\n")
print("***** Random Forest Metrics For New Data*****")
get_classification_metrics(rf_predictions_new)

***** Logistic Regression Metrics For New Data*****
f1 score:  0.49374416433239965
Precision:  0.5138726033048878
Recall:  0.49206349206349204



***** Random Forest Metrics For New Data*****
f1 score:  0.43885390843231786
Precision:  0.626732174351222
Recall:  0.4682539682539682


In [24]:
print("***** Logistic Regression Metrics For Original Data*****")
get_classification_metrics(lr_predictions_original)
print("\n\n")
print("***** Random Forest Metrics For Original Data*****")
get_classification_metrics(rf_predictions_original)

***** Logistic Regression Metrics For Original Data*****
f1 score:  0.7230469792435031
Precision:  0.7196853866055846
Recall:  0.7279411764705882



***** Random Forest Metrics For Original Data*****
f1 score:  0.4881821341675356
Precision:  0.3981009070294784
Recall:  0.6309523809523809


## Grid Searches


In [25]:
(train_set,test_set) = vector_df.randomSplit([0.80, 0.20],seed = 2000)
(train_set_new,test_set_new) = vector_df_new.randomSplit([0.80, 0.20],seed = 2000)
(train_set_original,test_set_original) = vector_df_original.randomSplit([0.80, 0.20],seed = 2000)
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
lr = LogisticRegression(featuresCol="features", labelCol="label")

In [26]:
#do grid search for LogisticRegression
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01,0.1,1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3) 

cvModel_lr = crossval.fit(train_set)
cvModel_lr_new = crossval.fit(train_set_new)
cvModel_lr_original = crossval.fit(train_set_original)

In [27]:
#do grid search for RandomForestClassifier
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [1,3,5,10])
             .addGrid(rf.maxDepth, [3,5,7,10])
             .build())

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3) 

cvModel_rf = crossval.fit(train_set)
cvModel_rf_new = crossval.fit(train_set_new)
cvModel_rf_original = crossval.fit(train_set_original)

In [28]:
best_model_lr = cvModel_lr.bestModel
best_model_rf = cvModel_rf.bestModel

print("********** Best parameters for Logistic Regression for all data **********")
print('Best regParam: ',best_model_lr.getRegParam())
print('Best maxIter: ',best_model_lr.getMaxIter())

print("\n\n********** Best parameters for Random Forest for all data **********")
print('Best number of trees: ',best_model_rf)
print('Best max depth: ',best_model_rf.getMaxDepth())

********** Best parameters for Logistic Regression for all data **********
Best regParam:  0.01
Best maxIter:  10


********** Best parameters for Random Forest for all data **********
Best number of trees:  RandomForestClassificationModel: uid=RandomForestClassifier_315be220be4b, numTrees=1, numClasses=3, numFeatures=1000
Best max depth:  10


In [29]:
best_LR_prediction = cvModel_lr.transform(test_set)
best_RF_prediction = cvModel_rf.transform(test_set)

In [30]:
print("***** Logistic Regression Metrics (Best Model) for all data *****")
get_classification_metrics(best_LR_prediction)
print("\n")
print("***** Random Forest Metrics (Best Model) for all data *****")
get_classification_metrics(best_RF_prediction)

***** Logistic Regression Metrics (Best Model) for all data *****
f1 score:  0.7173895174221085
Precision:  0.713303102710151
Recall:  0.7256874580818242


***** Random Forest Metrics (Best Model) for all data *****
f1 score:  0.5994190525967966
Precision:  0.6334523089472892
Recall:  0.6720321931589538


In [31]:
from pyspark.mllib.evaluation import MulticlassMetrics
def create_confusion_matrix(df):
    label_pred_new = df.select(['label', 'prediction'])
    metrics = MulticlassMetrics(label_pred_new.rdd.map(tuple))
    array = metrics.confusionMatrix().toArray()
    print(metrics.confusionMatrix().toArray())


In [32]:
#New data confusion matrix
create_confusion_matrix(lr_predictions_new)

[[26. 12.  5.]
 [20. 22.  9.]
 [13.  5. 14.]]


In [33]:
#Original data confusion matrix
create_confusion_matrix(lr_predictions_original)

[[1536.  237.  123.]
 [ 190.  252.   72.]
 [  76.   79.  291.]]


In [35]:
#All data confusion matrix
create_confusion_matrix(lr_predictions)

[[1579.  240.  132.]
 [ 193.  285.   80.]
 [  85.   88.  300.]]


In [36]:
#All data after tuning of hyperparameters
create_confusion_matrix(best_LR_prediction)

[[1607.  269.  143.]
 [ 172.  265.   77.]
 [  78.   79.  292.]]
