In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\spark-2.4.5-bin-hadoop2.7'

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml-bank').getOrCreate()
df = spark.read.csv('PYSPARK_DATA_TRAFFIC.csv', header = True, inferSchema = True)
df.printSchema()

root
 |-- DAY: string (nullable = true)
 |-- STORES: string (nullable = true)
 |-- CONGESTION: integer (nullable = true)
 |-- OSM_ID: integer (nullable = true)
 |-- LARGER_THAN_200M: string (nullable = true)
 |-- KMH: integer (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- MIN: integer (nullable = true)



In [3]:
import pandas as pd
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
DAY,MONDAY,MONDAY,TUESDAY,WEDNESDAY,WEDNESDAY
STORES,OPEN,CLOSED,OPEN,OPEN,CLOSED
CONGESTION,0,0,0,0,0
OSM_ID,176665188,176665188,176665188,176665188,176665188
LARGER_THAN_200M,YES,YES,YES,YES,YES
KMH,50,50,50,50,50
CATEGORY,KA_1K,KA_1K,KA_1K,KA_1K,KA_1K
HOUR,17,22,18,14,20
MIN,15,0,30,30,45


In [4]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
indexers = [StringIndexer(inputCol=column, outputCol=column+"_n").fit(df) for column in list(set(df.columns)-set(['CONGESTION'])) ]
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)
df_r.show()

+---------+-------+----------+---------+----------------+---+--------+----+---+------------------+-----+-----+----------+-----+--------+------+--------+
|      DAY| STORES|CONGESTION|   OSM_ID|LARGER_THAN_200M|KMH|CATEGORY|HOUR|MIN|LARGER_THAN_200M_n|MIN_n|DAY_n|CATEGORY_n|KMH_n|STORES_n|HOUR_n|OSM_ID_n|
+---------+-------+----------+---------+----------------+---+--------+----+---+------------------+-----+-----+----------+-----+--------+------+--------+
|MONDAY   |   OPEN|         0|176665188|             YES| 50|   KA_1K|  17| 15|               1.0|  0.0|  6.0|       1.0|  1.0|     1.0|   3.0|     0.0|
|MONDAY   | CLOSED|         0|176665188|             YES| 50|   KA_1K|  22|  0|               1.0|  1.0|  6.0|       1.0|  1.0|     0.0|   6.0|     0.0|
|TUESDAY  |   OPEN|         0|176665188|             YES| 50|   KA_1K|  18| 30|               1.0|  2.0|  5.0|       1.0|  1.0|     1.0|  13.0|     0.0|
|WEDNESDAY|   OPEN|         0|176665188|             YES| 50|   KA_1K|  14| 30|   

In [5]:
df_r.toPandas().to_csv("Spark_num_and_labels.csv")

In [6]:
data = df_r.drop('HOUR','MIN','DAY','STORES','OSM_ID','LARGER_THAN_200M','KMH','CATEGORY')

In [7]:
from pyspark.ml.feature import VectorAssembler
vector_assembler = VectorAssembler(\
inputCols=['HOUR_n','MIN_n','KMH_n','CATEGORY_n','OSM_ID_n','LARGER_THAN_200M_n','STORES_n','DAY_n'],\
outputCol="features")
df_temp = vector_assembler.transform(data)
df_temp.show(3)

+----------+------------------+-----+-----+----------+-----+--------+------+--------+--------------------+
|CONGESTION|LARGER_THAN_200M_n|MIN_n|DAY_n|CATEGORY_n|KMH_n|STORES_n|HOUR_n|OSM_ID_n|            features|
+----------+------------------+-----+-----+----------+-----+--------+------+--------+--------------------+
|         0|               1.0|  0.0|  6.0|       1.0|  1.0|     1.0|   3.0|     0.0|[3.0,0.0,1.0,1.0,...|
|         0|               1.0|  1.0|  6.0|       1.0|  1.0|     0.0|   6.0|     0.0|[6.0,1.0,1.0,1.0,...|
|         0|               1.0|  2.0|  5.0|       1.0|  1.0|     1.0|  13.0|     0.0|[13.0,2.0,1.0,1.0...|
+----------+------------------+-----+-----+----------+-----+--------+------+--------+--------------------+
only showing top 3 rows



In [8]:
df = df_temp.drop('HOUR_n','MIN_n','KMH_n','CATEGORY_n','OSM_ID_n','LARGER_THAN_200M_n','STORES_n','DAY_n')
df.show(3)

+----------+--------------------+
|CONGESTION|            features|
+----------+--------------------+
|         0|[3.0,0.0,1.0,1.0,...|
|         0|[6.0,1.0,1.0,1.0,...|
|         0|[13.0,2.0,1.0,1.0...|
+----------+--------------------+
only showing top 3 rows



In [9]:
from pyspark.ml.feature import StringIndexer
l_indexer = StringIndexer(inputCol="CONGESTION", outputCol="labelIndex")
df = l_indexer.fit(df).transform(df)

In [10]:
df.show(100)

+----------+--------------------+----------+
|CONGESTION|            features|labelIndex|
+----------+--------------------+----------+
|         0|[3.0,0.0,1.0,1.0,...|       0.0|
|         0|[6.0,1.0,1.0,1.0,...|       0.0|
|         0|[13.0,2.0,1.0,1.0...|       0.0|
|         0|[10.0,2.0,1.0,1.0...|       0.0|
|         0|[5.0,3.0,1.0,1.0,...|       0.0|
|         0|[13.0,1.0,1.0,1.0...|       0.0|
|         0|[3.0,1.0,1.0,1.0,...|       0.0|
|         1|[8.0,1.0,1.0,1.0,...|       1.0|
|         1|[1.0,0.0,1.0,1.0,...|       1.0|
|         1|[6.0,0.0,1.0,1.0,...|       1.0|
|         0|[4.0,1.0,1.0,1.0,...|       0.0|
|         0|[4.0,2.0,1.0,1.0,...|       0.0|
|         1|[11.0,0.0,1.0,1.0...|       1.0|
|         0|[2.0,3.0,1.0,1.0,...|       0.0|
|         0|[4.0,2.0,1.0,1.0,...|       0.0|
|         0|[8.0,1.0,1.0,1.0,...|       0.0|
|         0|[15.0,3.0,1.0,1.0...|       0.0|
|         1|[1.0,3.0,1.0,1.0,...|       1.0|
|         0|[9.0,0.0,1.0,1.0,...|       0.0|
|         

In [104]:
df = df.drop('CONGESTION')
(trainingData, testData) = df.randomSplit([0.8, 0.2])

In [105]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [106]:
dt = DecisionTreeClassifier(labelCol="labelIndex", featuresCol="features")
model = dt.fit(trainingData)

In [107]:
predictions = model.transform(testData)

In [108]:
predictions.select("prediction", "labelIndex").show(5)

+----------+----------+
|prediction|labelIndex|
+----------+----------+
|       0.0|       0.0|
|       0.0|       1.0|
|       0.0|       1.0|
|       0.0|       0.0|
|       0.0|       1.0|
+----------+----------+
only showing top 5 rows



In [109]:
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction",metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(accuracy)

0.7104651162790697


In [85]:
print(model)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_944116beed09) of depth 5 with 29 nodes
