# **III. Supervised Machine Learning Predictive Modeling**

Models include:

1. Naive Bayes

2. Support Vector Machine

3. Logistic Regression

4. Random Forest



**1. Naive Bayes**

In [None]:
from pyspark.ml.classification import NaiveBayes
# Multinomial Naive Bayes with additive smoothing of 1.0.
# No weightCol is set: none of the DataFrame schemas printed in this notebook contain a
# "weight" column, so fitting an estimator configured with weightCol="weight" would fail.
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

In [None]:
# Feature-engineering stages for the Naive Bayes pipeline, listed in execution order.
# NOTE(review): an identical copy of this cell follows; one of the two can be removed.

# 1. Split the raw text into tokens on runs of non-word characters
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")

# 2. Turn the token lists into term-count vectors (minDF=2.0: a term must appear in
#    at least two documents to enter the vocabulary)
cv = CountVectorizer(inputCol="tokens", outputCol="token_features", minDF=2.0)

# 3. Index the labels with StringIndexer; the result goes to a separate "label1" column
indexer = StringIndexer(inputCol="label", outputCol="label1")

# 4. Assemble the count vectors into the "features" column
vecAssembler = VectorAssembler(inputCols=['token_features'], outputCol="features")

stages = [regexTokenizer, cv, indexer, vecAssembler]

# Plain loop rather than a list comprehension: printing is a side effect, and a
# comprehension as the last expression makes the cell echo [None, None, None, None].
for stage in stages:
    print('\n', stage)

In [None]:
# Feature-engineering stages for the Naive Bayes pipeline, listed in execution order.

# 1. Split the raw text into tokens on runs of non-word characters
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")

# 2. Turn the token lists into term-count vectors (minDF=2.0: a term must appear in
#    at least two documents to enter the vocabulary)
cv = CountVectorizer(inputCol="tokens", outputCol="token_features", minDF=2.0)

# 3. Index the labels with StringIndexer; the result goes to a separate "label1" column
indexer = StringIndexer(inputCol="label", outputCol="label1")

# 4. Assemble the count vectors into the "features" column
vecAssembler = VectorAssembler(inputCols=['token_features'], outputCol="features")

stages = [regexTokenizer, cv, indexer, vecAssembler]

# Plain loop rather than a list comprehension: printing is a side effect, and a
# comprehension as the last expression makes the cell echo [None, None, None, None].
for stage in stages:
    print('\n', stage)


 RegexTokenizer_5edf07f9deeb

 CountVectorizer_eaacfaefa196

 StringIndexer_ce825f6570e2

 VectorAssembler_13f76b4c4a0c


[None, None, None, None]

In [None]:
from pyspark.ml import Pipeline
# Chain the prepared stages, fit them on the full tfidf_df, then apply the fitted
# pipeline to that same frame to obtain the model-ready columns.
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(tfidf_df)
data = pipeline_model.transform(tfidf_df)

In [None]:
# 80/20 train/test split of the pipeline output, with a fixed seed.
nb_splits = data.randomSplit([0.8, 0.2], seed=2018)
train_nb, test_nb = nb_splits

In [None]:
# Multinomial Naive Bayes (smoothing 1.0), trained on the training split.
nb = NaiveBayes(
    modelType="multinomial",
    smoothing=1.0,
)
model_nb = nb.fit(train_nb)

In [None]:
# Score the held-out split with the fitted Naive Bayes model.
predictions = model_nb.transform(test_nb)

# Preview ten rows: true label, predicted class and class probabilities, untruncated.
preview_cols = ["label", "prediction", "probability"]
predictions.limit(10).select(*preview_cols).show(truncate=False)

+-----+----------+-------------------------------------------+
|label|prediction|probability                                |
+-----+----------+-------------------------------------------+
|1    |1.0       |[4.5902986561809365E-11,0.9999999999540969]|
|1    |1.0       |[4.3455584210085304E-7,0.9999995654441579] |
|1    |1.0       |[5.9377322148630876E-8,0.9999999406226778] |
|0    |0.0       |[0.9969209935083592,0.0030790064916408834] |
|1    |1.0       |[0.002129835933854133,0.9978701640661459]  |
|1    |1.0       |[0.0030842835789838745,0.9969157164210161] |
|1    |1.0       |[2.638334185789453E-17,1.0]                |
|1    |1.0       |[7.22552172467018E-8,0.9999999277447827]   |
|1    |1.0       |[1.626316976449768E-13,0.9999999999998375] |
|0    |0.0       |[0.9999999999999987,1.379233476913504E-15] |
+-----+----------+-------------------------------------------+



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under ROC has to be computed from the model's continuous scores. Feeding the
# hard 0/1 "prediction" column collapses the ROC curve to a single operating point,
# so the evaluator reads "rawPrediction" instead. The reported value will therefore
# differ from numbers obtained with the thresholded predictions.
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
# NOTE: despite its name, `accuracy` holds the area under the ROC curve; the name is
# kept so that any later cell referring to it keeps working.
accuracy = evaluator.evaluate(predictions)
print("Test Area Under ROC: ", accuracy)

Test Area Under ROC:  0.8496871988138883


**2. Support Vector Machine**

In [None]:
# SVM model: trained with SGD for 50 iterations and a regularisation parameter of 0.3
numIterations = 50
regParam = 0.3
svm = SVMWithSGD.train(train_lb, numIterations, regParam=regParam)


# predict
def _row_to_labeled_point(row):
    """Build a LabeledPoint using row[1] as the label and row[0] as the features."""
    return LabeledPoint(row[1], MLLibVectors.fromML(row[0]))


def _score_and_label(point):
    """Pair the SVM's prediction for a point (as a float) with the point's label."""
    return (float(svm.predict(point.features)), point.label)


test_lb = test.rdd.map(_row_to_labeled_point)
scoreAndLabels_test = test_lb.map(_score_and_label)
score_label_test = spark.createDataFrame(scoreAndLabels_test, ["prediction", "label"])

In [None]:
# Model evaluation: F1 of the SVM predictions against the labels
f1_eval = MulticlassClassificationEvaluator(
    metricName="f1",
    predictionCol="prediction",
    labelCol="label",
)
svm_f1 = f1_eval.evaluate(score_label_test)
print("F1 score: %.4f" % (svm_f1,))

In [None]:
# Pair each vocabulary term with the SVM weight at the same position.
vocabulary = cvModel.vocabulary
weights = svm.weights.toArray()
svm_coeffs_df = pd.DataFrame({'word': vocabulary, 'weight': weights})

# The 20 terms with the smallest weights, in ascending order.
svm_coeffs_df.sort_values(by='weight', ascending=True).head(n=20)

**3. Logistic Regression**

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression capped at 10 optimisation iterations, fitted on the
# Naive Bayes training split.
lr = LogisticRegression(
    maxIter=10,
    labelCol='label',
)
lrModel = lr.fit(train_nb)

In [None]:
# NOTE(review): this re-fits `lr` on `train`, replacing the lrModel that the previous
# cell fitted on train_nb — confirm which split is intended.
lrModel = lr.fit(train)
lr_pred = lrModel.transform(test)

# F1 of the logistic-regression predictions on `test`.
f1_eval = MulticlassClassificationEvaluator(
    metricName="f1",
    predictionCol="prediction",
    labelCol="label",
)
lr_f1 = f1_eval.evaluate(lr_pred)
print("F1 score: %.4f" % (lr_f1,))

**4. Random Forest**

In [None]:
from pyspark.ml.classification import (DecisionTreeClassifier, RandomForestClassifier,
                                      GBTClassifier)
from pyspark.ml import Pipeline


In [None]:
# Assemble the 'tfidf' vector column into a 'features' column on tfidf_df.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=['tfidf'],
)
final_data = assembler.transform(tfidf_df)

In [None]:
# Print the column names and types of the assembled frame (it should now list 'features').
final_data.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words_new: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- tfidf: vector (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
from pyspark.ml.feature import StringIndexer

# Index the `label` column into a new `label1` column, then show the resulting schema.
indexer = StringIndexer(outputCol='label1', inputCol='label')
indexer_model = indexer.fit(final_data)
outputFixed = indexer_model.transform(final_data)
outputFixed.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words_new: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- tfidf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label1: double (nullable = false)



In [None]:
# Show the indexed frame's schema once more.
outputFixed.printSchema()

# Keep only the model inputs (feature vector and indexed label) and peek at three rows.
model_columns = ['features', 'label1']
final_df = outputFixed.select(*model_columns)
final_df.show(n=3)

root
 |-- review_id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words_new: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- tfidf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label1: double (nullable = false)

+--------------------+------+
|            features|label1|
+--------------------+------+
|(202063,[0,1,2,6,...|   0.0|
|(202063,[0,7,14,1...|   0.0|
|(202063,[0,2,3,13...|   0.0|
+--------------------+------+
only showing top 3 rows



In [None]:
# Scaling the data
# The scaler is fitted on outputFixed (which already carries label1) instead of
# re-assigning final_data from itself, so re-running this cell does not try to add
# 'scaledFeat' to a frame that already has it. final_df is rebuilt afterwards so the
# scaled column is present in the frame that gets split into train/test.
scaler = StandardScaler(inputCol='features', outputCol='scaledFeat')
final_data = scaler.fit(outputFixed).transform(outputFixed)
final_df = final_data.select('features', 'scaledFeat', 'label1')

In [None]:
# 80/20 train/test split for the tree model. The seed matches the one used for the
# Naive Bayes split so the partition is reproducible across runs.
train_rf, test_rf = final_df.randomSplit([0.8, 0.2], seed=2018)

In [None]:
# Building the random forest model
# - RandomForestClassifier (not GBTClassifier) so the model matches this section and the
#   `rf_model` name used by the prediction cell.
# - Trained on train_rf, the split created for this model, which carries 'label1'.
# - Uses the 'features' column: tree splits do not depend on per-feature scaling, and
#   'features' is the vector column that final_df was selected with.
# - Fixed seed so the bootstrapped forest is reproducible.
rf = RandomForestClassifier(labelCol='label1', featuresCol='features', seed=2018)
rf_model = rf.fit(train_rf)

In [None]:
# Score the held-out part of the random-forest split. test_rf comes from the same
# randomSplit as train_rf and carries the 'label1' column; `test` belongs to a
# different split.
rf_predictions = rf_model.transform(test_rf)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Binary-classification evaluator that reads the true class from the indexed 'label1' column.
binary_evaluator = BinaryClassificationEvaluator().setLabelCol('label1')