In [None]:
!pip install numpy
!pip install matplotlib
!pip install scikit-learn
!pip install pyspark



In [None]:
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf,col
import os
# tools
import re
import math
import json
import requests
import itertools
import numpy as np
import pandas as pd
import time
from datetime import datetime, timedelta
import string
import random

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#from pyspark.sql.functions import split as splitsp
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# %matplotlib inline
# import numpy as np
# import matplotlib.pyplot as plt
# from mpl_toolkits.mplot3d import Axes3D

# from pyspark.mllib.linalg import Vectors
# from pyspark.mllib.linalg.distributed import RowMatrix

In [None]:
def init_spark():
    """Return the active SparkSession, creating it with this notebook's app name if none exists."""
    builder = SparkSession.builder.appName("Python Spark Naive Bayes CountVectorizer")
    return builder.getOrCreate()


spark = init_spark()

In [None]:
'''
Read Lemma data
'''
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

# The CSV is read without inferSchema, so every column (including "label") arrives as a string.
# "text" holds the lemmas of one post joined by "|".
data = spark.read.csv("lemma-100days-wsbdata.csv", header=True)

# Built-in column functions instead of Python UDFs: no per-row Python round-trip,
# and a missing value becomes null instead of crashing inside the UDF.
# F.split takes a Java regex, so the "|" delimiter must be escaped.
text_lemmas = (
    data
    .withColumn('finished_lemmas', F.split(col('text'), r'\|'))
    .drop('text')
    .withColumn('label', col('label').cast(IntegerType()))
)

# cast() silently maps missing / non-integer labels to null; fail fast instead, as the
# original int() conversion did.
if text_lemmas.where(col('label').isNull()).count():
    raise ValueError("lemma-100days-wsbdata.csv contains rows with a missing or non-integer label")

print(text_lemmas.count())
text_lemmas.show()

1144
+------+-----+--------------------+
|    id|label|     finished_lemmas|
+------+-----+--------------------+
|ks1tzw|    0|[all, right, all,...|
|ksjhuu|    0|[bldr, $, pt, pos...|
|kt79b2|    0|[we, might, be, h...|
|kt9enf|    0|[bear, feast, on,...|
|ktbu75|    0|[so, today, i, go...|
|ktfori|    0|[what, a, week, i...|
|kup2e2|    1|[start, to, feel,...|
|kurzyz|    0|[i, honestly, don...|
|kusng4|    0|[soi, recently, o...|
|kuku2g|    1|[nio, be, disrupt...|
|kutslm|    1|[listen, up, you,...|
|kv7k8k|    0|[have, anyone, no...|
|kvarzs|    0|[now, that, the, ...|
|kvcard|    0|[we, all, know, t...|
|kvcmhk|    1|[it, do, not, dil...|
|kva2kt|    0|[we, know, you, l...|
|kvacad|    1|[so, i, hear, you...|
|kvcimd|    1|[look, at, hour, ...|
|kvtk4b|    1|[giga, berain, co...|
|kw2vwm|    1|[see, the, end, o...|
+------+-----+--------------------+
only showing top 20 rows



In [None]:
# Build the corpus: strip stop words (StopWordsRemover's default English list) from
# each lemma array; the cleaned tokens are written to a new "text" column.
remover = StopWordsRemover(
    outputCol="text",
    inputCol="finished_lemmas",
)
filtered_df = remover.transform(text_lemmas)
filtered_df.show()

+------+-----+--------------------+--------------------+
|    id|label|     finished_lemmas|                text|
+------+-----+--------------------+--------------------+
|ks1tzw|    0|[all, right, all,...|[right, artist, f...|
|ksjhuu|    0|[bldr, $, pt, pos...|[bldr, $, pt, pos...|
|kt79b2|    0|[we, might, be, h...|[might, hear, fir...|
|kt9enf|    0|[bear, feast, on,...|[bear, feast, low...|
|ktbu75|    0|[so, today, i, go...|[today, go, games...|
|ktfori|    0|[what, a, week, i...|[week, steel, big...|
|kup2e2|    1|[start, to, feel,...|[start, feel, rea...|
|kurzyz|    0|[i, honestly, don...|[honestly, dont, ...|
|kusng4|    0|[soi, recently, o...|[soi, recently, o...|
|kuku2g|    1|[nio, be, disrupt...|[nio, disrupt, te...|
|kutslm|    1|[listen, up, you,...|[listen, degenera...|
|kv7k8k|    0|[have, anyone, no...|[anyone, notice, ...|
|kvarzs|    0|[now, that, the, ...|[boy, rc, get, po...|
|kvcard|    0|[we, all, know, t...|[know, 🌈🐻s, cal...|
|kvcmhk|    1|[it, do, not, dil..

In [None]:
# Build the document-term matrix from the stop-word-filtered tokens.
# Each "features" value is a sparse vector shown as
#   (vocabulary size, [vocabulary indices of the row's terms], [count of each term]).
to_vectorize = filtered_df.select("id", "label", "text")
cv = CountVectorizer(outputCol="features", inputCol="text")
model_vec = cv.fit(to_vectorize)
result_vec = model_vec.transform(to_vectorize)

print("Total count of vocabulary:", len(model_vec.vocabulary))
result_vec.show()

Total count of vocabulary: 10932
+------+-----+--------------------+--------------------+
|    id|label|                text|            features|
+------+-----+--------------------+--------------------+
|ks1tzw|    0|[right, artist, f...|(10932,[1,2,4,7,8...|
|ksjhuu|    0|[bldr, $, pt, pos...|(10932,[3,5,6,7,9...|
|kt79b2|    0|[might, hear, fir...|(10932,[25,55,77,...|
|kt9enf|    0|[bear, feast, low...|(10932,[1,4,6,7,8...|
|ktbu75|    0|[today, go, games...|(10932,[1,4,8,25,...|
|ktfori|    0|[week, steel, big...|(10932,[3,4,5,7,8...|
|kup2e2|    1|[start, feel, rea...|(10932,[3,8,13,30...|
|kurzyz|    0|[honestly, dont, ...|(10932,[8,15,16,1...|
|kusng4|    0|[soi, recently, o...|(10932,[0,1,4,11,...|
|kuku2g|    1|[nio, disrupt, te...|(10932,[4,11,14,1...|
|kutslm|    1|[listen, degenera...|(10932,[4,5,11,12...|
|kv7k8k|    0|[anyone, notice, ...|(10932,[0,1,19,26...|
|kvarzs|    0|[boy, rc, get, po...|(10932,[1,5,6,8,1...|
|kvcard|    0|[know, 🌈🐻s, cal...|(10932,[0,1,3,4,5...|


In [None]:
selectedData = result_vec.select('id', 'label', 'features', 'text')
selectedData.show(truncate=True)

# Per-row confusion-matrix indicators for the positive class (label 1).
# Called as TP(prediction_col, label_col): x is the prediction, y the true label.
# An explicit "int" return type is required: udf() defaults to StringType, which would
# make these columns hold the strings "0"/"1" instead of integers.
TP = udf(lambda x, y: int(x == 1 and y == 1), "int")
FP = udf(lambda x, y: int(x == 1 and y == 0), "int")
FN = udf(lambda x, y: int(x == 0 and y == 1), "int")

+------+-----+--------------------+--------------------+
|    id|label|            features|                text|
+------+-----+--------------------+--------------------+
|ks1tzw|    0|(10932,[1,2,4,7,8...|[right, artist, f...|
|ksjhuu|    0|(10932,[3,5,6,7,9...|[bldr, $, pt, pos...|
|kt79b2|    0|(10932,[25,55,77,...|[might, hear, fir...|
|kt9enf|    0|(10932,[1,4,6,7,8...|[bear, feast, low...|
|ktbu75|    0|(10932,[1,4,8,25,...|[today, go, games...|
|ktfori|    0|(10932,[3,4,5,7,8...|[week, steel, big...|
|kup2e2|    1|(10932,[3,8,13,30...|[start, feel, rea...|
|kurzyz|    0|(10932,[8,15,16,1...|[honestly, dont, ...|
|kusng4|    0|(10932,[0,1,4,11,...|[soi, recently, o...|
|kuku2g|    1|(10932,[4,11,14,1...|[nio, disrupt, te...|
|kutslm|    1|(10932,[4,5,11,12...|[listen, degenera...|
|kv7k8k|    0|(10932,[0,1,19,26...|[anyone, notice, ...|
|kvarzs|    0|(10932,[1,5,6,8,1...|[boy, rc, get, po...|
|kvcard|    0|(10932,[0,1,3,4,5...|[know, 🌈🐻s, cal...|
|kvcmhk|    1|(10932,[0,17,25,4..

In [None]:
'''
Separate data into training/test used for Naive-Bayes
'''
# NOTE(review): this cell is entirely commented out. The same per-label 70/30 split is
# performed inside NAIVEBAYES_CVKFOLD and NAIVEBAYES_CV on every call, so nothing
# here is needed for the rest of the notebook.
# NOTE(review): if re-enabled, the last two prints are mislabeled: they report the
# training-set and test-set row counts, not ">6%" / "<6%" counts.
# training_zero, test_zero = selectedData.where(selectedData.label == 0).randomSplit([0.7, 0.3])
# training_one, test_one = selectedData.where(selectedData.label == 1).randomSplit([0.7, 0.3])

# training = training_zero.union(training_one)
# test = test_zero.union(test_one)
# #training.show()
# # should be 70% of total in training, 30% in test
# print("Total data count:", selectedData.count())
# print("Total count of >6%", training.count())
# print("Total count of <6%", test.count())

'\nSeparate data into training/test used for Naive-Bayes\n'

In [None]:
'''
Naive-Bayes following from CountVectorizer with K-fold cross-validation
'''
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

def NAIVEBAYES_CVKFOLD(smooth=0, model_type="multinomial", seed=None):
  """Tune NaiveBayes smoothing with 5-fold cross-validation and report test-set metrics.

  ``selectedData`` is split 70/30 separately for label 0 and label 1 and the parts are
  recombined, so training and test sets keep the label ratio. CrossValidator picks the
  smoothing value from the grid with the best area under ROC (the default metric of
  BinaryClassificationEvaluator). The chosen model is then evaluated on the test set.

  Args:
    smooth: initial NaiveBayes smoothing; every grid entry overrides it during the search.
    model_type: NaiveBayes ``modelType``; this notebook uses "multinomial" and "complement".
      (A non-string such as the former default 0 is rejected by NaiveBayes.)
    seed: optional seed for the random train/test splits; None gives a new split per call.

  Returns:
    The fitted CrossValidatorModel.
  """
  # Stratified 70/30 split: split each label separately, then recombine.
  training_zero, test_zero = selectedData.where(selectedData.label == 0).randomSplit([0.7, 0.3], seed)
  training_one, test_one = selectedData.where(selectedData.label == 1).randomSplit([0.7, 0.3], seed)
  training = training_zero.union(training_one)
  test = test_zero.union(test_one)

  nb = NaiveBayes(smoothing=smooth, modelType=model_type)

  # Candidate smoothing values searched by cross-validation.
  nbparamGrid = (ParamGridBuilder()
               .addGrid(nb.smoothing, [0.2684835187532758, 0.46961116193132335, 0.44860427414315174, 0.7649149741542184, 0.25609960046163693])
               .build())

  # Model-selection metric: area under ROC (BinaryClassificationEvaluator's default).
  auc_evaluator = BinaryClassificationEvaluator()
  nbcv = CrossValidator(estimator=nb,
                        estimatorParamMaps=nbparamGrid,
                        evaluator=auc_evaluator,
                        numFolds=5)
  model_NBCV = nbcv.fit(training)

  # transform() appends prediction columns to the held-out test set.
  predictions = model_NBCV.transform(test)
  area_under_roc = auc_evaluator.evaluate(predictions)
  accuracy = MulticlassClassificationEvaluator(
      labelCol="label", predictionCol="prediction", metricName="accuracy").evaluate(predictions)

  # Confusion-matrix counts for the positive class (label 1): count only matching rows.
  pred_pos = col("prediction") == 1
  pred_neg = col("prediction") == 0
  label_pos = col("label") == 1
  label_neg = col("label") == 0
  TP_ = predictions.where(pred_pos & label_pos).count()
  FP_ = predictions.where(pred_pos & label_neg).count()
  FN_ = predictions.where(pred_neg & label_pos).count()

  # Guard the ratios so a model that never predicts 1 does not raise ZeroDivisionError.
  precision = TP_ / (TP_ + FP_) if TP_ + FP_ else 0.0
  recall = TP_ / (TP_ + FN_) if TP_ + FN_ else 0.0
  F1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0

  print("Summary Stats")
  print('Model accuracy:', accuracy)
  print('Area under ROC:', area_under_roc)
  print(TP_, FP_, FN_)
  print("Precision:", str(precision))
  print("Recall:", str(recall))
  print("F1 Score:", str(F1))
  return model_NBCV

In [None]:
final_model_cv = NAIVEBAYES_CVKFOLD(model_type='multinomial')

Summary Stats
Model accuracy: 0.4610613810741688
354 354 354
Precision: 0.5
Recall: 0.5
F1 Score: 0.5


In [None]:
'''
Naive-Bayes following from CountVectorizer
'''
def NAIVEBAYES_CV(smooth=0, model_type="multinomial", seed=None):
  """Train and evaluate one NaiveBayes model on a fresh stratified train/test split.

  ``selectedData`` is split 70/30 separately for label 0 and label 1 and the parts are
  recombined, so training and test sets keep the label ratio.

  Args:
    smooth: NaiveBayes ``smoothing`` value.
    model_type: NaiveBayes ``modelType``; this notebook uses "multinomial" and "complement".
      (A non-string such as the former default 0 is rejected by NaiveBayes.)
    seed: optional seed for the random splits; None gives a different split per call.

  Returns:
    (accuracy, precision, recall, F1, model). Precision, recall and F1 refer to the
    positive class (label 1) and are 0.0 when their denominator is zero.
  """
  # Stratified 70/30 split: split each label separately, then recombine.
  training_zero, test_zero = selectedData.where(selectedData.label == 0).randomSplit([0.7, 0.3], seed)
  training_one, test_one = selectedData.where(selectedData.label == 1).randomSplit([0.7, 0.3], seed)
  training = training_zero.union(training_one)
  test = test_zero.union(test_one)

  nb = NaiveBayes(smoothing=smooth, modelType=model_type)
  model_NB = nb.fit(training)

  # transform() appends prediction columns to the held-out test set.
  predictions = model_NB.transform(test)

  # Confusion-matrix counts for the positive class, using native column expressions
  # (no Python UDF round-trip).
  pred_pos = col("prediction") == 1
  pred_neg = col("prediction") == 0
  label_pos = col("label") == 1
  label_neg = col("label") == 0
  TP_ = predictions.where(pred_pos & label_pos).count()
  FP_ = predictions.where(pred_pos & label_neg).count()
  FN_ = predictions.where(pred_neg & label_pos).count()

  print(TP_, FP_, FN_)

  # Guard the ratios: a model that never predicts 1 (or a split with no positives) would
  # otherwise raise ZeroDivisionError and abort a long parameter sweep.
  precision = TP_ / (TP_ + FP_) if TP_ + FP_ else 0.0
  recall = TP_ / (TP_ + FN_) if TP_ + FN_ else 0.0
  F1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0

  # Accuracy on the test set: compares labelCol with predictionCol.
  evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
  accuracy = evaluator.evaluate(predictions)
  return (accuracy, precision, recall, F1, model_NB)

In [None]:
acc,precision,recall,F1,modelNB = NAIVEBAYES_CV(0.2684835187532758,"multinomial")
print("Accuracy: ",acc)
print("Precision: ",precision)
print("Recall: ",recall)
print("F1 Score: ",F1)

96 55 92
Accuracy:  0.6211340206185567
Precision:  0.6357615894039735
Recall:  0.5106382978723404
F1 Score:  0.5663716814159292


In [None]:
'''
Iteration tests on Naive-Bayes
'''
import statistics

extract_method = "CountVectorizer"
iter_each = 10   # random train/test splits evaluated per smoothing value
iter_total = 50  # random smoothing values tried per model type
m_types = ["complement", "multinomial"]
means = []       # (mean accuracy, smoothing, model type, feature-extraction method)

for model_type in m_types:
  for k in range(iter_total):
    smoothing = random.uniform(0.01, 0.8)
    # NAIVEBAYES_CV returns (accuracy, precision, recall, F1, model); average only the
    # accuracy. Averaging the whole tuples would make statistics.mean raise TypeError.
    accuracies = [NAIVEBAYES_CV(smoothing, model_type)[0] for _ in range(iter_each)]
    mean = statistics.mean(accuracies)
    print("=> Mean:", mean, "- Smoothing:", smoothing, "- Model:", model_type)
    means.append((mean, smoothing, model_type, extract_method))

In [None]:
# from pyspark.sql.types import FloatType
# acc_df = pd.DataFrame(means, columns=['mean', 'smoothing', 'model_type', 'extract_method'])
# acc_df.to_csv("means_countvec.csv")

In [None]:
# from google.colab import files
# files.download('means_countvec.csv') 