## Predictive Model Construction using Spark ML 

In [1]:
# To allocate more memory if necessary 
memory = '10g'
pyspark_submit_args = ' --driver-memory ' + memory + ' pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [2]:
# Helper thread in order to have a stream running in the background in Jupyter

from threading import Thread

class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [None]:
# Data exploration
textFile = sc.textFile("./myoutput_historical_data_full")
textFile.count()

print("safe edits count: ", textFile.filter(lambda line: '"label": "safe"' in line).count())
print("unsafe edits count: ", textFile.filter(lambda line: '"label": "unsafe"' in line).count())
print("vandal edits count: ", textFile.filter(lambda line: '"label": "vandal"' in line).count())

In [3]:
# Read in saved dataset

df = spark.read.json("./myoutput_historical_data") #full training data is myoutput_historical_data_full
keep_list = ['label', 'text_new', 'text_old']
df = df.select([column for column in df.columns if column in keep_list])
df.dropna() # remove records with missing value in any column

# Split into training and testing data
df_train, df_test = df.randomSplit([0.8, 0.2],seed=15)
training_ratio = df_train.count()/df_test.count()
print(training_ratio) # check that training set has around 80% of records 
df_train.show(5)
df_test.show(5)


4.046287367405979
+-----+--------------------+--------------------+
|label|            text_new|            text_old|
+-----+--------------------+--------------------+
| safe|

{{Infobox music...|

{{Infobox music...|
| safe|

{{Infobox organ...|

{{Infobox organ...|
| safe|

{{refimprove|da...|

{{refimprove|da...|
| safe| A relative  pron...| A relative  pron...|
| safe|#REDIRECT [[Forei...|#REDIRECT [[Forei...|
+-----+--------------------+--------------------+
only showing top 5 rows

+-----+--------------------+--------------------+
|label|            text_new|            text_old|
+-----+--------------------+--------------------+
| safe|



'''John Russe...|



'''John Russe...|
| safe|

'''Roomy Pak'''...|

'''Roomy Pak'''...|
| safe|

{{Infobox river...|{{Coord|59|19|48|...|
| safe|#REDIRECT [[Econo...|#REDIRECT [[Econo...|
| safe|'''Dao xiao mian'...|'''Dao xiao mian'...|
+-----+--------------------+--------------------+
only showing top 5 rows



In [4]:
# Convert string label to integer label 
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType

indexer = StringIndexer(inputCol="label", outputCol="label_idx")
df_train = indexer.fit(df_train).transform(df_train) 
#df_train = df_train.withColumn("label_idx",df_train["label_idx"]).cast(IntegerType()))

df_test = indexer.fit(df_test).transform(df_test) 
#df_test = df_test.withColumn("label_idx",df_test["label_idx"]).cast(IntegerType()))

df_train.show(5)
df_test.show(5)

+-----+--------------------+--------------------+---------+
|label|            text_new|            text_old|label_idx|
+-----+--------------------+--------------------+---------+
| safe|

{{Infobox music...|

{{Infobox music...|      0.0|
| safe|

{{Infobox organ...|

{{Infobox organ...|      0.0|
| safe|

{{refimprove|da...|

{{refimprove|da...|      0.0|
| safe| A relative  pron...| A relative  pron...|      0.0|
| safe|#REDIRECT [[Forei...|#REDIRECT [[Forei...|      0.0|
+-----+--------------------+--------------------+---------+
only showing top 5 rows

+-----+--------------------+--------------------+---------+
|label|            text_new|            text_old|label_idx|
+-----+--------------------+--------------------+---------+
| safe|



'''John Russe...|



'''John Russe...|      0.0|
| safe|

'''Roomy Pak'''...|

'''Roomy Pak'''...|      0.0|
| safe|

{{Infobox river...|{{Coord|59|19|48|...|      0.0|
| safe|#REDIRECT [[Econo...|#REDIRECT [[Econo...|      0.0|
| safe|'''Dao x

In [5]:
# Tokenize text and find difference 
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover


# Words only 
tokenizer1 = RegexTokenizer(inputCol='text_new', outputCol='words_new', pattern='\\s+') 
tokenizer2 = RegexTokenizer(inputCol='text_old', outputCol='words_old', pattern='\\s+') #\\W

# Remove stopwords 
remover1 = StopWordsRemover(inputCol='words_new', outputCol='terms_new')
remover2 = StopWordsRemover(inputCol='words_old', outputCol='terms_old')

df_train = tokenizer1.transform(df_train)
df_train = tokenizer2.transform(df_train)
df_train = remover1.transform(df_train)
df_train = remover2.transform(df_train)

df_test = tokenizer1.transform(df_test)
df_test = tokenizer2.transform(df_test)
df_test = remover1.transform(df_test)
df_test = remover2.transform(df_test)

df_train.select('terms_new','terms_old').show(5)
df_test.select('terms_new','terms_old').show(5)

+--------------------+--------------------+
|           terms_new|           terms_old|
+--------------------+--------------------+
|[{{infobox, music...|[{{infobox, music...|
|[{{infobox, organ...|[{{infobox, organ...|
|[{{refimprove|dat...|[{{refimprove|dat...|
|[relative, pronou...|[relative, pronou...|
|[#redirect, [[for...|[#redirect, [[for...|
+--------------------+--------------------+
only showing top 5 rows

+--------------------+--------------------+
|           terms_new|           terms_old|
+--------------------+--------------------+
|['''john, russell...|['''john, russell...|
|['''roomy, pak'''...|['''roomy, pak'''...|
|[{{infobox, river...|[{{coord|59|19|48...|
|[#redirect, [[eco...|[#redirect, [[eco...|
|['''dao, xiao, mi...|['''dao, xiao, mi...|
+--------------------+--------------------+
only showing top 5 rows



In [6]:
# Look at data types
df_train.dtypes

[('label', 'string'),
 ('text_new', 'string'),
 ('text_old', 'string'),
 ('label_idx', 'double'),
 ('words_new', 'array<string>'),
 ('words_old', 'array<string>'),
 ('terms_new', 'array<string>'),
 ('terms_old', 'array<string>')]

In [7]:
# Find difference between terms new and old 
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import udf

differencer=udf(lambda x,y: list(set(x)-set(y)), ArrayType(StringType()))

df_train=df_train.withColumn('diff', differencer('terms_new', 'terms_old'))
df_test=df_test.withColumn('diff', differencer('terms_new', 'terms_old'))

# drop unnecessary columns here
keep_list = ['label', 'label_idx', 'diff']
df_train = df_train.select([column for column in df_train.columns if column in keep_list])
df_test = df_test.select([column for column in df_test.columns if column in keep_list])

df_train.select('label','diff').show(5)
df_test.select('label','diff').show(5)


+-----+--------------------+
|label|                diff|
+-----+--------------------+
| safe|      [|first=david]|
| safe|          ['k-word']|
| safe|[magazine—the, we...|
| safe|                  []|
| safe|    [subpage}}, {{r]|
+-----+--------------------+
only showing top 5 rows

+-----+--------------------+
|label|                diff|
+-----+--------------------+
| safe|[estate,, estate,...|
| safe|  [university|huron]|
| safe|[eng.svg, mouth_c...|
| safe|    [subpage}}, {{r]|
| safe|[2020}}, stub|dat...|
+-----+--------------------+
only showing top 5 rows



In [8]:
# Construct Pipeline and pass in diff (difference between new and old text)  
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Convert diff to Hash to TF-IDF
hasher = HashingTF(inputCol='diff', outputCol='hash')
idf = IDF(inputCol=hasher.getOutputCol(), outputCol='features')

# We try both Logistic regression and/or Naive Bayes
ml_model= LogisticRegression(featuresCol = 'features', labelCol = 'label_idx')
# ml_model= NaiveBayes(featuresCol = 'features', labelCol = 'label_idx')

# Create pipeline
pipeline = Pipeline(stages=[hasher, idf, ml_model])

# Create a multiclass classification evaluator
evaluator = MulticlassClassificationEvaluator(labelCol = 'label_idx')

# Create and build param grid 
# logistic regression params
params = ParamGridBuilder() \
    .addGrid(hasher.numFeatures, [1024]) \
    .addGrid(ml_model.regParam, [0.3]) \
    .addGrid(ml_model.maxIter,[10]) \
    .build()
# naive bayes params 
# params = ParamGridBuilder() \
#     .addGrid(hasher.numFeatures, [50]) \
#     .addGrid(ml_model.smoothing, [1.0]) \
#     .build()


# Create cross-validator 
cv = CrossValidator(estimator=pipeline, 
                    estimatorParamMaps=params, 
                    evaluator=evaluator, numFolds=3)

# Train model on multiple folds of the training data
cv = cv.fit(df_train)

# Get best model 
best_model = cv.bestModel

# Look at the stages in the best model
print(best_model.stages)

# Get the parameters for the logistic regression object in the best model
print(best_model.extractParamMap())

# Generate predictions on testing data using the best model then calculate RMSE
prediction = best_model.transform(df_test)
evaluator.evaluate(prediction)

# Create a confusion matrix, comparing predictions to known labels
prediction.groupBy('label','label_idx','prediction').count().show()

# Average AUC for each parameter combination in grid
avg_auc = cv.avgMetrics
#print("Average AUC for each parameter combination in grid: " + str(avg_auc))

# Average AUC for the best model
best_model_auc = max(cv.avgMetrics)
print("Average AUC for the best model: " + str(best_model_auc))

# AUC for best model on testing data
best_auc = evaluator.evaluate(cv.transform(df_test))
print("AUC for best model on testing data: " + str(best_auc))

# Print out parameters for best model 
print(best_model.stages[0].extractParamMap()) # parameters for haser
print(best_model.stages[1].extractParamMap()) # parameters for idf
print(best_model.stages[2].extractParamMap()) # parameters for logistic


[HashingTF_171261aecd03, IDF_9ca52d516593, LogisticRegressionModel: uid = LogisticRegression_37b7305902a7, numClasses = 3, numFeatures = 1024]
{}
+------+---------+----------+-----+
| label|label_idx|prediction|count|
+------+---------+----------+-----+
|vandal|      2.0|       0.0|   54|
|unsafe|      1.0|       1.0|   17|
|vandal|      2.0|       1.0|    2|
|unsafe|      1.0|       0.0|  749|
|  safe|      0.0|       0.0| 4340|
|  safe|      0.0|       1.0|   23|
+------+---------+----------+-----+

Average AUC for the best model: 0.7751907484283969
AUC for best model on testing data: 0.7745652350934468
{Param(parent='HashingTF_171261aecd03', name='binary', doc='If True, all non zero counts are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts. Default False.'): False, Param(parent='HashingTF_171261aecd03', name='numFeatures', doc='number of features.'): 1024, Param(parent='HashingTF_171261aecd03', name='outputCol', doc='ou

In [11]:
# Save best model 
modelPath = "./models/lrmodel"
best_model.save(modelPath)