### Start a Spark session

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext


spark = SparkSession.builder.master("local[2]") \
                    .appName('Assignment 3 WBJ') \
                    .getOrCreate()
sc = SparkContext.getOrCreate( )

In [2]:
spark

### Data reading and cleaning 

We only care about 6 specific tags and corresponding texts, and missing data should be removed

In [3]:
# read data
data  = spark.read.csv("/Users/mac/Desktop/KU\ Leuven/Advanced\ Analytics\ in\ a\ Big\ Data\ World/Assignment3/dfPD.csv", header=True, inferSchema=True )

# clean and split data
clean_data = data.filter((data.tweet_text != "None") & (data.tweet_id != "None")& (data.label != "None") & (data.label.isin(["#vaccine", "#stopasianhate", "#covid", "#china", "#inflation", "#biden"])))

There are many Emojis and blocked contents in "tweet_text" column. By using regular expression, we are able to extract texts and remove noise. Since the dataframe in pyspark is treated as tuple which can't be modified, we have to convert pyspark dataframe to pandas dataframe first.

In [4]:
import pandas as pd
import re
new_data = clean_data.toPandas()
for row in range(len(new_data["tweet_text"])):
    # new_data["tweet_text"][row] = " ".join(re.findall(r"[a-zA-Z,;.':\\]+", new_data["tweet_text"][row])) 
    new_data["tweet_text"][row] = " ".join(re.findall(r"[a-zA-Z']+", new_data["tweet_text"][row])).lower() 

clean_data = spark.createDataFrame(new_data) 

Check if the cleaning work is done 

In [5]:
clean_data.groupby("label").count().show()
clean_data.printSchema()
(trainingData, testData) = clean_data.randomSplit([0.7, 0.3], seed = 100)
# clean_data.select('label').limit(2).collect()
# (trainingData.count(), testData.count())

+--------------+-----+
|         label|count|
+--------------+-----+
|        #biden| 1319|
|      #vaccine| 2900|
|        #china| 2366|
|        #covid| 2492|
|#stopasianhate|  730|
|    #inflation|  259|
+--------------+-----+

root
 |-- label: string (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- tweet_text: string (nullable = true)



Import necessary packages and functions

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover, IDF, StringIndexer, RegexTokenizer
from pyspark.ml.feature import VectorAssembler, Word2Vec
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [7]:
clean_data = clean_data.select("label", "tweet_text")
clean_data = clean_data.withColumnRenamed('label', 'class')

In [97]:
clean_data.select('tweet_text').limit(10).collect()

[Row(tweet_text='Today I learned that the origin of the word vaccine is tied to the latin word cow which is vacca The reason for this is due to the first smallpox vaccine using the milder cowpox virus to innoculate people'),
 Row(tweet_text='Insights into webinar on Reigniting Manufacturing Growth through MSME Development Electrical and Electronic sector'),
 Row(tweet_text='tests for pray'),
 Row(tweet_text="COVIDLandia Wisdom Loathing and Lots of Netflix inside America's Potemkin Government Book"),
 Row(tweet_text='Pennsylvania pauses use of Johnson amp Johnson vaccine'),
 Row(tweet_text="Did you catch us on BBCOxford last night Don't worry you can catch it again here https t co sWfa fmzw and catch Tracy talking all about our fab work with having your vaccination"),
 Row(tweet_text='Saturday Sunday Two Days Lockdown in Gandhidham Adipur'),
 Row(tweet_text='What is a live attenuated Learn more about the s used to treat diseases including and https t co QvkbpyMpT'),
 Row(tweet_text="If 

Below is a simple pipeline without any feature selection work

In [133]:
ham_spam_to_numeric = StringIndexer(inputCol = 'class', outputCol = 'label')

tokenizer = Tokenizer(inputCol = 'tweet_text', outputCol = 'token_text')
stop_remove = StopWordsRemover(inputCol = 'token_text', outputCol = 'stop_token')
count_vec = CountVectorizer(inputCol = 'stop_token', outputCol = 'c_vec')
idf = IDF(inputCol = 'c_vec', outputCol = 'tf_idf')
clean_up = VectorAssembler(inputCols = ['tf_idf'], outputCol = 'features')

pipeline = Pipeline(stages=[ham_spam_to_numeric, tokenizer, stop_remove, count_vec, idf, clean_up])

A pipeline with ChiSqSelector and increase the accuracy by more than 10%. Parameter "numTopFeatures" stands for how many features you wanto select.

In [13]:
ham_spam_to_numeric = StringIndexer(inputCol = 'class', outputCol = 'label')

tokenizer = Tokenizer(inputCol = 'tweet_text', outputCol = 'token_text')
stop_remove = StopWordsRemover(inputCol = 'token_text', outputCol = 'stop_token')
count_vec = CountVectorizer(inputCol = 'stop_token', outputCol = 'c_vec')
idf = IDF(inputCol = 'c_vec', outputCol = 'tf_idf')
clean_up = VectorAssembler(inputCols = ['tf_idf'], outputCol = 'raw_features')
final_data = ChiSqSelector(numTopFeatures = 5500, featuresCol="raw_features",
                         outputCol="features", labelCol="label")

pipeline = Pipeline(stages=[ham_spam_to_numeric, tokenizer, stop_remove, count_vec, idf, clean_up, final_data, nb])


A pipeline with UnivariateFeatureSelector. Model will select the feactures with threshold above the parameter "setSelectionThreshold". If properly selected, it produces similar results as ChiSqSelector.

In [206]:
from pyspark.ml.feature import UnivariateFeatureSelector
ham_spam_to_numeric = StringIndexer(inputCol = 'class', outputCol = 'label')

tokenizer = Tokenizer(inputCol = 'tweet_text', outputCol = 'token_text')
stop_remove = StopWordsRemover(inputCol = 'token_text', outputCol = 'stop_token')
count_vec = CountVectorizer(inputCol = 'stop_token', outputCol = 'c_vec')
idf = IDF(inputCol = 'c_vec', outputCol = 'tf_idf')
clean_up = VectorAssembler(inputCols = ['tf_idf'], outputCol = 'raw_features')
final_data = UnivariateFeatureSelector(featuresCol="raw_features",
                         outputCol="features", labelCol="label",selectionMode="fdr").setFeatureType("categorical").setLabelType("categorical").setSelectionThreshold(0.5)

pipeline = Pipeline(stages=[ham_spam_to_numeric, tokenizer, stop_remove, count_vec, idf, clean_up, final_data])

Fit the data based on the pipeline you select and make sure you only run one pipeline above.
You may change the proportion of trainingData and testData.

In [14]:
cleaner = pipeline.fit(clean_data)
clean_df = cleaner.transform(clean_data)
clean_df.printSchema()
clean_df.show(5)
clean_df = clean_df.select('label', 'features')
# clean_df = clean_df.withColumnRenamed('class', 'features')
clean_df.show(5)
(trainingData, testData) = clean_df.randomSplit([0.7, 0.3], seed = 200)

root
 |-- class: string (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- label: double (nullable = false)
 |-- token_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stop_token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- c_vec: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- raw_features: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

+--------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|          tweet_text|label|          token_text|          stop_token|               c_vec|              tf_idf|        raw_features|            features|       rawPrediction|        

In [11]:
clean_data.printSchema()
clean_df.printSchema()
trainingData.printSchema()
testData.printSchema()

root
 |-- class: string (nullable = true)
 |-- tweet_text: string (nullable = true)

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)



Train a NaiveBayes model and compare its results

In [15]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes()
tag_classifier = nb.fit(trainingData)
predictions = tag_classifier.transform(testData)
predictions.show(3)

evaluator = MulticlassClassificationEvaluator()
print("Test Accuracy: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})))

+-----+------------+--------------------+--------------------+----------+
|label|    features|       rawPrediction|         probability|prediction|
+-----+------------+--------------------+--------------------+----------+
|  0.0|(5500,[],[])|[-1.2454099941901...|[0.28782287822878...|       0.0|
|  0.0|(5500,[],[])|[-1.2454099941901...|[0.28782287822878...|       0.0|
|  0.0|(5500,[],[])|[-1.2454099941901...|[0.28782287822878...|       0.0|
+-----+------------+--------------------+--------------------+----------+
only showing top 3 rows

Test Accuracy: 0.6354923992068737


DON NOT run the pipeline above, and instead run the following codes right after the data cleaning cell and before the data fitting cell.

If you want to include cross validation, run the codes below. 
If you want reliable results, go for TrainValidationSplit, which fits the data multiple times.

In [16]:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

ham_spam_to_numeric = StringIndexer(inputCol = 'class', outputCol = 'label')

tokenizer = Tokenizer(inputCol = 'tweet_text', outputCol = 'token_text')
stop_remove = StopWordsRemover(inputCol = 'token_text', outputCol = 'stop_token')
count_vec = CountVectorizer(inputCol = 'stop_token', outputCol = 'c_vec')
idf = IDF(inputCol = 'c_vec', outputCol = 'tf_idf')
clean_up = VectorAssembler(inputCols = ['tf_idf'], outputCol = 'raw_features')
final_data = ChiSqSelector(featuresCol="raw_features",
                          outputCol="features", labelCol="label")
nb = NaiveBayes()

pipeline = Pipeline(stages=[ham_spam_to_numeric, tokenizer, stop_remove, count_vec, idf, clean_up, final_data, nb])


(trainingData, testData) = clean_data.randomSplit([0.7, 0.3], seed = 200)
paramGrid = ParamGridBuilder()\
    .addGrid(final_data.numTopFeatures, list(range(4000,6000,100))) \
    .build()

tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           trainRatio=0.7)

tag_classifier = tvs.fit(trainingData)
predictions = tag_classifier.transform(testData)
evaluator = MulticlassClassificationEvaluator()
print("Test Accuracy: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})))

Test Accuracy: 0.5588235294117647


Train the random forest model.

If you run the cross validation cell above, remember to rerun the pipeline and fit the data, otherwise you will use the raw data WITHOUT ANY pre-processing work.

In [138]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)
rf_mode = rf.fit(trainingData)

predictions_rf = rf_mode.transform(testData)
evaluator_rf = MulticlassClassificationEvaluator()
print("Test Accuracy: " + str(evaluator_rf.evaluate(predictions_rf, {evaluator_rf.metricName: "accuracy"})))

Train the one-vs-rest model

If you run the cross validation cell above, remember to rerun the pipeline and fit the data, otherwise you will use the raw data WITHOUT ANY pre-processing work.

In [140]:
from pyspark.ml.classification import LogisticRegression, OneVsRest
lr = LogisticRegression(maxIter=20, tol=1E-6, fitIntercept=True)
ovr = OneVsRest(classifier=lr)
tag_classifier_ovr = ovr.fit(trainingData)

predictions_ovr = tag_classifier_ovr.transform(testData)
evaluator_ovr = MulticlassClassificationEvaluator()
print("Test Accuracy: " + str(evaluator_ovr.evaluate(predictions_ovr, {evaluator_ovr.metricName: "accuracy"})))

Train the multi-layer perceptron model, and make sure the input dimension is equal to the number of features. You can do this manually or just put the feature output size inside. Be carefully about the overstack problem.

If you run the cross validation cell above, remember to rerun the pipeline and fit the data, otherwise you will use the raw data WITHOUT ANY pre-processing work.

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
# specify layers for the neural network:
# input layer of size 5500 (features), two intermediate of size 100 and 50
# and output of size 6 (classes)

layers = [5500, 100, 50, 6]
trainer = MultilayerPerceptronClassifier(maxIter=200, layers=layers, blockSize=64, seed=1234)
mlp = trainer.fit(trainingData)

predictions_mlp = mlp.transform(testData)
evaluator_mlp = MulticlassClassificationEvaluator()
print("Test Accuracy: " + str(evaluator_mlp.evaluate(predictions_mlp, {evaluator_mlp.metricName: "accuracy"})))

A pipeline using HashingTF.

Can have a try.

In [142]:
from pyspark.ml.feature import HashingTF

ham_spam_to_numeric = StringIndexer(inputCol = 'class', outputCol = 'label')

tokenizer = Tokenizer(inputCol = 'tweet_text', outputCol = 'token_text')
stop_remove = StopWordsRemover(inputCol = 'token_text', outputCol = 'stop_token')
hashingTF = HashingTF(inputCol="stop_token", outputCol="rawFeatures", numFeatures= 8000)
idf = IDF(inputCol = 'rawFeatures', outputCol = 'tf_idf')
clean_up = VectorAssembler(inputCols = ['tf_idf'], outputCol = 'features')

pipeline = Pipeline(stages=[ham_spam_to_numeric, tokenizer, stop_remove, hashingTF, idf, clean_up])