In [15]:
!pip install numpy
!pip install matplotlib
!pip install scikit-learn
!pip install pyspark
!pip install eurekatrees

Collecting eurekatrees
  Downloading https://files.pythonhosted.org/packages/d9/b8/8d47720da790307fd8af264f0f36356941dac5f0263e10f1a374f6546d70/eurekatrees-0.3-py3-none-any.whl
Installing collected packages: eurekatrees
Successfully installed eurekatrees-0.3


In [2]:
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf,col
import os
# tools
import re
import math
import json
import requests
import itertools
import numpy as np
import pandas as pd
import time
from datetime import datetime, timedelta
import string
import random

In [3]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#from pyspark.sql.functions import split as splitsp
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# %matplotlib inline
# import numpy as np
# import matplotlib.pyplot as plt
# from mpl_toolkits.mplot3d import Axes3D

# from pyspark.mllib.linalg import Vectors
# from pyspark.mllib.linalg.distributed import RowMatrix

In [4]:
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark Naive Bayes CountVectorizer") \
        .getOrCreate()
    return spark
spark = init_spark()

In [8]:
'''
Read Lemma data
'''
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

data = spark.read.csv("lemma-100days-wsbdata.csv", header=True)
function_array = udf(lambda r: r.split("|"), ArrayType(StringType()))
function_toNumerical = udf(lambda r: int(r), IntegerType())
text_lemmas = data.withColumn('finished_lemmas', function_array('text')).drop('text').withColumn('label', function_toNumerical('label'))
print(text_lemmas.count())
text_lemmas.show()

1144
+------+-----+--------------------+
|    id|label|     finished_lemmas|
+------+-----+--------------------+
|ks1tzw|    0|[all, right, all,...|
|ksjhuu|    0|[bldr, $, pt, pos...|
|kt79b2|    0|[we, might, be, h...|
|kt9enf|    0|[bear, feast, on,...|
|ktbu75|    0|[so, today, i, go...|
|ktfori|    0|[what, a, week, i...|
|kup2e2|    1|[start, to, feel,...|
|kurzyz|    0|[i, honestly, don...|
|kusng4|    0|[soi, recently, o...|
|kuku2g|    1|[nio, be, disrupt...|
|kutslm|    1|[listen, up, you,...|
|kv7k8k|    0|[have, anyone, no...|
|kvarzs|    0|[now, that, the, ...|
|kvcard|    0|[we, all, know, t...|
|kvcmhk|    1|[it, do, not, dil...|
|kva2kt|    0|[we, know, you, l...|
|kvacad|    1|[so, i, hear, you...|
|kvcimd|    1|[look, at, hour, ...|
|kvtk4b|    1|[giga, berain, co...|
|kw2vwm|    1|[see, the, end, o...|
+------+-----+--------------------+
only showing top 20 rows



In [9]:
'''
Get the Corpus.
Removing stop words from the text lemmas. 
'''
remover = StopWordsRemover(inputCol="finished_lemmas", outputCol="text")
filtered_df = remover.transform(text_lemmas)
filtered_df.show()

+------+-----+--------------------+--------------------+
|    id|label|     finished_lemmas|                text|
+------+-----+--------------------+--------------------+
|ks1tzw|    0|[all, right, all,...|[right, artist, f...|
|ksjhuu|    0|[bldr, $, pt, pos...|[bldr, $, pt, pos...|
|kt79b2|    0|[we, might, be, h...|[might, hear, fir...|
|kt9enf|    0|[bear, feast, on,...|[bear, feast, low...|
|ktbu75|    0|[so, today, i, go...|[today, go, games...|
|ktfori|    0|[what, a, week, i...|[week, steel, big...|
|kup2e2|    1|[start, to, feel,...|[start, feel, rea...|
|kurzyz|    0|[i, honestly, don...|[honestly, dont, ...|
|kusng4|    0|[soi, recently, o...|[soi, recently, o...|
|kuku2g|    1|[nio, be, disrupt...|[nio, disrupt, te...|
|kutslm|    1|[listen, up, you,...|[listen, degenera...|
|kv7k8k|    0|[have, anyone, no...|[anyone, notice, ...|
|kvarzs|    0|[now, that, the, ...|[boy, rc, get, po...|
|kvcard|    0|[we, all, know, t...|[know, 🌈🐻s, cal...|
|kvcmhk|    1|[it, do, not, dil..

In [10]:
'''
Create Document-Term Matrix by vectorizing the filtered text.
- returns the features column: (total nb of words, indices of each word in total vocab, count of each word)
'''
to_vectorize = filtered_df.select('id', 'label', 'text')
cv = CountVectorizer(inputCol="text", outputCol="features")
model_vec = cv.fit(to_vectorize)
result_vec = model_vec.transform(to_vectorize)
print("Total count of vocabulary:", len(model_vec.vocabulary))
result_vec.show()

Total count of vocabulary: 10932
+------+-----+--------------------+--------------------+
|    id|label|                text|            features|
+------+-----+--------------------+--------------------+
|ks1tzw|    0|[right, artist, f...|(10932,[1,2,4,7,8...|
|ksjhuu|    0|[bldr, $, pt, pos...|(10932,[3,5,6,7,9...|
|kt79b2|    0|[might, hear, fir...|(10932,[25,55,77,...|
|kt9enf|    0|[bear, feast, low...|(10932,[1,4,6,7,8...|
|ktbu75|    0|[today, go, games...|(10932,[1,4,8,25,...|
|ktfori|    0|[week, steel, big...|(10932,[3,4,5,7,8...|
|kup2e2|    1|[start, feel, rea...|(10932,[3,8,13,30...|
|kurzyz|    0|[honestly, dont, ...|(10932,[8,15,16,1...|
|kusng4|    0|[soi, recently, o...|(10932,[0,1,4,11,...|
|kuku2g|    1|[nio, disrupt, te...|(10932,[4,11,14,1...|
|kutslm|    1|[listen, degenera...|(10932,[4,5,11,12...|
|kv7k8k|    0|[anyone, notice, ...|(10932,[0,1,19,26...|
|kvarzs|    0|[boy, rc, get, po...|(10932,[1,5,6,8,1...|
|kvcard|    0|[know, 🌈🐻s, cal...|(10932,[0,1,3,4,5...|


In [11]:
selectedData = result_vec.select('id', 'label','features', 'text')
selectedData.show(truncate=True)

+------+-----+--------------------+--------------------+
|    id|label|            features|                text|
+------+-----+--------------------+--------------------+
|ks1tzw|    0|(10932,[1,2,4,7,8...|[right, artist, f...|
|ksjhuu|    0|(10932,[3,5,6,7,9...|[bldr, $, pt, pos...|
|kt79b2|    0|(10932,[25,55,77,...|[might, hear, fir...|
|kt9enf|    0|(10932,[1,4,6,7,8...|[bear, feast, low...|
|ktbu75|    0|(10932,[1,4,8,25,...|[today, go, games...|
|ktfori|    0|(10932,[3,4,5,7,8...|[week, steel, big...|
|kup2e2|    1|(10932,[3,8,13,30...|[start, feel, rea...|
|kurzyz|    0|(10932,[8,15,16,1...|[honestly, dont, ...|
|kusng4|    0|(10932,[0,1,4,11,...|[soi, recently, o...|
|kuku2g|    1|(10932,[4,11,14,1...|[nio, disrupt, te...|
|kutslm|    1|(10932,[4,5,11,12...|[listen, degenera...|
|kv7k8k|    0|(10932,[0,1,19,26...|[anyone, notice, ...|
|kvarzs|    0|(10932,[1,5,6,8,1...|[boy, rc, get, po...|
|kvcard|    0|(10932,[0,1,3,4,5...|[know, 🌈🐻s, cal...|
|kvcmhk|    1|(10932,[0,17,25,4..

In [12]:
'''
Separate data into training/test
'''
# training_zero, test_zero = selectedData.where(selectedData.label == 0).randomSplit([0.7, 0.3])
# training_one, test_one = selectedData.where(selectedData.label == 1).randomSplit([0.7, 0.3])

# training = training_zero.union(training_one)
# test = test_zero.union(test_one)
# #training.show()
# # should be 70% of total in training, 30% in test
# print("Total data count:", selectedData.count())
# print("Total count of >6%", training.count())
# print("Total count of <6%", test.count())

'\nSeparate data into training/test used for Naive-Bayes\n'

In [None]:
'''
Random Forest
'''
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

data = selectedData

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

paramGrid = ParamGridBuilder()\
     .addGrid(rf.numTrees, [ 200, 400,600,800,1000]) \
     .addGrid(rf.impurity,['entropy','gini']) \
     .addGrid(rf.maxDepth,[2,3,4,5]) \
     .build()

evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="f1")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5) 
# Train model.  This also runs the indexers.
#model = pipeline.fit(trainingData)

model = crossval.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
# evaluator = MulticlassClassificationEvaluator(
#     labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

#rfModel = model.stages[2]
#print(rfModel)  # summary only

In [20]:
model.stages[2]

RandomForestClassificationModel: uid=RandomForestClassifier_5aa7e229fa71, numTrees=10, numClasses=2, numFeatures=10932

In [6]:
# import statistics

# extract_method = "CountVectorizer"
# iter_each = 10
# iter_total = 50
# m_types = ["complement", "multinomial"]
# means = []

# for model_type in m_types:
#   for k in range(iter_total):
#     accuracies = []
#     smoothing = random.uniform(0.01, 0.8)
#     for i in range(iter_each):
#       accuracies.append(NAIVEBAYES_CV(smoothing, model_type))
#     mean = statistics.mean(accuracies)
#     print("=> Mean:", mean, "- Smoothing:", smoothing, "- Model:", model_type)
#     means.append((mean, smoothing, model_type, extract_method))

In [None]:
# from pyspark.sql.types import FloatType
# acc_df = pd.DataFrame(means, columns=['mean', 'smoothing', 'model_type', 'extract_method'])
# acc_df.to_csv("means_countvec.csv")

In [None]:
# from google.colab import files
# files.download('means_countvec.csv') 

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>