In [None]:
#Importing all required libraries here. Some of these imports were only needed by earlier experiments and have not been removed yet.
from pyspark import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pandas as pd
from pathlib import Path
import glob
import os
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
#importing pyspark machine learning libraries
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

In [None]:
# Spark memory settings for a machine with 8 GB of RAM (~7.6 GB usable).
# In local mode the executors run inside the driver JVM, so the usable heap is bounded by
# spark.driver.memory alone (3g here) -- the two values below do NOT add up to a 6 GB heap.
# spark.executor.memory only matters when running against a cluster.
# Driver memory is applied when the JVM is launched, so these settings only take effect if no
# SparkContext has been started yet in this kernel (restart the kernel after changing them).
from pyspark import SparkConf

DRIVER_MEMORY = "3g"
EXECUTOR_MEMORY = "3g"

conf = (
    SparkConf()
    .set("spark.driver.memory", DRIVER_MEMORY)
    .set("spark.executor.memory", EXECUTOR_MEMORY)
)

In [None]:
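#Don't run this by default. It is the alternative to use if the driver keeps running out of memory; restart the kernel first, because driver memory is only applied when the JVM starts.
#Executor memory can be ignored when running Spark in local mode (the executors run inside the driver JVM).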
#from pyspark import SparkConf
#conf=SparkConf()
#conf.set("spark.driver.memory", "7g")

In [None]:
#defining spark context and spark session
# SparkSession.builder is the documented entry point: getOrCreate() starts a SparkContext with
# `conf` if none is running (or reuses the running one), and re-running this cell returns the
# existing session instead of constructing a new SparkSession object each time.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
sc

In [None]:
# Explicit schema for the JSON records: six string columns, all nullable, in this order.
schema = StructType([
    StructField(field_name, StringType(), nullable=True)
    for field_name in ('title_page', 'text_new', 'text_old', 'name_user', 'label', 'comment')
])

In [None]:
# Read the JSON part files into a single DataFrame using the explicit schema above
# (with a schema given, Spark does not need an extra pass over the data to infer one).
# NOTE: Hadoop-style path globs are NOT recursive -- `**` behaves like `*` within a single
# path segment, so this matches <DATA_ROOT>/<one directory level>/part-* only. For arbitrarily
# deep trees use .option("recursiveFileLookup", "true") (Spark 3.0+) on DATA_ROOT instead.
# multiLine=True lets a single JSON record span several lines of a file.
DATA_ROOT = "/home/shourya/fold"  # machine-specific path: adjust for your environment
df_json = spark.read.json(f"{DATA_ROOT}/**/part-*", schema=schema, multiLine=True)

In [None]:
#this would print the schema of the pyspark sql dataframe
#(a tree of column names, types and nullability; it should mirror `schema` passed to the reader)
df_json.printSchema()

In [None]:
# Materialise the Spark DataFrame as a local pandas DataFrame for ad-hoc inspection.
# CAUTION: toPandas() collects every row onto the driver, so the whole dataset must fit in
# driver memory; df_json.limit(n).toPandas() would give a cheaper sample.
# NOTE: this variable is named `pandas`, which shadows the usual module name (the module is `pd` here).
pandas = df_json.toPandas()

In [None]:
#Number of rows per distinct label value, most frequent first
from pyspark.sql.functions import col

(
    df_json
    .groupBy("label")
    .count()
    .sort(col("count").desc())
    .show()
)

In [None]:
#Number of rows per distinct user name, most frequent first (output shows real user names)
(
    df_json
    .groupBy("name_user")
    .count()
    .sort(col("count").desc())
    .show()
)

In [None]:
#Don't run this
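#Set difference of the two text columns: the distinct text_new values that do not appear anywhere in text_old (equivalent to SQL EXCEPT DISTINCT; this is not a row-by-row diff of each edit).
#collect() returns a list of Row objects; because the result is de-duplicated, its length generally differs from the number of rows in df_json.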
#collect = df_json.select('text_new').subtract(df_json.select('text_old')).collect()

In [None]:
#Don't run this
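#Count how often each element of `collect` occurs in it. Each element is a whole text_new value (not a single word), and subtract() already removes duplicates, so every count would be 1; collections.Counter would also avoid this O(n^2) loop.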
#wordfreq = []
#for w in collect:
#    wordfreq.append(collect.count(w))

#print("Pairs\n" + str(list(zip(collect, wordfreq))))   

In [None]:
#Don't run this
# Convert the difference to a pandas DataFrame with a single column named "a".
#Wrapping the list in a DataFrame for ease of use; note that reshape(225,1) below hard-codes the row count (len(collect) would be safer).
#import numpy as np
#len(collect)
#df_collect = pd.DataFrame(np.array(collect).reshape(225,1), columns = list("a"))
#df_collect.head()

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression


def _make_tokenizer(source_col, target_col):
    """Split `source_col` on non-word characters (regex \\W) into a token array in `target_col`."""
    return RegexTokenizer(inputCol=source_col, outputCol=target_col, pattern="\\W")


def _make_stopword_remover(source_col, target_col, stopwords):
    """Remove every token listed in `stopwords` from `source_col`, writing the result to `target_col`."""
    return StopWordsRemover(inputCol=source_col, outputCol=target_col).setStopWords(stopwords)


def _make_count_vectorizer(source_col, target_col):
    """Bag-of-words counts: at most 100000 terms, each present in at least 5 documents."""
    return CountVectorizer(inputCol=source_col, outputCol=target_col, vocabSize=100000, minDF=5)


# Tokenizers, one per text column (RegexTokenizer lower-cases tokens by default).
regexTokenizer_new = _make_tokenizer("text_new", "words_new")
regexTokenizer_old = _make_tokenizer("text_old", "words_old")
regexTokenizer_title = _make_tokenizer("title_page", "words_title_page")
regexTokenizer_user = _make_tokenizer("name_user", "words_name_user")
regexTokenizer_comment = _make_tokenizer("comment", "words_comment")

# Custom stop-word list; setStopWords replaces Spark's default list entirely.
add_stopwords = ["a", "an", "the", "on"]
stopwordsRemover_new = _make_stopword_remover("words_new", "filtered_new", add_stopwords)
stopwordsRemover_old = _make_stopword_remover("words_old", "filtered_old", add_stopwords)
stopwordsRemover_comment = _make_stopword_remover("words_comment", "filtered_comment", add_stopwords)

# Count vectors; title and user-name tokens are vectorised without stop-word removal.
countVectors_new = _make_count_vectorizer("filtered_new", "features_new")
countVectors_old = _make_count_vectorizer("filtered_old", "features_old")
countVectors_comment = _make_count_vectorizer("filtered_comment", "features_comment")
countVectors_user = _make_count_vectorizer("words_name_user", "features_name_user")
countVectors_title = _make_count_vectorizer("words_title_page", "features_title_page")

In [None]:
# Keep the original string labels under the name "category". The StringIndexer in the next cell
# writes the numeric index to "label", which is the default labelCol of LogisticRegression and
# MulticlassClassificationEvaluator (both accept a different column via labelCol=...).
# NOTE: this rebinds df_json, so earlier cells that group by "label" fail if re-run afterwards.
df_json = df_json.withColumnRenamed(existing="label", new="category")

In [None]:
from pyspark.ml import Pipeline

from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler

# Index the string categories into a numeric "label" column (default order: most frequent -> 0.0).
label_stringIdx = StringIndexer(inputCol="category", outputCol="label")

# Feature pipeline: label indexing, then tokenize -> remove stop words -> count vectors.
# Order matters: each stage reads a column written by an earlier stage.
_bow_stages = [label_stringIdx]
_bow_stages += [regexTokenizer_new, regexTokenizer_old, regexTokenizer_title, regexTokenizer_user, regexTokenizer_comment]
_bow_stages += [stopwordsRemover_new, stopwordsRemover_old, stopwordsRemover_comment]
_bow_stages += [countVectors_comment, countVectors_new, countVectors_old, countVectors_user, countVectors_title]
pipeline = Pipeline(stages=_bow_stages)

# NOTE(review): fitting on all of df_json means the StringIndexer and CountVectorizer vocabularies
# also see the rows that land in the test split of the next cell; fitting on the training split
# only would give an unbiased test score.
pipelineFit = pipeline.fit(df_json)
dataset = pipelineFit.transform(df_json)
dataset.show(n=5)

In [None]:
# 70/30 train/test split; the fixed seed makes the split reproducible across runs.
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)
# Each count() is a separate Spark job that recomputes the feature pipeline.
for split_name, split_df in (("Training", trainingData), ("Test", testData)):
    print(f"{split_name} Dataset Count: {split_df.count()}")

In [None]:
# Logistic regression on the concatenated bag-of-words features.
# regParam=0.3 with elasticNetParam=0 is a pure L2 (ridge) penalty, which limits overfitting
# on the very high-dimensional sparse count vectors.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
# LogisticRegression reads a single vector column ("features" by default), so the per-column
# count vectors are concatenated by a VectorAssembler.
assembler = VectorAssembler(
    inputCols=['features_new', 'features_old', 'features_comment', 'features_name_user', 'features_title_page'],
    outputCol='features')
# The assembler runs as the first pipeline stage; calling assembler.transform() on its own
# would do nothing useful because DataFrames are immutable and the result would be discarded.
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(trainingData)

In [None]:
# Score the held-out split with the fitted assembler + logistic-regression pipeline.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
predictions = model.transform(testData)
# These are the evaluator defaults spelled out: the displayed number is weighted F1, not accuracy.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="f1")
evaluator.evaluate(predictions)

In [None]:
from pyspark.ml.feature import HashingTF, IDF
# TF-IDF features: HashingTF maps each token array to a fixed-size term-frequency vector and IDF
# rescales it; terms found in fewer than minDocFreq documents get an IDF weight of 0.
# TODO(review): Spark recommends a power of two for numFeatures (the index is hash mod numFeatures).
_TF_VECTOR_SIZE = 100000
_IDF_MIN_DOC_FREQ = 5

hashingTF_name = HashingTF(numFeatures=_TF_VECTOR_SIZE, inputCol="words_name_user", outputCol="rawFeatures_name")
hashingTF_title = HashingTF(numFeatures=_TF_VECTOR_SIZE, inputCol="words_title_page", outputCol="rawFeatures_title")
hashingTF_new = HashingTF(numFeatures=_TF_VECTOR_SIZE, inputCol="filtered_new", outputCol="rawFeatures_new")
hashingTF_old = HashingTF(numFeatures=_TF_VECTOR_SIZE, inputCol="filtered_old", outputCol="rawFeatures_old")
hashingTF_comment = HashingTF(numFeatures=_TF_VECTOR_SIZE, inputCol="filtered_comment", outputCol="rawFeatures_comment")

idf_name = IDF(minDocFreq=_IDF_MIN_DOC_FREQ, inputCol="rawFeatures_name", outputCol="features_idf_name")
idf_title = IDF(minDocFreq=_IDF_MIN_DOC_FREQ, inputCol="rawFeatures_title", outputCol="features_idf_title")
idf_new = IDF(minDocFreq=_IDF_MIN_DOC_FREQ, inputCol="rawFeatures_new", outputCol="features_idf_new")
idf_old = IDF(minDocFreq=_IDF_MIN_DOC_FREQ, inputCol="rawFeatures_old", outputCol="features_idf_old")
idf_comment = IDF(minDocFreq=_IDF_MIN_DOC_FREQ, inputCol="rawFeatures_comment", outputCol="features_idf_comment")

In [None]:
# TF-IDF variant of the first model: the same preprocessing stages plus hashing TF and IDF.
# TODO(review): the CountVectorizer stages are kept so dataset_ti has the same columns as
# `dataset`, but assembler_ti does not use their output; dropping them would save five vocabulary fits.
# NOTE(review): as with the first model, all stages are fitted before the train/test split.
_tfidf_stages = [label_stringIdx, regexTokenizer_new, regexTokenizer_old, regexTokenizer_title, regexTokenizer_user, regexTokenizer_comment,
                 stopwordsRemover_new, stopwordsRemover_old, stopwordsRemover_comment,
                 countVectors_comment, countVectors_new, countVectors_old, countVectors_user, countVectors_title,
                 hashingTF_name, hashingTF_title, hashingTF_new, hashingTF_old, hashingTF_comment,
                 idf_name, idf_title, idf_new, idf_old, idf_comment]
feature_pipeline_ti = Pipeline(stages=_tfidf_stages)
pipelineFit_ti = feature_pipeline_ti.fit(df_json)
dataset_ti = pipelineFit_ti.transform(df_json)
(trainingData_ti, testData_ti) = dataset_ti.randomSplit([0.7, 0.3], seed = 100)
# Cache both splits: they are reused by later fits and evaluations, and recomputing them would
# rerun the whole feature pipeline.
trainingData_ti.cache()
testData_ti.cache()

lr_ti = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
# Concatenate the raw term frequencies and their IDF-weighted versions into one "features" vector.
# NOTE(review): the IDF columns are rescaled copies of the raw TF columns, so using both duplicates information.
assembler_ti = VectorAssembler(
    inputCols=['rawFeatures_name','rawFeatures_title','rawFeatures_new','rawFeatures_old','rawFeatures_comment','features_idf_name','features_idf_title','features_idf_new','features_idf_old','features_idf_comment'],
    outputCol='features')
# The assembler runs as the first pipeline stage, so no standalone transform call is needed.
pipeline_ti = Pipeline(stages=[assembler_ti, lr_ti])
model_ti = pipeline_ti.fit(trainingData_ti)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Hyper-parameter search for the TF-IDF logistic regression: a 3 x 3 grid evaluated with
# 5-fold cross-validation (45 fits, plus one refit of the best parameters on all training data).
paramGrid = (ParamGridBuilder()
             .addGrid(lr_ti.regParam, [0.1, 0.3, 0.5])  # regularization strength
             .addGrid(lr_ti.elasticNetParam, [0.0, 0.1, 0.2])  # L1/L2 mix (0.0 = pure L2 / ridge)
             # .addGrid(lr_ti.maxIter, [10, 20, 50])  # number of iterations
             .build())

# Metric CrossValidator uses to rank grid points: the default metricName ("f1", weighted F1).
evaluator_ti = MulticlassClassificationEvaluator(predictionCol="prediction")

cv = CrossValidator(estimator=lr_ti,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator_ti,
                    numFolds=5)
assembler_ti_cv = VectorAssembler(
    inputCols=['rawFeatures_name','rawFeatures_title','rawFeatures_new','rawFeatures_old','rawFeatures_comment','features_idf_name','features_idf_title','features_idf_new','features_idf_old','features_idf_comment'],
    outputCol='features')
# Assemble features inside the pipeline so the CrossValidator sees the "features" column.
pipeline_ti_cv = Pipeline(stages=[assembler_ti_cv, cv])
# cache() is a no-op if the splits are already cached.
trainingData_ti.cache()
testData_ti.cache()
model_ti_cv = pipeline_ti_cv.fit(trainingData_ti)

# Evaluate the best model (refit on the full training split) on the held-out split.
predictions = model_ti_cv.transform(testData_ti)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)