In [8]:
# ---------------------------------------Lab3---------------------------------------------
# The data has been trained using two models which are Logistic Regression and Naive Bayes.
# Some code snippets have been taken from link: https://towardsdatascience.com/multi-class-text-classification-with-pyspark-7d78d022ed35

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import HashingTF, IDF, NGram
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover
from operator import add
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.functions import monotonically_increasing_id
import sys
import re

# This is for the main data which has 200 files (Includes traning and test data)
def part2(context):
    file = context[0]
    words = re.sub('[^a-z0-9]+',' ',context[1].lower()).split()
    file = file.split("/")[-1]
    if(re.match( r'FileSportsRaw.*', file)):
        return (0.0, file ,words)
    elif(re.match( r'FilePoliticsRaw.*', file)):
        return (1.0, file, words)
    elif(re.match( r'FileBusinessRaw.*', file)):
        return (2.0, file, words)
    else:
        return (3.0, file, words)

# This is for the unknown data which has 40 files (Includes test data)
def part2_unknown(context):
    file = context[0]
    words = re.sub('[^a-z0-9]+',' ',context[1].lower()).split()
    file = file.split("/")[-1]
    if(re.match( r'FileSportsUnknown.*', file)):
        return (0.0, file ,words)
    elif(re.match( r'FilePoliticsUnknown.*', file)):
        return (1.0, file, words)
    elif(re.match( r'FileBusinessUnknown.*', file)):
        return (2.0, file, words)
    else:
        return (3.0, file, words)


conf = SparkConf()
conf.setAppName( "part1" )
conf.set("spark.executor.memory", "2g")
sc = SparkContext.getOrCreate(conf = conf)

# -----------------------------------For main data set-----------------------------------------------
#reading input
data_lines =sc.wholeTextFiles("AllArticlesFolder")

#configuring SparkSession
spark=SparkSession(sc)

#Converting to datafram
hasattr(data_lines, "toDF")

#tokeinizing the words and converting into dataframes
tokenizeDf = data_lines.map(part2).toDF(["label", "fileName", "words"])

#removing the Stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_column")
filteredWordsDf = remover.transform(tokenizeDf)

#finding the tf value
hashingTF = HashingTF(inputCol = "filtered_column", outputCol = "hashFeatures")
tf = hashingTF.transform(filteredWordsDf)

#finding the idf value
idf = IDF(inputCol = "hashFeatures", outputCol = "features", )
idfModel = idf.fit(tf)
modifiedData = idfModel.transform(tf)

category0Data = modifiedData.filter(modifiedData.label == 0.0)
category1Data = modifiedData.filter(modifiedData.label == 1.0)
category2Data = modifiedData.filter(modifiedData.label == 2.0)
category3Data = modifiedData.filter(modifiedData.label == 3.0)

idxDf = category0Data.withColumn("idx", monotonically_increasing_id())
trainingData0 = idxDf.sort("idx").limit(40)
testData0 = idxDf.sort("idx", ascending = False).limit(10)

idxDf = category1Data.withColumn("idx", monotonically_increasing_id())
trainingData1 = idxDf.sort("idx").limit(40)
testData1 = idxDf.sort("idx", ascending = False).limit(10)

idxDf = category2Data.withColumn("idx", monotonically_increasing_id())
trainingData2 = idxDf.sort("idx").limit(40)
testData2 = idxDf.sort("idx", ascending = False).limit(10)

idxDf = category3Data.withColumn("idx", monotonically_increasing_id())
trainingData3 = idxDf.sort("idx").limit(40)
testData3 = idxDf.sort("idx", ascending = False).limit(10)

result_training_data_1 = trainingData0.union(trainingData1)
result_training_data_2 = trainingData2.union(trainingData3)
result_training_data = result_training_data_1.union(result_training_data_2)

result_test_data_1 = testData0.union(testData1)
result_test_data_2 = testData2.union(testData3)
result_test_data = result_test_data_1.union(result_test_data_2)

#--------Logistic Regression Model----------
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(result_training_data)

predictions = lrModel.transform(result_test_data)
# predictions.select("fileName","probability","label","prediction").rdd.saveAsTextFile("output_lr")

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy LR on main data  = %g" % (100 * accuracy))

#--------Naive Bayes Classficiation Model----------
nb = NaiveBayes(smoothing=1)
nbmodel = nb.fit(result_training_data)

predictions = nbmodel.transform(result_test_data)
predictions.filter(predictions['prediction'] == 0)
# predictions.select("fileName","probability","label","prediction").rdd.saveAsTextFile("output_nb")

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy NB on main data  = %g" % (100 * accuracy))
print("Confusion Matrix for main data")
# Building dataset of prediction and label for Confusion Matrix

predictionsLabels = predictions.select("prediction","label").rdd
metrics = MulticlassMetrics(predictionsLabels)

# Confusion Matrix
cm = metrics.confusionMatrix().toArray()
print(cm)

# -----------------------------------For unknown data set-----------------------------------------------
#reading input
data_lines =sc.wholeTextFiles("UnknownArticlesFolder")

hasattr(data_lines, "toDF")

#tokeinizing the words and converting into dataframes
tokenizeDf = data_lines.map(part2_unknown).toDF(["label", "fileName", "words"])

#removing the Stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_column")
filteredWordsDf = remover.transform(tokenizeDf)

#finding the tf value
hashingTF = HashingTF(inputCol = "filtered_column", outputCol = "hashFeatures")
tf = hashingTF.transform(filteredWordsDf)

#finding the idf value
idf = IDF(inputCol = "hashFeatures", outputCol = "features", )
idfModel = idf.fit(tf)
modifiedData = idfModel.transform(tf)

category0Data = modifiedData.filter(modifiedData.label == 0.0)
category1Data = modifiedData.filter(modifiedData.label == 1.0)
category2Data = modifiedData.filter(modifiedData.label == 2.0)
category3Data = modifiedData.filter(modifiedData.label == 3.0)

idxDf = category0Data.withColumn("idx", monotonically_increasing_id())
unknown_testData0 = idxDf.sort("idx", ascending = False).limit(10)

idxDf = category1Data.withColumn("idx", monotonically_increasing_id())
unknown_testData1 = idxDf.sort("idx", ascending = False).limit(10)

idxDf = category2Data.withColumn("idx", monotonically_increasing_id())
unknown_testData2 = idxDf.sort("idx", ascending = False).limit(10)

idxDf = category3Data.withColumn("idx", monotonically_increasing_id())
unknown_testData3 = idxDf.sort("idx", ascending = False).limit(10)

unknown_test_data_1 = unknown_testData0.union(unknown_testData1)
unknown_test_data_2 = unknown_testData2.union(unknown_testData3)
unknown_test_data = unknown_test_data_1.union(unknown_test_data_2)

#--------Logistic Regression Model----------
predictions = lrModel.transform(unknown_test_data)
# predictions.select("fileName","probability","label","prediction").rdd.saveAsTextFile("sparkFolder/output")

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy LR on unknown data  = %g" % (100 * accuracy))

#--------Naive Bayes Classficiation Model----------
predictions = nbmodel.transform(unknown_test_data)
predictions.filter(predictions['prediction'] == 0)
#predictions.select("fileName","probability","label","prediction").rdd.saveAsTextFile("sparkFolder/outputNB")

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy NB on unknown data = %g" % (100 * accuracy))
print("Confusion Matrix for unknown data")

# Building dataset of prediction and label for Confusion Matrix
predictionsLabels = predictions.select("prediction","label").rdd
metrics = MulticlassMetrics(predictionsLabels)

# Confusion Matrix
cm = metrics.confusionMatrix().toArray()
print(cm)

sc.stop()

Accuracy LR on main data  = 72.2917
Accuracy NB on main data  = 77.1068
Confusion Matrix for main data
[[ 9.  0.  1.  0.]
 [ 0.  8.  1.  1.]
 [ 0.  1.  9.  0.]
 [ 0.  2.  3.  5.]]
Accuracy LR on unknown data  = 82.1212
Accuracy NB on unknown data = 82.7871
Confusion Matrix for unknown data
[[ 9.  0.  1.  0.]
 [ 0.  7.  3.  0.]
 [ 0.  1.  8.  1.]
 [ 0.  1.  0.  9.]]
