In [60]:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [61]:
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [62]:
sql_context = SparkSession.builder.master('local[*]').getOrCreate()
sql_context

In [63]:
data = sql_context.read.csv("./sangam.csv",header=True,inferSchema='True')
data.show(3)

+-------------------+-----------------+------------------+----------+---------+---------+-----------------+---------+
|               Date|               DO|                pH|       ORP|     Cond|     Temp|              WQI|   Status|
+-------------------+-----------------+------------------+----------+---------+---------+-----------------+---------+
|2019-01-12 15:33:16|9.494212037759977|13.765933596525262|0.14840198|12.954404|17.830261|54.81198760710678|Very Poor|
|2019-01-12 15:34:17|9.500406233111164|13.337534775077296| 0.1445036| 8.547796|17.798553|51.48805043895461|Very Poor|
|2019-01-12 15:35:18|9.487447743811652|13.198463167914054|0.13437152|16.847918| 17.86493|50.42070179995088|Very Poor|
+-------------------+-----------------+------------------+----------+---------+---------+-----------------+---------+
only showing top 3 rows



In [64]:
import os

import csv
import itertools
import collections

from scipy import stats

from sklearn.utils import resample
from sklearn.model_selection import train_test_split

In [65]:
data=data.drop('Date')
data

DataFrame[DO: double, pH: double, ORP: double, Cond: double, Temp: double, WQI: double, Status: string]

In [66]:
from pyspark.ml.feature import StringIndexer

In [67]:
ind = StringIndexer(inputCol = 'Status', outputCol = 'Status_index')
data=ind.fit(data).transform(data)

In [68]:
from pyspark.ml.feature import VectorAssembler

numericCols = ['DO', 'pH', 'ORP', 'Cond','Temp','WQI']
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")

In [69]:
data = assembler.transform(data)
data.show(5)

+-----------------+------------------+----------+---------+---------+-----------------+---------+------------+--------------------+
|               DO|                pH|       ORP|     Cond|     Temp|              WQI|   Status|Status_index|            features|
+-----------------+------------------+----------+---------+---------+-----------------+---------+------------+--------------------+
|9.494212037759977|13.765933596525262|0.14840198|12.954404|17.830261|54.81198760710678|Very Poor|         2.0|[9.49421203775997...|
|9.500406233111164|13.337534775077296| 0.1445036| 8.547796|17.798553|51.48805043895461|Very Poor|         2.0|[9.50040623311116...|
|9.487447743811652|13.198463167914054|0.13437152|16.847918| 17.86493|50.42070179995088|Very Poor|         2.0|[9.48744774381165...|
|9.486121036332559|12.732116142804621|0.14270854|16.884756|17.871735|  46.901646035597|Very Poor|         2.0|[9.48612103633255...|
|9.485210958535616| 13.28446675206912|0.13752413|16.987082|17.876404|51.1046

In [70]:
final=data.select("features","Status_index")
final.show()

+--------------------+------------+
|            features|Status_index|
+--------------------+------------+
|[9.49421203775997...|         2.0|
|[9.50040623311116...|         2.0|
|[9.48744774381165...|         2.0|
|[9.48612103633255...|         2.0|
|[9.48521095853561...|         2.0|
|[18.413895,13.107...|         2.0|
|[17.560917,13.272...|         2.0|
|[17.419922,13.380...|         2.0|
|[17.972828,13.549...|         2.0|
|[9.48054395114254...|         2.0|
|[9.47934358211962...|         2.0|
|[16.906841,13.543...|         2.0|
|[16.089485,13.763...|         2.0|
|[17.893639,13.068...|         2.0|
|[10.478089,12.935...|         2.0|
|[18.264702,13.677...|         2.0|
|[18.571953,13.308...|         2.0|
|[17.599611,13.232...|         2.0|
|[11.472302,13.474...|         2.0|
|[9.42847816151952...|         2.0|
+--------------------+------------+
only showing top 20 rows



In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier

In [None]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'Status_index',maxIter=50, regParam=0.01, elasticNetParam=0.0)
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Status_index')
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'Status_index',maxDepth=5)

In [None]:
lrmodel = lr.fit(train)

In [None]:
rfModel = rf.fit(train)

In [None]:
dtModel = dt.fit(train)

In [None]:
lrmodel.save("lr_model")
rfModel.save("rf_model")
dtModel.save("dt__model")


In [None]:
test.show(1)

+--------------------+------------+
|            features|Status_index|
+--------------------+------------+
|[7.23804924388176...|         2.0|
+--------------------+------------+
only showing top 1 row



In [None]:
lrPrediction = lrmodel.transform(test)
lrPrediction.show(3)

+--------------------+------------+--------------------+--------------------+----------+
|            features|Status_index|       rawPrediction|         probability|prediction|
+--------------------+------------+--------------------+--------------------+----------+
|[7.23804924388176...|         2.0|[1.96782387210595...|[5.32319996059868...|       2.0|
|[7.27753040110670...|         3.0|[2.88756671123408...|[0.08350912482211...|       2.0|
|[7.28515672754443...|         3.0|[2.91017331710185...|[0.09535542184385...|       2.0|
+--------------------+------------+--------------------+--------------------+----------+
only showing top 3 rows



In [None]:
dtPrediction = dtModel.transform(test)

In [None]:
rfPrediction = rfModel.transform(test)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Status_index", predictionCol="prediction")
accuracy = evaluator.evaluate(lrPrediction)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 0.8996208190541171
Test Error = 0.10037918094588294


In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="Status_index", predictionCol="prediction")
accuracy = evaluator.evaluate(dtPrediction)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 0.9951475890391911
Test Error = 0.004852410960808906


In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="Status_index", predictionCol="prediction")
accuracy = evaluator.evaluate(rfPrediction)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 0.9951096886653021
Test Error = 0.004890311334697861


In [None]:
from sklearn.metrics import confusion_matrix
y_pred= lrPrediction.select('prediction').collect()
y_orig= lrPrediction.select('Status_index').collect()
cm = confusion_matrix(y_orig,y_pred)
print(cm)

: 

In [None]:
plt.figure(figsize=(7,5))
sns.heatmap(cm, annot=True, fmt=".3f", linewidths=.5, square = True, cmap = 'Blues_r');
plt.ylabel('Actual label');
plt.xlabel('Predicted label');
all_sample_title = "Accuracy Score: {0}".format(accuracy)
plt.title(all_sample_title, size = 15);

: 

: 

: 

: 

: 

: 