In [1]:
from pyspark.ml.feature import RegexTokenizer, HashingTF, IDF, CountVectorizer, Normalizer, StringIndexer, Tokenizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover
from nltk.sentiment import SentimentIntensityAnalyzer
import pyspark

Note: to run this notebook you must first download the dataset yourself with:

wget http://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip

and unzip the archive into the ./sentiment140 subfolder.

The dataset is too large to push to the GitHub repository.

In [2]:
import os

# Spark-readable URI of the Sentiment140 training CSV. The default is the
# location used when this notebook was written; set the
# SENTIMENT140_TRAINING_CSV environment variable to point at a copy stored
# elsewhere without having to edit the notebook.
data_file = os.environ.get(
    "SENTIMENT140_TRAINING_CSV",
    r"file:///home/jovyan/repos/distributed-sentiment-analysis-on-twitter-data/sentiment140/training.1600000.processed.noemoticon.csv",
)

In [3]:
# Resource settings requested for the Spark application.
conf = (
    pyspark.SparkConf()
    .set('spark.executor.memory', '6g')
    .set('spark.executor.cores', '3')
    .set('spark.cores.max', '3')
    .set('spark.driver.memory', '2g')
)

# Initialize a Spark session (getOrCreate hands back an existing session if
# one is already running in this process).
spark = (
    SparkSession.builder
    .appName("TrainSentimentModel")
    .config(conf=conf)
    .getOrCreate()
)

# Underlying SparkContext, kept for inspecting the configuration.
sc = spark.sparkContext

In [4]:
# Display the effective Spark configuration as a list of (key, value) pairs.
# NOTE(review): the recorded output below lists spark.executor.memory as '2g'
# although the session cell requests '6g' — the output may predate that
# setting or come from a reused session; confirm by re-running on a fresh kernel.
sc.getConf().getAll()

[('spark.executor.memory', '2g'),
 ('spark.app.name', 'TrainSentimentModel'),
 ('spark.app.id', 'local-1523945709991'),
 ('spark.driver.port', '43687'),
 ('spark.executor.id', 'driver'),
 ('spark.cores.max', '3'),
 ('spark.driver.host', 'de4f1c03e850'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '2g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.cores', '3'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

In [5]:
# Schema for the Twitter CSV: six columns, every one read as a nullable string.
training_data_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("target", "id", "date", "query", "user", "text_raw")
])

In [6]:
# Read the training CSV with the explicit schema rather than inferring one.
df_raw = spark.read.schema(training_data_schema).csv(data_file)

In [7]:
# Preview the raw rows without truncating long column values.
df_raw.show(truncate=False)

+------+----------+----------------------------+--------+---------------+---------------------------------------------------------------------------------------------------------------------+
|target|id        |date                        |query   |user           |text_raw                                                                                                             |
+------+----------+----------------------------+--------+---------------+---------------------------------------------------------------------------------------------------------------------+
|0     |1467810369|Mon Apr 06 22:19:45 PDT 2009|NO_QUERY|_TheSpecialOne_|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D  |
|0     |1467810672|Mon Apr 06 22:19:49 PDT 2009|NO_QUERY|scotthamilton  |is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!      |
|0     |1467810917|Mon Apr 06 22:19:53 P

In [8]:
import matplotlib.pyplot as plt
# Apply the 'fivethirtyeight' matplotlib style sheet to any figures drawn later.
plt.style.use('fivethirtyeight')

# Notebook display settings: render figures inline, using the 'retina' format.
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
import re
from bs4 import BeautifulSoup
from nltk.tokenize import WordPunctTokenizer
# Shared tokenizer instance; tweet_cleaner uses it to split the cleaned text.
tok = WordPunctTokenizer()

# Regular expressions and lookup table used by tweet_cleaner.
at_user_pat = r'@[A-Za-z0-9_]+'  # @mentions; alternative: r'@[\w]+'
url_pat = r'https?://[^ ]+'  # http(s) links; alternative: r'https?:\/\/[^\s]+'
# The dot is escaped so that only a literal "www." prefix counts as a link.
# Unescaped, it matched any character, so letter runs such as "awwwwww" were
# also rewritten to URL.
www_pat = r'www\.[^ ]+'
# One letter followed by one or more copies of itself (e.g. "soooo");
# group 1 captures the letter.
repeating_chars_pat = r'([A-Za-z])\1+'
# Contracted negations (lower case) mapped to their expanded forms.
negations_dic = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not"}
# Matches any key of negations_dic as a whole word. Keys are escaped so that a
# future entry containing a regex metacharacter cannot corrupt the pattern.
neg_pattern = re.compile(r'\b(' + '|'.join(re.escape(key) for key in negations_dic) + r')\b')

def tweet_cleaner(text):
    """Normalise one raw tweet into lower-case, letters-only words.

    Steps, in order:
      1. Extract the text content with BeautifulSoup (lxml parser).
      2. Drop a leading byte-order mark and replace U+FFFD with "?".
      3. Replace @mentions with USERNAME and links with URL, and collapse any
         run of a repeated letter to two occurrences.
      4. Lower-case the text and expand contracted negations.
      5. Replace every non-letter character with a space.
      6. Tokenise, drop one-character tokens, and join with single spaces.

    Args:
        text: Raw tweet text, or None.

    Returns:
        The cleaned tweet as a string, or None when ``text`` is None.
    """
    if text is None:
        # A missing value (e.g. a null column passed through the Spark UDF)
        # has nothing to clean; return it unchanged instead of failing inside
        # BeautifulSoup.
        return None

    souped = BeautifulSoup(text, 'lxml').get_text()

    # The original wrapped ``souped.decode("utf-8-sig")`` in a bare except.
    # On Python 3, ``str`` has no ``decode``, so that branch always failed
    # silently and the BOM / U+FFFD handling never ran. Do it explicitly on
    # the string instead.
    if souped.startswith(u"\ufeff"):
        souped = souped[1:]
    bom_removed = souped.replace(u"\ufffd", "?")

    stripped = re.sub(at_user_pat, 'USERNAME', bom_removed)
    stripped = re.sub(url_pat, 'URL', stripped)
    stripped = re.sub(www_pat, 'URL', stripped)
    stripped = re.sub(repeating_chars_pat, r'\1\1', stripped)

    lower_case = stripped.lower()
    neg_handled = neg_pattern.sub(lambda x: negations_dic[x.group()], lower_case)
    letters_only = re.sub("[^a-zA-Z]", " ", neg_handled)
    # Replacing non-letters with spaces leaves unnecessary whitespace behind;
    # tokenising and re-joining removes it. One-character tokens are dropped.
    words = [x for x in tok.tokenize(letters_only) if len(x) > 1]
    return (" ".join(words)).strip()

# Spark UDF wrapper around tweet_cleaner. The string return type is spelled
# out here; it is also what udf() falls back to when none is given.
udf_tweet_cleaner = udf(tweet_cleaner, returnType=StringType())

In [9]:
# Add a "text" column holding the cleaned form of "text_raw"; the raw column is kept.
text_preprocessed = df_raw.withColumn("text", udf_tweet_cleaner(col("text_raw")))

In [10]:
# Show the raw tweets next to their target, for comparison with the cleaned text.
text_preprocessed.select("target", "text_raw").show(truncate=False)

+------+---------------------------------------------------------------------------------------------------------------------+
|target|text_raw                                                                                                             |
+------+---------------------------------------------------------------------------------------------------------------------+
|0     |@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D  |
|0     |is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!      |
|0     |@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds                            |
|0     |my whole body feels itchy and like its on fire                                                                       |
|0     |@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over

In [11]:
# Show the cleaned tweets produced by the tweet_cleaner UDF.
text_preprocessed.select("target", "text").show(truncate=False)

+------+---------------------------------------------------------------------------------------------------------+
|target|text                                                                                                     |
+------+---------------------------------------------------------------------------------------------------------+
|0     |username url aww that bummer you shoulda got david carr of third day to do it                            |
|0     |is upset that he can not update his facebook by texting it and might cry as result school today also blah|
|0     |username dived many times for the ball managed to save the rest go out of bounds                         |
|0     |my whole body feels itchy and like its on fire                                                           |
|0     |username no it not behaving at all mad why am here because can not see you all over there                |
|0     |username not the whole crew                                             

In [12]:
# Sanity check: confirm the result is still a Spark DataFrame.
type(text_preprocessed)

pyspark.sql.dataframe.DataFrame

In [13]:
# Drop rows containing nulls (dropna is called with its default arguments, so
# all columns are considered, not only "text" and "target").
# NOTE(review): this rebinds the same name, so the un-filtered frame is no
# longer reachable afterwards; a stage-specific name would be clearer.
text_preprocessed = text_preprocessed.dropna()

In [14]:
# count() is an action: it triggers the lazily-defined work above (CSV read,
# cleaning UDF, dropna), so expect it to take a minute or two.
text_preprocessed.count()

1600000

In [13]:
# 98% / 1% / 1% train / validation / test split, with a fixed seed.
train_set, val_set, test_set = text_preprocessed.randomSplit(
    weights=[0.98, 0.01, 0.01],
    seed=2018,
)

# HashingTF + IDF + Logistic Regression

In [14]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [15]:
# Feature pipeline: tokenise the cleaned text, hash the tokens into a
# 2**16-dimensional term-frequency vector, re-weight with IDF, and index the
# string "target" column into a numeric "label".
# NOTE(review): which target value receives label 0.0 is decided by
# StringIndexer's ordering rule — confirm the mapping before interpreting
# per-class results (the recorded output shows target 0 -> label 0.0).
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) # minDocFreq: filter out rare terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

# Fit on the training split only, then apply the fitted stages to both the
# training and validation splits.
pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+------+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|target|        id|                date|   query|           user|            text_raw|                text|               words|                  tf|            features|label|
+------+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|username url aww ...|[username, url, a...|(65536,[8436,8847...|(65536,[8436,8847...|  0.0|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|is upset that he ...|[is, upset, that,...|(65536,[1444,2071...|(65536,[1444,2071...|  0.0|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|username dived ma...|[usernam

In [16]:
# Fit logistic regression on the HashingTF + IDF features and score the
# validation split.
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

# Area under the ROC curve, computed from the raw prediction column.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
auroc = evaluator.evaluate(predictions)

# Accuracy = rows whose predicted label equals the true label / validation rows.
accuracy = (
    predictions.where(predictions.label == predictions.prediction).count()
    / float(val_set.count())
)

print("AUROC: {}\nAccuracy: {}".format(auroc, accuracy))

AUROC: 0.8630617822217924
Accuracy: 0.7927058528637674


# CountVectorizer + IDF + Logistic Regression

In [None]:
%%time
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

# Using N-Gram

In [None]:
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import ChiSqSelector

def build_trigrams(inputCol=["text","target"], n=3):
    """Build an unfitted n-gram pipeline with chi-squared feature selection.

    For every order 1..n the pipeline derives n-grams from the tokenised
    "text" column, counts them with a 2**14-term vocabulary and applies IDF.
    The per-order TF-IDF vectors are assembled into "rawFeatures", reduced to
    the top 2**14 features by ChiSqSelector, and fed to logistic regression.

    Args:
        inputCol: Accepted but not used by the body; the stages read the
            "text" and "target" columns directly.
        n: Highest n-gram order to include (orders 1 through n).

    Returns:
        A Pipeline whose stages are, in order: tokenizer, NGram stages,
        CountVectorizer stages, IDF stages, VectorAssembler, StringIndexer,
        ChiSqSelector, LogisticRegression.
    """
    orders = range(1, n + 1)

    stages = [Tokenizer(inputCol="text", outputCol="words")]
    stages.extend(
        NGram(n=k, inputCol="words", outputCol="{0}_grams".format(k))
        for k in orders
    )
    stages.extend(
        CountVectorizer(vocabSize=2**14, inputCol="{0}_grams".format(k),
                        outputCol="{0}_tf".format(k))
        for k in orders
    )
    stages.extend(
        IDF(inputCol="{0}_tf".format(k), outputCol="{0}_tfidf".format(k), minDocFreq=5)
        for k in orders
    )
    stages.append(VectorAssembler(
        inputCols=["{0}_tfidf".format(k) for k in orders],
        outputCol="rawFeatures"
    ))
    stages.append(StringIndexer(inputCol="target", outputCol="label"))
    stages.append(ChiSqSelector(numTopFeatures=2**14, featuresCol='rawFeatures', outputCol="features"))
    stages.append(LogisticRegression(maxIter=100))
    return Pipeline(stages=stages)

In [None]:
def build_ngrams_wocs(inputCol=["text","target"], n=3):
    """Build an unfitted n-gram pipeline without chi-squared selection.

    For every order 1..n the pipeline derives n-grams from the tokenised
    "text" column, counts them with a 5460-term vocabulary and applies IDF.
    The per-order TF-IDF vectors are assembled directly into "features" and
    fed to logistic regression.

    Args:
        inputCol: Accepted but not used by the body; the stages read the
            "text" and "target" columns directly.
        n: Highest n-gram order to include (orders 1 through n).

    Returns:
        A Pipeline whose stages are, in order: tokenizer, NGram stages,
        CountVectorizer stages, IDF stages, VectorAssembler, StringIndexer,
        LogisticRegression.
    """
    def grams_col(order):
        return "{0}_grams".format(order)

    def tf_col(order):
        return "{0}_tf".format(order)

    def tfidf_col(order):
        return "{0}_tfidf".format(order)

    orders = list(range(1, n + 1))

    tokenize_stage = Tokenizer(inputCol="text", outputCol="words")
    ngram_stages = [NGram(n=k, inputCol="words", outputCol=grams_col(k)) for k in orders]
    # NOTE(review): 5460 is roughly 2**14 / 3, presumably so that three
    # vocabularies total about 2**14 features — confirm the intent.
    count_stages = [
        CountVectorizer(vocabSize=5460, inputCol=grams_col(k), outputCol=tf_col(k))
        for k in orders
    ]
    idf_stages = [
        IDF(inputCol=tf_col(k), outputCol=tfidf_col(k), minDocFreq=5)
        for k in orders
    ]
    assemble_stage = VectorAssembler(
        inputCols=[tfidf_col(k) for k in orders],
        outputCol="features"
    )
    label_stage = StringIndexer(inputCol="target", outputCol="label")
    classifier_stage = LogisticRegression(maxIter=100)

    return Pipeline(stages=(
        [tokenize_stage]
        + ngram_stages
        + count_stages
        + idf_stages
        + [assemble_stage, label_stage, classifier_stage]
    ))

In [None]:
%%time
trigramwocs_pipelineFit = build_ngrams_wocs().fit(train_set)
predictions_wocs = trigramwocs_pipelineFit.transform(val_set)
accuracy_wocs = predictions_wocs.filter(predictions_wocs.label == predictions_wocs.prediction).count() / float(val_set.count())
roc_auc_wocs = evaluator.evaluate(predictions_wocs)

print("Accuracy Score: {0:.4f}".format(accuracy_wocs))
print("ROC-AUC: {0:.4f}".format(roc_auc_wocs))

In [None]:
# Final check: score the held-out test split with the fitted n-gram pipeline.
test_predictions = trigramwocs_pipelineFit.transform(test_set)

# Accuracy = correctly predicted rows / test rows.
test_accuracy = (
    test_predictions.where(test_predictions.label == test_predictions.prediction).count()
    / float(test_set.count())
)
test_roc_auc = evaluator.evaluate(test_predictions)

print("Accuracy Score: {0:.4f}".format(test_accuracy))
print("ROC-AUC: {0:.4f}".format(test_roc_auc))