In [0]:
import os
import atexit
import sys

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except:
        pass
    try:
        print('Trapped Exit cleaning up Spark Job')
        sj.stop()
    except:
        pass

findspark.init()

#Parameters for the Spark cluster
nodes=1
tasks_per_node=12
memory_per_task=1800 #1 gig per process, adjust accordingly
# Please estimate walltime carefully to keep unused Spark clusters from sitting 
# idle so that others may use the resources.
walltime="24:00" #1 hour
os.environ['SBATCH_PARTITION']='parallel' #Set the appropriate ARC partition

sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

sj.wait_to_start()
sc = sj.start_spark()

#Register the exit handler                                                                                                     
atexit.register(exitHandler,sj,sc)

#You need this line if you want to use SparkSQL
sqlCtx=SQLContext(sc)

INFO:sparkhpc.sparkjob:Submitted batch job 2626365

INFO:sparkhpc.sparkjob:Submitted cluster 1


In [0]:
df = sqlCtx.read.csv('CleanedNews.csv/part-00000-e0c20413-d9a2-4ae3-bc41-a77b460c6a58-c000.csv',inferSchema=True)
df = df.withColumnRenamed('_c0','claim').withColumnRenamed('_c1','claimant').withColumnRenamed('_c2','articles').withColumnRenamed('_c3','label')
df.printSchema()

root
 |-- claim: string (nullable = true)
 |-- claimant: string (nullable = true)
 |-- articles: string (nullable = true)
 |-- label: integer (nullable = true)



In [0]:
counts = df.select('label').groupBy('label').count().orderBy('count').collect()
counts

[Row(label=2, count=1696), Row(label=1, count=6451), Row(label=0, count=7408)]

In [0]:
lowestLabel,lowestCount = counts[0]
midLabel,midCount = counts[1]
highLabel,highCount = counts[2]

In [0]:
df_low_upscaled = df.filter(df.label==lowestLabel).sample(withReplacement=True,fraction = highCount/lowestCount)
df_mid_upscaled = df.filter(df.label==midLabel).sample(withReplacement=True,fraction = highCount/midCount)
df_high_upscaled = df.filter(df.label==highLabel)

In [0]:
from functools import reduce
from pyspark.sql import DataFrame
dfs_labelwise = [df_low_upscaled,df_mid_upscaled,df_high_upscaled]
df_balanced = reduce(DataFrame.unionAll, dfs_labelwise)

In [0]:
df_balanced.printSchema()

root
 |-- claim: string (nullable = true)
 |-- claimant: string (nullable = true)
 |-- articles: string (nullable = true)
 |-- label: integer (nullable = true)



In [0]:
df_balanced.count()

22302

In [0]:
counts = df_balanced.select('label').groupBy('label').count().orderBy('count').collect()
counts

[Row(label=1, count=7396), Row(label=0, count=7408), Row(label=2, count=7498)]

In [0]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover,CountVectorizer,IDF,VectorAssembler,Word2Vec,MinMaxScaler#,StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol='articles',outputCol='token_text')
stop_remove = StopWordsRemover(inputCol='token_text',outputCol='stop_token')
word2vec = Word2Vec (inputCol='stop_token',outputCol = "word_2_vec")
mms = MinMaxScaler(inputCol = 'word_2_vec',outputCol = "scaled")
assembler = VectorAssembler(inputCols=['scaled'],outputCol='features')

pipe = Pipeline(stages=[tokenizer,stop_remove,word2vec,mms,assembler])
pipelineFit = pipe.fit(df_balanced)
dataset = pipelineFit.transform(df_balanced)

In [0]:
training,test = dataset.randomSplit(weights = [0.8,0.2],seed = 0 )

In [0]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

nb = NaiveBayes()

gridSearch = ParamGridBuilder().addGrid(nb.smoothing,[0.0,0.2,0.4,0.6,0.8,1.0]).build()
cvEvaluater = MulticlassClassificationEvaluator(metricName="weightedPrecision",predictionCol="prediction")

cv = CrossValidator(estimator=nb,estimatorParamMaps=gridSearch,evaluator=cvEvaluater)
cvModel = cv.fit(training)

In [0]:
cvModel.avgMetrics

[0.4281263926545419,
 0.4281263926545419,
 0.4281263926545419,
 0.42825432472492053,
 0.42827508975918016,
 0.4282040830891455]

In [0]:
from sklearn.metrics import classification_report
prediction = cvModel.transform(test)
y_true = prediction.select('label').collect()
y_pred = prediction.select('prediction').collect()
print (classification_report(y_true,y_pred))

              precision    recall  f1-score   support

           0       0.44      0.57      0.50      1438
           1       0.46      0.45      0.45      1488
           2       0.41      0.30      0.35      1480

    accuracy                           0.44      4406
   macro avg       0.44      0.44      0.43      4406
weighted avg       0.44      0.44      0.43      4406

