## CS 644 - Final Project 

##### Group Members: Smeet Kathiria

# Project Part 2: Data Processing and Model Training

##### Setting up the PySpark environment


In [None]:
import os
import sys

# Spark 2.4.3 installation on the cluster; PYLIB is the folder holding its bundled Python libs.
os.environ["SPARK_HOME"] = "/usr/spark2.4.3"
os.environ["PYLIB"] = "{}/python/lib".format(os.environ["SPARK_HOME"])

# Interpreter for executors and driver.
# Use /usr/bin/python2.7 for both entries if you want to run under Python 2.
os.environ.update({
    "PYSPARK_PYTHON": "/usr/local/anaconda/bin/python",
    "PYSPARK_DRIVER_PYTHON": "/usr/local/anaconda/bin/python",
})

# Prepend pyspark first and py4j second to the import path
# (same final order as inserting py4j at 0 and then pyspark at 0).
sys.path[0:0] = [
    os.environ["PYLIB"] + "/pyspark.zip",
    os.environ["PYLIB"] + "/py4j-0.10.7-src.zip",
]

##### Loading all the required libraries

In [None]:
#Imports
from pyspark.sql import SparkSession,DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob
from pyspark.sql import SQLContext
import re
from bs4 import BeautifulSoup
from nltk.tokenize import WordPunctTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml import Pipeline, PipelineModel
import pandas as pd
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer
import seaborn as sns


##### Creating the Spark session

In [None]:
# NOTE: despite the name, `sc` is a SparkSession (not a SparkContext); it is used for
# `sc.read` and `sc.sql` in later cells.
sc = (
    SparkSession.builder
    .appName("modelbuild")
    .getOrCreate()
)


##### Loading Twitter Training Data stored in Hadoop

In [None]:
TRAININGDATA_PATH = "/user/smeetp269618/twitter_training.csv"
# The file has no header row, so Spark names the columns _c0.._c5; inferSchema makes
# Spark take an extra pass over the data to assign column types.
df = sc.read.option("inferSchema", True).csv(TRAININGDATA_PATH)

In [None]:
# Sanity check on the size of the raw training file.
print('{} {}'.format('Total Number of records in df : ', df.count()))


Total Number of records in df :  1600000


In [None]:
df.show(n=5)  # peek at the raw, still-unnamed columns

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+---+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows



##### Renaming columns for better understanding

In [None]:
# _c0 holds the sentiment label and _c5 the tweet body; the other columns are not used.
df = (
    df
    .withColumnRenamed('_c0', 'sentiment')
    .withColumnRenamed('_c5', 'text')
)
df.select(['text', 'sentiment']).schema

StructType(List(StructField(text,StringType,true),StructField(sentiment,IntegerType,true)))

##### Transforming the tweets based on the analysis conducted in the Twitter data analysis notebook

In [None]:

# NOTE(review): this NLTK tokenizer is not referenced by tweets_transformer or by any later
# cell in this notebook (the Spark pipeline has its own tokenizer stage) — candidate for removal.
tok = WordPunctTokenizer()

# Contractions to expand before punctuation is stripped, so "don't" becomes "do not"
# instead of the fragments "don" and "t" (which would lose the negation).
negations_catalog = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not"}

# @mentions and http(s) links; these are removed before any other cleaning.
combined_regex = r'|'.join((r'@[A-Za-z0-9_]+', r'https?://[^ ]+'))

# Compile the patterns once instead of re-parsing them for every tweet. IGNORECASE is needed
# because they run before lower-casing, so "HTTP://..." and "WWW...." links must match too.
_mention_url_regex = re.compile(combined_regex, re.IGNORECASE)
# The dot is escaped: an unescaped "www." also matches inside ordinary words,
# e.g. it would truncate "awwwww" to "a".
_www_regex = re.compile(r'www\.[^ ]+', re.IGNORECASE)

# Whole-word match of any catalog key; keys are escaped so they are matched literally.
neg_regex = re.compile(r'\b(' + '|'.join(map(re.escape, negations_catalog)) + r')\b')

def tweets_transformer(line):
    """Clean one tweet row for model training.

    Steps: remove @mentions and URLs, lower-case, expand the contractions listed in
    ``negations_catalog``, replace every non-letter with a space, then collapse runs
    of spaces and trim both ends.

    Args:
        line: a Row-like object with ``text`` and ``sentiment`` attributes (it is
            mapped over ``df.rdd``). A NULL ``text`` is treated as an empty string.

    Returns:
        tuple ``(cleaned_text, sentiment)``; ``sentiment`` is passed through unchanged.
    """
    text = line.text if line.text is not None else ''
    strip_text = _mention_url_regex.sub('', text)
    strip_text = _www_regex.sub('', strip_text)
    lower_case = strip_text.lower()
    neg_transform = neg_regex.sub(lambda m: negations_catalog[m.group()], lower_case)
    letters_filter = re.sub("[^a-zA-Z]", " ", neg_transform)
    # split()/join collapses runs of spaces and trims both ends. A leading space would
    # otherwise become an empty "" token in Spark's Tokenizer.
    normal_spaced = ' '.join(letters_filter.split())
    return normal_spaced, line.sentiment


In [None]:
# Clean every row with tweets_transformer, then rebuild a DataFrame with named columns.
# The schema is re-inferred from the Python tuples, so `sentiment` comes back as bigint.
cleaned_rdd = df.rdd.map(tweets_transformer)
cleaned_df = cleaned_rdd.toDF(schema=["text", "sentiment"])


In [None]:
# Display the DataFrame repr, which lists the column names and their Spark types.
cleaned_df

DataFrame[text: string, sentiment: bigint]

In [None]:
cleaned_df.select(['text', 'sentiment']).show(n=5)  # first rows after cleaning

+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
| awww that s a bu...|        0|
|is upset that he ...|        0|
| i dived many tim...|        0|
|my whole body fee...|        0|
| no it s not beha...|        0|
+--------------------+---------+
only showing top 5 rows



In [None]:
# Keep only tweets whose cleaned text is longer than MIN_TEXT_LENGTH characters.
# NOTE(review): the rationale for the threshold is not recorded, and it removes a large
# share of the 1.6M rows (compare the class counts below) — document why 50 was chosen.
MIN_TEXT_LENGTH = 50
cleaned_df.createOrReplaceTempView("cleandf")
final_df = sc.sql("SELECT * FROM cleandf WHERE LENGTH(text) > {}".format(MIN_TEXT_LENGTH))
final_df.show()

+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
| awww that s a bu...|        0|
|is upset that he ...|        0|
| i dived many tim...|        0|
| no it s not beha...|        0|
| hey long time no...|        0|
| i could not bear...|        0|
| it it counts idk...|        0|
| i would ve been ...|        0|
| i wish i got to ...|        0|
|hollis death scen...|        0|
| ahh ive always w...|        0|
| oh dear were you...|        0|
| i was out most o...|        0|
|one of my friend ...|        0|
|just going to cry...|        0|
|ooooh lol that le...|        0|
|meh almost lover ...|        0|
|some hacked my ac...|        0|
| i want to go to ...|        0|
|thought sleeping ...|        0|
+--------------------+---------+
only showing top 20 rows



In [None]:
# Distinct label values that survive the length filter.
final_df.select(F.col('sentiment')).distinct().show()

+---------+
|sentiment|
+---------+
|        0|
|        4|
+---------+



In [None]:
# Rows per label value before the labels are remapped.
final_df.groupBy(F.col('sentiment')).count().show()

+---------+------+
|sentiment| count|
+---------+------+
|        0|483666|
|        4|451968|
+---------+------+



##### Mapping every sentiment value of 4 to 1 so the labels are binary (0/1)

In [None]:
def sentimentMapper(sentiment):
    """Return 1 when ``sentiment`` equals 4; otherwise return ``sentiment`` unchanged."""
    if sentiment != 4:
        return sentiment
    return 1

In [None]:
# Python UDF wrapper around sentimentMapper. It is still defined in case other code uses it,
# but the column below is rewritten with a native expression instead.
udfSentimentMapper = udf(sentimentMapper, IntegerType())

# Same mapping as sentimentMapper (4 -> 1, any other value unchanged, NULL stays NULL),
# written as a built-in Column expression. Spark evaluates it in the JVM instead of
# serialising every row to a Python worker. The cast keeps the column IntegerType, which
# matches the UDF's declared return type.
final_df = final_df.withColumn(
    'sentiment',
    F.when(F.col('sentiment') == 4, 1)
     .otherwise(F.col('sentiment'))
     .cast(IntegerType()),
)
final_df.show(5)

+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
| awww that s a bu...|        0|
|is upset that he ...|        0|
| i dived many tim...|        0|
| no it s not beha...|        0|
| hey long time no...|        0|
+--------------------+---------+
only showing top 5 rows



In [None]:
# Rows per label after the 4 -> 1 remapping; the counts should match the earlier table.
final_df.groupBy('sentiment').count().show()

+---------+------+
|sentiment| count|
+---------+------+
|        1|451968|
|        0|483666|
+---------+------+



In [None]:
# Confirm the sentiment column is IntegerType after the 4 -> 1 remapping.
final_df.schema

StructType(List(StructField(text,StringType,true),StructField(sentiment,IntegerType,true)))

<font color='#065535'>Selecting text and sentiment columns for model training</font> 

In [None]:
# Keep only the two columns the pipeline consumes: the tweet text and its 0/1 label.
final_df = final_df.select(['text', 'sentiment'])
final_df.show(n=5)

+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
| awww that s a bu...|        0|
|is upset that he ...|        0|
| i dived many tim...|        0|
| no it s not beha...|        0|
| hey long time no...|        0|
+--------------------+---------+
only showing top 5 rows



In [None]:
from pyspark.ml.feature import StopWordsRemover,Word2Vec,StringIndexer


##### Splitting the data into train (98%), validation (1%) and test (1%) sets

In [None]:
# 98% train / 1% validation / 1% test; the fixed seed makes the split reproducible.
train_set, val_set, test_set = final_df.randomSplit(weights=[0.98, 0.01, 0.01], seed=2000)


##### Building the ML pipeline: tokenizer, n-grams, CountVectorizer, IDF, VectorAssembler, label StringIndexer and logistic regression

##### We use logistic regression because it is a fast, strong baseline for binary classification on sparse TF-IDF features


In [None]:
def ngrams_builder(inputCol=("text", "sentiment"), n=3, vocabSize=7260, minDocFreq=5, maxIter=100):
    """Build the unfitted n-gram TF-IDF + logistic-regression Pipeline.

    Stage order: tokenizer -> NGram for each order 1..n -> CountVectorizer per order
    -> IDF per order -> VectorAssembler into "features" -> StringIndexer
    (label column -> "label") -> LogisticRegression (default "features"/"label" columns).

    Args:
        inputCol: ``(text_column, label_column)``. A single string is treated as the
            text column, with "sentiment" as the label column. The default matches the
            column names produced earlier in this notebook.
        n: highest n-gram order; one TF-IDF block is built for each order 1..n.
        vocabSize: maximum vocabulary size kept by each CountVectorizer.
        minDocFreq: IDF minimum document frequency; terms seen in fewer documents get
            zero weight.
        maxIter: maximum number of LogisticRegression iterations.

    Returns:
        pyspark.ml.Pipeline (not yet fitted).

    Raises:
        ValueError: if ``n < 1`` or ``inputCol`` does not name exactly two columns.
    """
    # Local import keeps this cell self-contained.
    from pyspark.ml.feature import RegexTokenizer

    if isinstance(inputCol, str):
        text_col, label_col = inputCol, "sentiment"
    else:
        cols = list(inputCol)
        if len(cols) != 2:
            raise ValueError("inputCol must be (text_column, label_column), got %r" % (inputCol,))
        text_col, label_col = cols
    if n < 1:
        raise ValueError("n must be >= 1, got %r" % (n,))

    orders = range(1, n + 1)

    # Split on runs of whitespace and drop empty tokens. The plain Tokenizer splits on
    # single whitespace characters, so text with a leading space (e.g. a tweet that began
    # with a removed @mention) yields an empty "" token that leaks into every n-gram.
    tokenizer = [RegexTokenizer(inputCol=text_col,
                                outputCol="words",
                                pattern="\\s+",
                                gaps=True,
                                minTokenLength=1)]

    ngrams = [
        NGram(n=i,
              inputCol="words",
              outputCol="{0}_grams".format(i))
        for i in orders
    ]

    countVectorizer = [
        CountVectorizer(vocabSize=vocabSize,
                        inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]

    idf = [
        IDF(inputCol="{0}_tf".format(i),
            outputCol="{0}_tfidf".format(i),
            minDocFreq=minDocFreq)
        for i in orders
    ]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="features"
    )]

    # alphabetAsc: numeric labels are indexed as strings, so "0" -> 0.0 and "1" -> 1.0
    # whatever the class balance. The default (frequencyDesc) assigns index 0 to the most
    # frequent class in the training data, so the meaning of `prediction` would flip if the
    # balance changed.
    label_stringIndexer = [StringIndexer(inputCol=label_col,
                                         outputCol="label",
                                         stringOrderType="alphabetAsc")]

    lr = [LogisticRegression(maxIter=maxIter)]

    return Pipeline(stages=tokenizer + ngrams + countVectorizer + idf + assembler + label_stringIndexer + lr)

##### Training the model and evaluating it on the validation set

In [None]:
# Fit the whole pipeline on the training split, then score the validation split.
pipeline = ngrams_builder()
ngram_model = pipeline.fit(train_set)
predictions = ngram_model.transform(val_set)

# Accuracy = fraction of validation rows whose predicted class equals the indexed label.
accuracy = (
    predictions.where(F.col('label') == F.col('prediction')).count()
    / float(val_set.count())
)
print("Accuracy Score: {0:.4f}".format(accuracy))

Accuracy Score: 0.8037


##### Saving the fitted model, which will be used to predict on the live streaming data

In [None]:
# Directory for the fitted PipelineModel, intended for the live-streaming predictions.
MODEL_PATH = "twitter_sentiment_model"
# overwrite() lets a re-run of the notebook replace an earlier model. A plain save()
# fails when the path already exists.
ngram_model.write().overwrite().save(MODEL_PATH)

##### Checking some predictions 

In [None]:
predictions.select(['text', 'prediction']).show(n=5)  # spot-check a few validation predictions

+--------------------+----------+
|                text|prediction|
+--------------------+----------+
| again i have no ...|       0.0|
| ahaha i m actual...|       1.0|
| ahh i knowww but...|       0.0|
| all s fine in do...|       0.0|
| ally thanks for ...|       0.0|
+--------------------+----------+
only showing top 5 rows

