In [None]:
!pip install pyspark

In [49]:
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf,col
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType

# tools
import random
import os

In [50]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Initialise

In [51]:
"""
Initialize Spark session object
"""
spark = SparkSession \
    .builder \
    .appName("Python Spark Naive Bayes CountVectorizer") \
    .getOrCreate()

In [53]:
data = spark.read.csv(os.path.join(os.getcwd(),"..","data/preprocessed_data.csv"), header=True)
# data = spark.read.csv("preprocessed_data.csv", header=True)

In [55]:
data.show(20)

+--------------------+--------------------+--------------------+--------------------+------+
|            Datetime|               Title|         Description|             Content|Change|
+--------------------+--------------------+--------------------+--------------------+------+
|2019-01-01 20:56:...|Car sales languis...|Tight liquidity, ...|nonetight,liquidi...|  -1.2|
|2019-01-01 18:07:...|Tata Motors domes...|The company said ...|nonethe,company,p...|  -1.2|
|2019-01-02 20:30:...|An evening walk d...|At the close of m...|weak,global,cue,d...| -1.75|
|2019-01-02 14:12:...|Top auto trends o...|The biggest news ...|automotive,indust...|  -1.2|
|2019-01-04 16:18:...|No decision regar...|Economic Affairs ...|decision,recently...| -0.51|
|2019-01-03 14:37:...|HDFC MF surpasses...|As of December-en...|hdfc,mutual,fund,...| -2.85|
|2019-01-03 15:14:...|Axle load norms b...|Sales of medium a...|new,axle,load,nor...|  1.15|
|2019-01-04 09:46:...|Allocate 30-40% o...|For aggressive in...|tell,s

In [109]:
'''
Content was pre-processed and stored as comma seperated String
'''

strList_toList = udf(lambda r: r.split(','), ArrayType(StringType()))
func_tolabel = udf(lambda x : 1 if(x > 0) else 0 ,IntegerType())

list_content = data.withColumn('Content',strList_toList('Content')).withColumn('label',col("Change").cast("Float")).na.drop("any")
list_content = list_content.withColumn('label',func_tolabel('label'))

list_content.show(20)

+--------------------+--------------------+--------------------+--------------------+------+-----+
|            Datetime|               Title|         Description|             Content|Change|label|
+--------------------+--------------------+--------------------+--------------------+------+-----+
|2019-01-01 20:56:...|Car sales languis...|Tight liquidity, ...|[nonetight, liqui...|  -1.2|    0|
|2019-01-01 18:07:...|Tata Motors domes...|The company said ...|[nonethe, company...|  -1.2|    0|
|2019-01-02 20:30:...|An evening walk d...|At the close of m...|[weak, global, cu...| -1.75|    0|
|2019-01-02 14:12:...|Top auto trends o...|The biggest news ...|[automotive, indu...|  -1.2|    0|
|2019-01-04 16:18:...|No decision regar...|Economic Affairs ...|[decision, recent...| -0.51|    0|
|2019-01-03 14:37:...|HDFC MF surpasses...|As of December-en...|[hdfc, mutual, fu...| -2.85|    0|
|2019-01-03 15:14:...|Axle load norms b...|Sales of medium a...|[new, axle, load,...|  1.15|    1|
|2019-01-0

### Vectorise the content

In [110]:

to_vectorize = list_content.select('Datetime','Content', 'label')
cv = CountVectorizer(inputCol="Content", outputCol="features")

model_vec = cv.fit(to_vectorize)
result_vec = model_vec.transform(to_vectorize)
selectedData = result_vec.select('Datetime','features', 'label')

In [111]:
selectedData.show(21)


+--------------------+--------------------+-----+
|            Datetime|            features|label|
+--------------------+--------------------+-----+
|2019-01-01 20:56:...|(11489,[0,1,2,3,4...|    0|
|2019-01-01 18:07:...|(11489,[0,1,3,7,8...|    0|
|2019-01-02 20:30:...|(11489,[0,1,2,3,6...|    0|
|2019-01-02 14:12:...|(11489,[0,1,3,4,5...|    0|
|2019-01-04 16:18:...|(11489,[0,2,6,7,9...|    0|
|2019-01-03 14:37:...|(11489,[1,2,5,8,9...|    0|
|2019-01-03 15:14:...|(11489,[0,1,4,6,7...|    1|
|2019-01-04 09:46:...|(11489,[0,1,3,4,6...|    0|
|2019-01-07 10:28:...|(11489,[0,1,2,4,6...|    0|
|2019-01-07 14:55:...|(11489,[0,1,2,3,5...|    0|
|2019-01-07 17:26:...|(11489,[0,1,2,3,4...|    0|
|2019-01-08 12:47:...|(11489,[1,2,7,12,...|    0|
|2019-01-09 11:56:...|(11489,[2,6,12,15...|    0|
|2019-01-11 09:34:...|(11489,[0,1,3,4,6...|    0|
|2019-01-10 18:37:...|(11489,[0,1,2,3,4...|    0|
|2019-01-11 18:12:...|(11489,[1,2,4,5,7...|    0|
|2019-01-11 15:39:...|(11489,[2,5,6,20,...|    0|


### Building Naive Bayes Classification

In [106]:
"""
Define TruePositive, FalsePositive and FalseNegative
x = prediction, y = label
"""
TP = udf(lambda x,y: int(x==1 and y==1))
FP = udf(lambda x,y: int(x==1 and y==0))
FN = udf(lambda x,y: int(x==0 and y==1))

In [112]:
'''
Naive-Bayes following from CountVectorizer
'''
def NAIVEBAYES_CV(smooth=1, model_type="multinomial"): 
    
    # separating train/test data
    # taking equal number of positive and negative changes while splitting
    training_negative, test_negative = selectedData.where(selectedData.label == 0).randomSplit([0.7, 0.3])
    training_negative, test_positive = selectedData.where(selectedData.label == 1).randomSplit([0.7, 0.3])

    training = training_negative.union(training_negative)
    test = test_negative.union(test_positive)

    # create trainer with parameters then train
    # smoothing: smooth probabilities of 0 to the input
    nb = NaiveBayes(smoothing=smooth, modelType=model_type)
    model_NB = nb.fit(training)

    # display on test set: appends a prediction column
    predictions = model_NB.transform(test)
    
    # diagnostic testing
    prela_df = predictions.select("prediction","label")
    prela_df=prela_df.withColumn("TP", TP(prela_df.prediction,prela_df.label))
    prela_df=prela_df.withColumn("FP", FP(prela_df.prediction,prela_df.label))
    prela_df=prela_df.withColumn("FN", FN(prela_df.prediction,prela_df.label))
    
#     return prela_df

    TP_ = prela_df.where(prela_df.TP==1).count()
    FP_ = prela_df.where(prela_df.FP==1).count()
    FN_ = prela_df.where(prela_df.FN==1).count()

    precision = TP_/(TP_+FP_)
    recall = TP_/(TP_+FN_)
    F1 = 2*(precision*recall)/(precision+recall)

    # compute accuracy of on test set: compares labelCol and predictionCol
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)

    # return test results and model object
    return (accuracy,precision,recall,F1,model_NB,prela_df)


In [113]:
## Manually trying few examples
# result = NAIVEBAYES_CV(0.2684835187532758,"multinomial")

acc, precision, recall, F1, modelNB1, prela_df1 = NAIVEBAYES_CV(0.2684835187532758,"multinomial")

Py4JJavaError: An error occurred while calling o2343.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 117.0 failed 1 times, most recent failure: Lost task 0.0 in stage 117.0 (TID 107) (LAPTOP-HS7I2DHP executor driver): java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:323)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:982)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:391)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:76)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:323)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:982)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:391)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:76)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)


In [None]:
'''
Iteration tests on Naive-Bayes

iter_total: iterations for different smoothing nb
iter_each: iterations for the same smoothing nb
'''
import statistics

extract_method = "CountVectorizer"
iter_each = 10
iter_total = 50
m_types = ["complement", "multinomial"]
accs = []
f1s = []
for model_type in m_types:
    for k in range(iter_total):
        accuracies = []
        smoothing = random.uniform(0.01, 0.8)
        for i in range(iter_each):
            acc,precision,recall,F1,modelNB = NAIVEBAYES_CV(smoothing, model_type)
            accs.append(acc)
            f1s.append(F1)
        mean_acc = statistics.mean(accs)
        mean_f1 = statistics.mean(f1s)
        print("=> Mean_acc: ", mean_acc," => Mean_f1: ",mean_f1, "- Smoothing:", smoothing, "- Model:", model_type)
        means.append((mean_acc,mean_f1, smoothing, model_type, extract_method))