In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('operations').getOrCreate()

In [2]:
df = spark.read.csv('Dataset/integrate_data1.csv', inferSchema=True, header=True)


In [3]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler


In [4]:
assembler = VectorAssembler(
  inputCols=['year',
             'month',
             'day',
             'hour',
             'season',
             'DEWP',
             'HUMI',
             'PRES',
             'TEMP',
             'Iws',
             'precipitation',
             'Iprec',
             'cbwd_new'],
              outputCol="features")
print(assembler)

VectorAssembler_4a5583c5d0da87eba604


In [5]:
# Let's transform the data. 
output = assembler.transform(df)
# Let's import the string indexer (similar to the logistic regression exercises).
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="Air Quality", outputCol="AirQualityIndex")
output_fixed = indexer.fit(output).transform(output)
# Let's select the two columns we want. Features (which contains vectors), and the predictor.
final_data = output_fixed.select("features",'AirQualityIndex')
# Split the training and testing set.
train_data,test_data = final_data.randomSplit([0.8,0.2])

In [6]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [7]:
dtc = DecisionTreeClassifier(labelCol='AirQualityIndex',featuresCol='features',
                             impurity='entropy',maxBins=80,maxDepth=30)
dtc_model = dtc.fit(train_data)
dtc_predictions = dtc_model.transform(test_data)

In [9]:
# Let's start off with binary classification.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note that the label column isn't named label, it's named PrivateIndex in this case.
my_binary_eval = BinaryClassificationEvaluator(labelCol = 'AirQualityIndex')
# This is the area under the curve. This indicates that the data is highly seperable.
print("DTC")
print(my_binary_eval.evaluate(dtc_predictions))

dtc_model.featureImportances

DTC
0.7023005487547488


SparseVector(13, {0: 0.0764, 1: 0.0794, 2: 0.176, 3: 0.1387, 4: 0.0092, 5: 0.1257, 6: 0.057, 7: 0.1014, 8: 0.0562, 9: 0.1068, 10: 0.0102, 11: 0.0099, 12: 0.0531})

In [9]:
# import sys
# !conda install --yes --prefix {sys.prefix} scikit-learn

In [10]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
# prediction1 = np.array(dtc_predictions.select('prediction').collect())
# label = np.array(AirQualityIndex.select('AirQualityIndex').collect())

results = dtc_predictions.select(['prediction', 'AirQualityIndex'])
predictionAndLabels=results.rdd
metrics = MulticlassMetrics(predictionAndLabels)

# Overall statistics
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)




Recall = 0.7177131189382338
F1 Score = 0.7177131189382338




In [11]:
# $hadoop version

In [12]:
rfc = RandomForestClassifier(labelCol='AirQualityIndex',featuresCol='features',impurity='entropy'
#                              ,maxBins=20,maxDepth=10
                            )
rfc_model = rfc.fit(train_data)
rfc_predictions = rfc_model.transform(test_data)
# RFC improves accuracy but also model complexity. RFC outperforms DTC in nearly every situation.
print("RFC")
print(my_binary_eval.evaluate(rfc_predictions))

rfc_predictions.show()



RFC
0.5783792997689357
+--------------------+---------------+--------------------+--------------------+----------+
|            features|AirQualityIndex|       rawPrediction|         probability|prediction|
+--------------------+---------------+--------------------+--------------------+----------+
|[2011.0,12.0,29.0...|            1.0|[8.92805658106875...|[0.44640282905343...|       0.0|
|[2011.0,12.0,29.0...|            1.0|[8.92805658106875...|[0.44640282905343...|       0.0|
|[2011.0,12.0,29.0...|            0.0|[8.85040133672676...|[0.44252006683633...|       0.0|
|[2011.0,12.0,29.0...|            0.0|[8.61963266360140...|[0.43098163318007...|       0.0|
|[2011.0,12.0,29.0...|            3.0|[8.08344698140270...|[0.40417234907013...|       0.0|
|[2011.0,12.0,29.0...|            2.0|[8.08344698140270...|[0.40417234907013...|       0.0|
|[2011.0,12.0,29.0...|            0.0|[8.08344698140270...|[0.40417234907013...|       0.0|
|[2011.0,12.0,31.0...|            2.0|[9.15347668625338..

In [13]:
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- season: integer (nullable = true)
 |-- DEWP: integer (nullable = true)
 |-- HUMI: double (nullable = true)
 |-- PRES: double (nullable = true)
 |-- TEMP: integer (nullable = true)
 |-- Iws: integer (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- Iprec: double (nullable = true)
 |-- Air Quality: integer (nullable = true)
 |-- cbwd_new: integer (nullable = true)



In [17]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
# prediction1 = np.array(dtc_predictions.select('prediction').collect())
# label = np.array(AirQualityIndex.select('AirQualityIndex').collect())

results = rfc_predictions.select(['prediction', 'AirQualityIndex'])
predictionAndLabels=results.rdd
metrics = MulticlassMetrics(predictionAndLabels)

# Overall statistics
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)



Recall = 0.530032329419772
F1 Score = 0.530032329419772


