In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.classification  import RandomForestClassifier, DecisionTreeClassifier, GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, VectorIndexer
from pyspark.sql.functions import *

In [2]:
spark=SparkSession.builder.appName('SparkDT').getOrCreate()

In [3]:
df = spark.read.csv('sales_data.csv', header = True, inferSchema=True)
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Customer_Age: integer (nullable = true)
 |-- Age_Group: string (nullable = true)
 |-- Customer_Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Sub_Category: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Order_Quantity: integer (nullable = true)
 |-- Unit_Cost: integer (nullable = true)
 |-- Unit_Price: integer (nullable = true)
 |-- Profit: integer (nullable = true)
 |-- Cost: integer (nullable = true)
 |-- Revenue: integer (nullable = true)



In [4]:
stringIndex = StringIndexer(inputCol='Customer_Gender', outputCol='Gender_Index')
df = stringIndex.fit(df).transform(df)
df.show(5)

+-------------------+---+--------+----+------------+--------------+---------------+---------+----------------+----------------+------------+-------------------+--------------+---------+----------+------+----+-------+------------+
|               Date|Day|   Month|Year|Customer_Age|     Age_Group|Customer_Gender|  Country|           State|Product_Category|Sub_Category|            Product|Order_Quantity|Unit_Cost|Unit_Price|Profit|Cost|Revenue|Gender_Index|
+-------------------+---+--------+----+------------+--------------+---------------+---------+----------------+----------------+------------+-------------------+--------------+---------+----------+------+----+-------+------------+
|2013-11-26 00:00:00| 26|November|2013|          19|   Youth (<25)|              M|   Canada|British Columbia|     Accessories|  Bike Racks|Hitch Rack - 4-Bike|             8|       45|       120|   590| 360|    950|         0.0|
|2015-11-26 00:00:00| 26|November|2015|          19|   Youth (<25)|             

In [5]:
stringIndex1 = StringIndexer(inputCol='Product_Category', outputCol='Label')
df = stringIndex1.fit(df).transform(df)
df.show(5)

+-------------------+---+--------+----+------------+--------------+---------------+---------+----------------+----------------+------------+-------------------+--------------+---------+----------+------+----+-------+------------+-----+
|               Date|Day|   Month|Year|Customer_Age|     Age_Group|Customer_Gender|  Country|           State|Product_Category|Sub_Category|            Product|Order_Quantity|Unit_Cost|Unit_Price|Profit|Cost|Revenue|Gender_Index|Label|
+-------------------+---+--------+----+------------+--------------+---------------+---------+----------------+----------------+------------+-------------------+--------------+---------+----------+------+----+-------+------------+-----+
|2013-11-26 00:00:00| 26|November|2013|          19|   Youth (<25)|              M|   Canada|British Columbia|     Accessories|  Bike Racks|Hitch Rack - 4-Bike|             8|       45|       120|   590| 360|    950|         0.0|  0.0|
|2015-11-26 00:00:00| 26|November|2015|          19|   Y

In [6]:
featureAssembler = VectorAssembler(inputCols=['Unit_Price', 'Profit','Cost','Revenue','Gender_Index'], outputCol='Features')
df2 = featureAssembler.transform(df)

In [7]:
df3 = df2.select(['Features','Label'])
df3.show(13)

+--------------------+-----+
|            Features|Label|
+--------------------+-----+
|[120.0,590.0,360....|  0.0|
|[120.0,590.0,360....|  0.0|
|[120.0,1366.0,103...|  0.0|
|[120.0,1188.0,900...|  0.0|
|[120.0,238.0,180....|  0.0|
|[120.0,297.0,225....|  0.0|
|[120.0,199.0,180....|  0.0|
|[120.0,100.0,90.0...|  0.0|
|[120.0,1096.0,990...|  0.0|
|[120.0,1046.0,945...|  0.0|
|[120.0,398.0,360....|  0.0|
|[120.0,398.0,360....|  0.0|
|[120.0,349.0,315....|  0.0|
+--------------------+-----+
only showing top 13 rows



In [8]:
scaler = StandardScaler(inputCol='Features', outputCol='Scaler')
scaledModel = scaler.fit(df3).transform(df3)
scaledModel.show()

+--------------------+-----+--------------------+
|            Features|Label|              Scaler|
+--------------------+-----+--------------------+
|[120.0,590.0,360....|  0.0|[0.13014179109459...|
|[120.0,590.0,360....|  0.0|[0.13014179109459...|
|[120.0,1366.0,103...|  0.0|[0.13014179109459...|
|[120.0,1188.0,900...|  0.0|[0.13014179109459...|
|[120.0,238.0,180....|  0.0|[0.13014179109459...|
|[120.0,297.0,225....|  0.0|[0.13014179109459...|
|[120.0,199.0,180....|  0.0|[0.13014179109459...|
|[120.0,100.0,90.0...|  0.0|[0.13014179109459...|
|[120.0,1096.0,990...|  0.0|[0.13014179109459...|
|[120.0,1046.0,945...|  0.0|[0.13014179109459...|
|[120.0,398.0,360....|  0.0|[0.13014179109459...|
|[120.0,398.0,360....|  0.0|[0.13014179109459...|
|[120.0,349.0,315....|  0.0|[0.13014179109459...|
|[120.0,349.0,315....|  0.0|[0.13014179109459...|
|[120.0,369.0,225....|  0.0|[0.13014179109459...|
|[120.0,517.0,315....|  0.0|[0.13014179109459...|
|[120.0,148.0,90.0...|  0.0|[0.13014179109459...|


In [9]:
train_data, test_data = scaledModel.randomSplit([.75, .25], seed=13)

In [10]:
train_data.groupBy('Label').agg(count('Scaler')).show()

+-----+-------------+
|Label|count(Scaler)|
+-----+-------------+
|  0.0|        52710|
|  2.0|        12802|
|  1.0|        19398|
+-----+-------------+



In [11]:
dt = DecisionTreeClassifier(featuresCol = 'Scaler', labelCol = 'Label', maxDepth = 4)
dtModel = dt.fit(train_data)

In [12]:
predDt = dtModel.transform(test_data)
predDt.printSchema()

root
 |-- Features: vector (nullable = true)
 |-- Label: double (nullable = false)
 |-- Scaler: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [13]:
selectedDt =predDt.select("Label", "prediction", "probability")
selectedDt.show(10)

+-----+----------+--------------------+
|Label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.99992937853107...|
|  0.0|       0.0|[0.99992937853107...|
|  0.0|       0.0|[0.99992937853107...|
|  0.0|       0.0|[0.99992937853107...|
|  0.0|       0.0|[0.99992937853107...|
|  0.0|       0.0|[0.99992937853107...|
|  0.0|       0.0|[0.99992937853107...|
|  0.0|       0.0|[0.99992937853107...|
|  0.0|       0.0|[0.99992937853107...|
|  0.0|       0.0|[0.99992937853107...|
+-----+----------+--------------------+
only showing top 10 rows



In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='Label')
evaluator.evaluate(predDt)

0.9447604763746957

### Random Forest

In [15]:
rf = RandomForestClassifier(featuresCol = 'Scaler', labelCol = 'Label')
rfModel = rf.fit(train_data)

In [16]:
predRf = rfModel.transform(test_data)
predRf.select("Label", "prediction", "probability").show(5)

+-----+----------+--------------------+
|Label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.98034238974106...|
|  0.0|       0.0|[0.98034238974106...|
|  0.0|       0.0|[0.98034238974106...|
|  0.0|       0.0|[0.98034238974106...|
|  0.0|       0.0|[0.98034238974106...|
+-----+----------+--------------------+
only showing top 5 rows



In [43]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol='Label', metricName='f1')
evaluator.evaluate(predRf)

0.952560310860556