In [1]:
import pyspark
from pyspark.sql.functions import col, count, when, isnull, udf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, FloatType, DoubleType, ArrayType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [2]:
import html

In [22]:
import emo_unicode
from emo_unicode import *

In [3]:
import nltk
from nltk.tokenize import RegexpTokenizer, sent_tokenize, word_tokenize
from nltk.corpus import stopwords, wordnet
from  nltk.stem import WordNetLemmatizer
from nltk.stem.porter import PorterStemmer
import re

# Shared NLTK helpers and constants for the tokenisation step.
lemmatizer = WordNetLemmatizer()  # WordNet lemmatizer instance
stemmer = PorterStemmer()  # Porter stemmer instance
stopwords_list = stopwords.words("english")  # NLTK's English stop-word list
# First letter of the POS tags that are kept (J, R, V, N).
allowedWordTypes = list("JRVN")

In [23]:
# Abbreviation / slang dictionary: maps a token to its expanded form.
# Lookups are exact and case-sensitive, and preprocess() lower-cases the text
# before looking words up, so keys must be lower-case to ever match.

#source: https://www.kaggle.com/code/nmaguette/up-to-date-list-of-slangs-for-text-preprocessing
abbreviations = {
    "$" : " dollar ",
    "€" : " euro ",
    "4ao" : "for adults only",
    "a.m" : "before midday",
    "a3" : "anytime anywhere anyplace",
    "aamof" : "as a matter of fact",
    "acct" : "account",
    "adih" : "another day in hell",
    "afaic" : "as far as i am concerned",
    "afaict" : "as far as i can tell",
    "afaik" : "as far as i know",
    "afair" : "as far as i remember",
    "afk" : "away from keyboard",
    "app" : "application",
    "approx" : "approximately",
    "apps" : "applications",
    "asap" : "as soon as possible",
    "asl" : "age, sex, location",
    "atk" : "at the keyboard",
    "ave." : "avenue",
    "aymm" : "are you my mother",
    "ayor" : "at your own risk",
    "b&b" : "bed and breakfast",
    "b+b" : "bed and breakfast",
    "b.c" : "before christ",
    "b2b" : "business to business",
    "b2c" : "business to customer",
    "b4" : "before",
    "b4n" : "bye for now",
    "b@u" : "back at you",
    "bae" : "before anyone else",
    "bah" : "frustration",
    "bak" : "back at keyboard",
    "bbbg" : "bye bye be good",
    "bbc" : "british broadcasting corporation",
    "bbias" : "be back in a second",
    "bbl" : "be back later",
    "bbs" : "be back soon",
    "be4" : "before",
    "bfn" : "bye for now",
    "blvd" : "boulevard",
    "boooo" : "displeasure",
    "bout" : "about",
    "brb" : "be right back",
    "bros" : "brothers",
    "brt" : "be right there",
    "bsaaw" : "big smile and a wink",
    "btw" : "by the way",
    "bwl" : "bursting with laughter",
    "c/o" : "care of",
    "cet" : "central european time",
    "cf" : "compare",
    "cia" : "central intelligence agency",
    "csl" : "can not stop laughing",
    "cu" : "see you",
    "cul8r" : "see you later",
    "cv" : "curriculum vitae",
    "cwot" : "complete waste of time",
    "cya" : "see you",
    "cyt" : "see you tomorrow",
    "dae" : "does anyone else",
    "dbmib" : "do not bother me i am busy",
    "diy" : "do it yourself",
    "dm" : "direct message",
    "dwh" : "during work hours",
    "e123" : "easy as one two three",
    "eet" : "eastern european time",
    "eg" : "example",
    "embm" : "early morning business meeting",
    "encl" : "enclosed",
    "encl." : "enclosed",
    "etc" : "and so on",
    "faq" : "frequently asked questions",
    "fawc" : "for anyone who cares",
    "fb" : "facebook",
    "fc" : "fingers crossed",
    "fig" : "figure",
    "fimh" : "forever in my heart",
    "fml" : "fuck my life",
    "ft." : "feet",
    "ft" : "featuring",
    "ftl" : "for the loss",
    "ftw" : "for the win",
    "fwiw" : "for what it is worth",
    "fyi" : "for your information",
    "g9" : "genius",
    "gahoy" : "get a hold of yourself",
    "gal" : "get a life",
    "gcse" : "general certificate of secondary education",
    "gfn" : "gone for now",
    "gg" : "good game",
    "gl" : "good luck",
    "glhf" : "good luck have fun",
    "gmt" : "greenwich mean time",
    "gmta" : "great minds think alike",
    "gn" : "good night",
    "g.o.a.t" : "greatest of all time",
    "goat" : "greatest of all time",
    "goi" : "get over it",
    "gps" : "global positioning system",
    "gr8" : "great",
    "gratz" : "congratulations",
    "gyal" : "girl",
    "h&c" : "hot and cold",
    "hp" : "horsepower",
    "hr" : "hour",
    "hrh" : "his royal highness",
    "ht" : "height",
    "hun" : "honey",
    "ibrb" : "i will be right back",
    "ic" : "i see",
    "icq" : "i seek you",
    "icymi" : "in case you missed it",
    "idc" : "i do not care",
    "idgadf" : "i do not give a damn fuck",
    "idgaf" : "i do not give a fuck",
    "idk" : "i do not know",
    "ie" : "that is",
    "i.e" : "that is",
    "ifyp" : "i feel your pain",
    "IG" : "instagram",  # kept for backward compatibility; never matches lower-cased text
    "ig" : "instagram",
    "iirc" : "if i remember correctly",
    "ilu" : "i love you",
    "ily" : "i love you",
    "imho" : "in my humble opinion",
    "imo" : "in my opinion",
    "imu" : "i miss you",
    "iow" : "in other words",
    "irl" : "in real life",
    "j4f" : "just for fun",
    "jic" : "just in case",
    "jk" : "just kidding",
    "jsyk" : "just so you know",
    "l8r" : "later",
    "lb" : "pound",
    "lbs" : "pounds",
    "ldr" : "long distance relationship",
    "lmao" : "laugh my ass off",
    "lmfao" : "laugh my fucking ass off",
    "lol" : "laughing out loud",
    "ltd" : "limited",
    "ltns" : "long time no see",
    "m8" : "mate",
    "mf" : "motherfucker",
    "mfs" : "motherfuckers",
    "mfw" : "my face when",
    "mofo" : "motherfucker",
    "mph" : "miles per hour",
    "mr" : "mister",
    "mrw" : "my reaction when",
    "ms" : "miss",
    "mte" : "my thoughts exactly",
    "nagi" : "not a good idea",
    "nbc" : "national broadcasting company",
    "nbd" : "not big deal",
    "nfs" : "not for sale",
    "ngl" : "not going to lie",
    "nhs" : "national health service",
    "nrn" : "no reply necessary",
    "nsfl" : "not safe for life",
    "nsfw" : "not safe for work",
    "nth" : "nice to have",
    "nvr" : "never",
    "nyc" : "new york city",
    "oc" : "original content",
    "og" : "original",
    "ohp" : "overhead projector",
    "oic" : "oh i see",
    "omdb" : "over my dead body",
    "omg" : "oh my god",
    "omw" : "on my way",
    "ott" : "over the top",
    "p.a" : "per annum",
    "p.m" : "after midday",
    "pedos" : "pedophile",
    "perma" : "permanent",
    "pm" : "prime minister",
    "poc" : "people of color",
    "pov" : "point of view",
    "pp" : "pages",
    "ppl" : "people",
    "prw" : "parents are watching",
    "ps" : "postscript",
    "pmsl" : "pissing myself laughing",
    "pt" : "point",
    "ptb" : "please text back",
    "pto" : "please turn over",
    "qpsa" : "what happens", #"que pasa",
    "ratchet" : "rude",
    "rbtl" : "read between the lines",
    "rlrt" : "real life retweet",
    "rofl" : "rolling on the floor laughing",
    "roflol" : "rolling on the floor laughing out loud",
    "rotflmao" : "rolling on the floor laughing my ass off",
    "rt" : "retweet",
    "ruok" : "are you ok",
    "sfw" : "safe for work",
    "sk8" : "skate",
    "smh" : "shake my head",
    "sq" : "square",
    "srsly" : "seriously",
    "ssdd" : "same stuff different day",
    "tbh" : "to be honest",
    "tbs" : "tablespoonful",
    "tbsp" : "tablespoonful",
    "tfw" : "that feeling when",
    "thks" : "thank you",
    "tho" : "though",
    "thx" : "thank you",
    "tia" : "thanks in advance",
    "til" : "today i learned",
    "tl;dr" : "too long i did not read",
    "tldr" : "too long i did not read",
    "tmb" : "tweet me back",
    "tntl" : "trying not to laugh",
    "ttyl" : "talk to you later",
    "u" : "you",
    "u2" : "you too",
    "u4e" : "yours for ever",
    "utc" : "coordinated universal time",
    "w/" : "with",
    "w/o" : "without",
    "w00t" : "joy",
    "w8" : "wait",
    "wassup" : "what is up",
    "wb" : "welcome back",
    "wtf" : "what the fuck",
    "wtg" : "way to go",
    "wtpa" : "where the party at",
    "wuf" : "where are you from",
    "wuzup" : "what is up",
    "wywh" : "wish you were here",
    "yd" : "yard",
    "ygtr" : "you got that right",
    "ynk" : "you never know",
    "zzz" : "sleeping bored and tired"
}

Since this course is on big data, we want to compare the performance of various binary classification models in Spark's machine learning (ML) library in this project.

In [4]:
# Create (or reuse) the Spark session for this notebook; the driver memory is set to 4g.
spark = (
    SparkSession.builder
    .appName('TwitterSentimentAnalysis')
    .config('spark.driver.memory', '4g')
    .getOrCreate()
)

In [5]:
# import data
# No header option is passed, so the columns arrive with Spark's positional
# names (_c0, _c1, ...) and are renamed in a later cell.
csv_path = "training.1600000.processed.noemoticon.csv"
metadf = spark.read.csv(csv_path, inferSchema=True)

In [None]:
# examine data: print the first rows of the freshly loaded frame (show() with its default arguments)
metadf.show()

In [6]:
# data preprocess
# 1) give the positional CSV columns readable names
column_names = {
    "_c0": "Polarity",
    "_c1": "TweetID",
    "_c2": "Date",
    "_c3": "QueryFlag",
    "_c4": "User",
    "_c5": "TweetText",
}
for old_name, new_name in column_names.items():
    metadf = metadf.withColumnRenamed(old_name, new_name)

# 2) count the null values in every column
null_count_exprs = [count(when(isnull(c), c)).alias(c) for c in metadf.columns]
missing_count = metadf.select(null_count_exprs).collect()
print(missing_count)

# 3) list rows that occur more than once across all columns
duplicates = metadf.groupBy(metadf.columns).count().filter("count > 1")
duplicates.show()

[Row(Polarity=0, TweetID=0, Date=0, QueryFlag=0, User=0, TweetText=0)]
+--------+-------+----+---------+----+---------+-----+
|Polarity|TweetID|Date|QueryFlag|User|TweetText|count|
+--------+-------+----+---------+----+---------+-----+
+--------+-------+----+---------+----+---------+-----+



In [None]:
# check schema: print column names and the types inferred while reading the CSV
metadf.printSchema() 

In [7]:
# Report the size of the dataset (count() triggers a full pass over the data).
n_rows = metadf.count()
n_cols = len(metadf.columns)
print(f"There are {n_rows} rows and  {n_cols} columns in the dataset.")

There are 1600000 rows and  6 columns in the dataset.


In [None]:
# check for data types 
# metadf.dtypes

In [8]:
# Keep only the columns used later in the notebook (Polarity and TweetText);
# the remaining metadata columns are dropped.
cols_to_drop= ("TweetID","Date","QueryFlag","User")
metadf = metadf.drop(*cols_to_drop)

In [9]:
# Remove rows that are identical in every remaining column, then report the new size.
metadf = metadf.dropDuplicates()
rows_after_dedup = metadf.count()
print(f"Number of rows in the dataframe after dropping the duplicates: {rows_after_dedup}")

Number of rows in the dataframe after dropping the duplicates: 1583688


In [10]:
# Confirm which columns are left after the drop / de-duplication steps.
metadf.printSchema()

root
 |-- Polarity: integer (nullable = true)
 |-- TweetText: string (nullable = true)



In [None]:
# Eyeball 50 rows with the full, untruncated tweet text.
metadf.show(50, truncate = False)

In [11]:
# check the number of distinct labels and the respective counts
# (rows are ordered by ascending count)
metadf.groupBy('Polarity').count().orderBy('count').show()

+--------+------+
|Polarity| count|
+--------+------+
|       0|790183|
|       4|793505|
+--------+------+



In [12]:
# turn text polarity into double type

def polarity_map(value):
    """Map a raw polarity code to a binary label.

    Returns 1.0 (positive) when ``value`` equals 4 and 0.0 (negative) for
    every other value.
    """
    return 1.0 if value == 4 else 0.0


In [13]:
# Wrap polarity_map as a Spark UDF returning a double and derive the "label" column from "Polarity".
polarity_udf = udf(polarity_map, DoubleType())
metadf = metadf.withColumn("label", polarity_udf(col("Polarity")))

In [14]:
# check the number of distinct values of the derived "label" column and their counts
# (rows are ordered by ascending count)
metadf.groupBy('label').count().orderBy('count').show()

+-----+------+
|label| count|
+-----+------+
|  0.0|790183|
|  1.0|793505|
+-----+------+



Preprocessing Steps:

1. Convert to lower case
2. Replace actual URLs with the string `URL`
3. Unescape HTML-escaped characters
4. Replace emoticons with emotion strings by using emo_unicode

 - @switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D
 
5. Replace abbreviations
6. Replace any mention of users with the string `atUser`
7. Remove `#` before topics
8. Replace `???`, `...`, `!!!` with `multiQuestion`, `multiStop`, `multiExclamation`
9. Replace contraction forms with full forms
10. Keep English characters and numbers (remove non-English words and punctuation)
11. POS-tag the tokens and keep only adjectives, adverbs, verbs and nouns
12. Stem the tokens

In [24]:
# https://github.com/Deffro/text-preprocessing-techniques/blob/master/techniques.py
def removeUnicode(text):
    r""" Removes literal escape text such as "\u002c" and every non-ASCII character """
    # Pass 1: drop a backslash followed by "u" and a run of hex digits.
    without_escapes = re.sub(r'(\\u[0-9A-Fa-f]+)', r'', text)
    # Pass 2: drop anything outside the 7-bit ASCII range.
    return re.sub(r'[^\x00-\x7f]', r'', without_escapes)

In [25]:
def replaceURL(text):
    """ Replaces every url ("www...." or "http(s)://...") with the placeholder "URL" """
    # Only URLs are handled here. The original body also stripped '#', which
    # duplicated removeHashtagInFrontOfWord() and turned numeric HTML entities
    # such as "&#39;" into "&39;", which html.unescape can no longer decode.
    # The pattern is a raw string so "\." and "\s" are not invalid str escapes.
    return re.sub(r'((www\.[^\s]+)|(https?://[^\s]+))', 'URL', text)
def replaceAtUser(text):
    """ Replaces "@user" with "atUser" """
    # Raw string: "\s" in a normal string literal is an invalid escape sequence.
    # Everything from "@" up to the next whitespace is replaced.
    return re.sub(r'@[^\s]+', 'atUser', text)
def removeHashtagInFrontOfWord(text):
    """ Strips the "#" in front of a hashtag and keeps the word that follows it """
    hashtag_pattern = r'#([^\s]+)'
    return re.sub(hashtag_pattern, r'\1', text)

def replaceMultiExclamationMark(text):
    """ Replaces a run of two or more "!" with " multiExclamation " """
    return re.sub(r"(\!)\1+", ' multiExclamation ', text)

def replaceMultiQuestionMark(text):
    """ Replaces a run of two or more "?" with " multiQuestion " """
    return re.sub(r"(\?)\1+", ' multiQuestion ', text)

def replaceMultiStopMark(text):
    """ Replaces a run of two or more "." with " multiStop " """
    return re.sub(r"(\.)\1+", ' multiStop ', text)

# (regex, replacement) pairs applied in this order by replaceContraction().
# The specific forms ("won't", "can't", ...) come before the generic "n't" rule.
# Replacements are raw strings: "\g" in a normal string is an invalid escape sequence.
contraction_patterns = [
    (r'won\'t', 'will not'), (r'can\'t', 'cannot'), (r'i\'m', 'i am'), (r'ain\'t', 'is not'),
    (r'(\w+)\'ll', r'\g<1> will'), (r'(\w+)n\'t', r'\g<1> not'),
    (r'(\w+)\'ve', r'\g<1> have'), (r'(\w+)\'s', r'\g<1> is'), (r'(\w+)\'re', r'\g<1> are'),
    (r'(\w+)\'d', r'\g<1> would'), (r'&', 'and'), (r'dammit', 'damn it'),
    (r'dont', 'do not'), (r'wont', 'will not'),
]

# Compiled once here instead of on every call (the function runs once per tweet).
_compiled_contraction_patterns = [(re.compile(regex), repl) for (regex, repl) in contraction_patterns]

def replaceContraction(text):
    """ Expands contractions in `text` by applying every contraction pattern in order.

    Each pattern is applied to the result of the previous one; the expanded
    string is returned.
    """
    for pattern, repl in _compiled_contraction_patterns:
        text = pattern.sub(repl, text)
    return text

In [26]:
# A token is a run of ASCII letters, digits, underscores or apostrophes.
tokenizer = RegexpTokenizer(r'[A-Za-z0-9_\']+')

# Set copy of the stop-word list so each membership test is a hash lookup.
_stopword_set = frozenset(stopwords_list)

def tokenize(s):
    """Split `s` into tokens, expand abbreviations, filter by POS tag and stem.

    Steps:
      1. tokenise `s` with the module-level `tokenizer`;
      2. replace tokens found in `abbreviations` by the tokens of their expansion;
      3. POS-tag the tokens with nltk and keep those whose tag starts with a
         letter in `allowedWordTypes` and that are not stop words;
      4. stem the survivors with the module-level `stemmer`.

    Returns the list of stemmed tokens (possibly empty).
    """
    full_tokens = []
    for word in tokenizer.tokenize(s):
        expansion = abbreviations.get(word)
        if expansion is None:
            full_tokens.append(word)
        else:
            # Re-tokenise the expansion instead of splitting on whitespace so
            # punctuation inside it (e.g. "age, sex, location") does not end
            # up glued to a token.
            full_tokens.extend(tokenizer.tokenize(expansion))

    final_tokens = []
    for word, tag in nltk.pos_tag(full_tokens):
        if tag[0] in allowedWordTypes and word not in _stopword_set:
            final_tokens.append(stemmer.stem(word))
    return final_tokens

In [18]:
# Maintaining the complete URL is unlikely to be useful, so we replace actual URLs with the string 'URL'.
# replace_url_re = "https?:\S+|http?:\S"

In [27]:
def preprocess(text):
    """Clean one raw tweet and return its list of stemmed tokens.

    Steps, in order: lower-case, replace URLs, unescape HTML entities, drop
    non-ASCII characters, replace emoticons, expand abbreviations, replace
    @mentions, strip '#', replace repeated '!', '?' and '.', expand
    contractions, then tokenize().

    A null tweet (None) yields an empty list instead of the token "none".
    """
    if text is None:
        return []

    text_cleaned = replaceURL(str(text).lower()).strip()
    text_cleaned = html.unescape(text_cleaned)
    text_cleaned = removeUnicode(text_cleaned)

    # NOTE(review): the text is already lower-cased and ASCII-only here, so any
    # EMOTICONS_EMO key containing an upper-case letter (e.g. ":D") or a
    # non-ASCII character presumably never matches - confirm against emo_unicode.
    for emoji, feeling in EMOTICONS_EMO.items():
        text_cleaned = text_cleaned.replace(emoji, feeling)

    # Whitespace-based lookup: catches abbreviations containing punctuation
    # (e.g. "w/", "b&b") before the regex tokenizer splits them apart.
    text_cleaned = " ".join(abbreviations.get(word, word) for word in text_cleaned.split())

    text_cleaned = replaceAtUser(text_cleaned)
    text_cleaned = removeHashtagInFrontOfWord(text_cleaned)
    text_cleaned = replaceMultiExclamationMark(text_cleaned)
    text_cleaned = replaceMultiQuestionMark(text_cleaned)
    text_cleaned = replaceMultiStopMark(text_cleaned)
    text_cleaned = replaceContraction(text_cleaned)

    return tokenize(text_cleaned)

In [29]:
# Wrap preprocess() as a Spark UDF returning array<string> and add the
# cleaned tokens as a new column.
preprocess_udf = udf(preprocess, ArrayType(StringType()))
metadf = metadf.withColumn('tokens_cleaned', preprocess_udf(metadf["TweetText"]))

In [None]:
# def replace_at_label(words):
#     index = 0
#     while index<len(words):
#         word = words[index]
#         index += 1
#         if '@' in word:
#             words_first_half = words[:index-1]
#             words_second_half = words[index:]
            
#             at_split_result = word.split('@')
#             for i in range(1,len(at_split_result)):
#                 at_split_result[i] = 'USER'
#             if len(at_split_result[0]) == 0:
#                 at_split_result.pop(0)
#             words = words_first_half+at_split_result+words_second_half
#     return words

In [None]:
# replace_at_label_udf = udf(lambda x: replace_at_label(x), StringType())
# metadf = metadf.withColumn('text_cleaned',preprocess_udf(col("TweetText")))

In [30]:
# Spot-check the cleaned tokens next to the original tweet text (this runs the preprocessing UDF).
metadf.show(10, truncate = False)

+--------+----------------------------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------+
|Polarity|TweetText                                                                                                             |label|tokens_cleaned                                                                                      |
+--------+----------------------------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------+
|0       |I feel like a complete idiot. I'm the only one who doesn't get how this shit works  help me                           |0.0  |[feel, complet, idiot, one, get, shit, work, help]                                                  |
|0       |@KishoreK this is strange, illegal torrent

In [31]:
# Confirm that the "label" and "tokens_cleaned" columns were added with the expected types.
metadf.printSchema()

root
 |-- Polarity: integer (nullable = true)
 |-- TweetText: string (nullable = true)
 |-- label: double (nullable = true)
 |-- tokens_cleaned: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [32]:
# word2vec process: turn each token list in "tokens_cleaned" into a vector stored in "features"
word2Vec = Word2Vec(
    inputCol="tokens_cleaned",
    outputCol="features",
    vectorSize=100,
    minCount=5,
)

In [None]:
# multi layer perceptron classifier
# First layer width matches the Word2Vec vectorSize (100); last layer has one unit per class (2).
layers = [100, 64, 32, 2]
mlp = MultilayerPerceptronClassifier(layers=layers, blockSize=128, seed=1234)
pipeline = Pipeline(stages=[word2Vec, mlp])

# Seed for the train/test split only; the MLP uses its own seed (1234) above.
seed = 24
(train, test) = metadf.randomSplit([0.8, 0.2], seed)

# Persist both splits: without this, every later action on them re-runs the
# Python preprocessing UDF from scratch. cache() is lazy and does not change results.
train = train.cache()
test = test.cache()

model = pipeline.fit(train)

In [None]:
# Apply the fitted pipeline to the held-out split.
predictions = model.transform(test)

In [None]:
# Inspect which columns the fitted pipeline added to the test frame.
predictions.printSchema()

In [None]:
# Look at five full, untruncated prediction rows.
predictions.show(5, truncate = False)

In [None]:
# Compare the predicted class with the true label, next to the original tweet text.
predictions.select("TweetText", "label", "prediction").show()

In [None]:
# Accuracy on the test split, computed from the prediction / label columns only.
predictionAndLabels = predictions.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(predictionAndLabels)
print("Test set accuracy = " + str(accuracy))

In [None]:
# F1 on the test split; the evaluator is created with only metricName set,
# so no column names are overridden here.
evaluator = MulticlassClassificationEvaluator(metricName="f1")
f1_score = evaluator.evaluate(predictions)
print(f"The f1_score of MLP model is: {f1_score}")

In [None]:
# Weighted precision on the test split.
# Evaluates `predictions` directly, like the f1 and weightedRecall cells, so
# this cell no longer depends on `predictionAndLabels` from the accuracy cell.
evaluator = MulticlassClassificationEvaluator(metricName="weightedPrecision")
weightedPrecision = evaluator.evaluate(predictions)
print(f"The testing weightedPrecision of MLP model is: {weightedPrecision}")

In [None]:
# Weighted recall on the test split; the evaluator is created with only
# metricName set, so no column names are overridden here.
evaluator = MulticlassClassificationEvaluator(metricName="weightedRecall")
weightedRecall = evaluator.evaluate(predictions)
print(f"The testing weightedRecall of MLP model is: {weightedRecall}")

In [None]:
# NOTE(review): this re-creates the weightedRecall evaluator from the previous
# cell and no evaluate() call follows in the visible part of the notebook -
# confirm whether another metric was intended or the cell can be removed.
evaluator = MulticlassClassificationEvaluator(metricName="weightedRecall")

