In [2]:
/*
  Step #1 : Import libraries
  The import list is bracketed by two UTC timestamps so the cell reports when it
  started and finished.
*/
import org.joda.time.{DateTime, DateTimeZone}
println(s"Import libraries started by = ${DateTime.now(DateTimeZone.UTC)}")

// Spark core
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

// Spark SQL
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.col

// Spark ML: feature preparation, pipeline, evaluation and regressors
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

println(s"Import libraries ended by = ${DateTime.now(DateTimeZone.UTC)}")



Import libraries started by = 2021-05-11T17:44:33.349Z
Import libraries ended by = 2021-05-11T17:44:33.349Z


import org.joda.time.{DateTime, DateTimeZone}
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.col
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}


In [3]:
/*
  Step #2 : Load training data
  Reads the sales CSV (first row treated as the header) into vAR_df_data, printing a
  UTC timestamp before and after the read, then reports the DataFrame's partition count.
*/
println(s"Data load started by = ${DateTime.now(DateTimeZone.UTC)}")
val vAR_df_data = spark.read
  .option("header", true)
  .csv("gs://dssparkbucket/DS.AI_SalesData.csv")
println(s"Data load ended by = ${DateTime.now(DateTimeZone.UTC)}")

// Partition count of the freshly loaded data, before any explicit repartitioning.
println(s"Default partitions applied by system = ${vAR_df_data.rdd.getNumPartitions}")


Data load started by = 2021-05-11T17:44:34.445Z
Data load ended by = 2021-05-11T17:44:46.521Z
Default partitions applied by system = 1


vAR_df_data: org.apache.spark.sql.DataFrame = [TransactionID: string, ProductID: string ... 11 more fields]


In [4]:
/*
  Step #3 : Transform data for training
  1. Cast every listed column of vAR_df_data to IntegerType, producing vAR_df.
  2. Assemble the feature columns into a single vector column named "features",
     producing vAR_output.
*/
println(s"Data transform started by: ${DateTime.now(DateTimeZone.UTC)}")
val vAR_df = {
  // Columns to cast, applied one at a time in this order.
  // NOTE(review): IntegerType on the cost/rate columns (ActualCost, TotalDue, LineTotal,
  // AverageRate, EndOfDayRate, SalesLastYear) would drop any fractional part if the CSV
  // carries decimals - confirm the source values are integral.
  val vAR_int_columns = Seq(
    "TransactionID", "ProductID", "Quantity", "ActualCost", "CustomerID", "TotalDue",
    "LineTotal", "MakeFlag", "FinishedGoodsFlag", "SalesReasonID", "AverageRate",
    "EndOfDayRate", "SalesLastYear")
  vAR_int_columns.foldLeft(vAR_df_data) { (vAR_frame, vAR_name) =>
    vAR_frame.withColumn(vAR_name, col(vAR_name).cast(IntegerType))
  }
}

/*
  VectorAssembler is a transformer: it takes the input DataFrame and returns it with one
  extra column holding the vector representation of all the feature columns.
  Every cast column except MakeFlag (used later as the label) is a feature.
*/
println("Vectorize the feature columns")
val vAR_assembler = new VectorAssembler()
  .setOutputCol("features")
  .setInputCols(Array(
    "TransactionID", "Quantity", "ActualCost", "CustomerID", "TotalDue", "LineTotal",
    "FinishedGoodsFlag", "SalesReasonID", "AverageRate", "EndOfDayRate", "SalesLastYear",
    "ProductID"))
val vAR_output = vAR_assembler.transform(vAR_df)
println(s"Data transform ended by: ${DateTime.now(DateTimeZone.UTC)}")

Data transform started by: 2021-05-11T17:44:47.955Z
Vectorize the feature columns
Data transform ended by: 2021-05-11T17:44:48.602Z


vAR_df: org.apache.spark.sql.DataFrame = [TransactionID: int, ProductID: int ... 11 more fields]
vAR_assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_16b6ca18473c, handleInvalid=error, numInputCols=12
vAR_output: org.apache.spark.sql.DataFrame = [TransactionID: int, ProductID: int ... 12 more fields]


In [5]:
/*
  Step #4 : Select training and test data as 70:30
  randomSplit is called with weights 0.7 / 0.3 and a fixed seed; the first part becomes
  the training set and the second the test set.
*/
println("Split the loaded data into training and test")
val Array(vAR_trainingData, vAR_testData) =
  vAR_output.randomSplit(weights = Array(0.7, 0.3), seed = 1234L)

Split the loaded data into training and test


vAR_trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [TransactionID: int, ProductID: int ... 12 more fields]
vAR_testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [TransactionID: int, ProductID: int ... 12 more fields]


In [6]:
/*
  Step #5 : Set up the pipeline
  Builds the two pipeline stages - a fitted VectorIndexer and a DecisionTreeRegressor -
  and chains them in vAR_pipeline. Nothing is trained on the tree in this step.
*/

println(s"Data prep started by = ${DateTime.now(DateTimeZone.UTC)}")

println("Index the feature column data")
// The indexer is fitted on the whole assembled data set (vAR_output), not only on the
// training split, and writes its result to "indexedFeatures".
val vAR_featureIndexer = new VectorIndexer()
  .setMaxCategories(13)
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .fit(vAR_output)

println("Instantiate the DecisionTreeRegressor algorithm")
// NOTE(review): MakeFlag looks like a 0/1 flag; a regressor is used on it here - confirm
// that regression (rather than classification) is intended.
val vAR_dectree = new DecisionTreeRegressor()
  .setFeaturesCol("indexedFeatures")
  .setLabelCol("MakeFlag")

println("Setup the Pipeline")
// Stage 0: feature indexer, stage 1: decision tree.
val vAR_pipeline = new Pipeline()
  .setStages(Array(vAR_featureIndexer, vAR_dectree))

println(s"Data prep ended by = ${DateTime.now(DateTimeZone.UTC)}")

Data prep started by = 2021-05-11T17:44:50.879Z
Index the feature column data
Instantiate the DecisionTreeRegressor algorithm
Setup the Pipeline
Data prep ended by = 2021-05-11T17:44:55.256Z


vAR_featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = VectorIndexerModel: uid=vecIdx_9a3504e6c8ee, numFeatures=12, handleInvalid=error
vAR_dectree: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_0d298550b640
vAR_pipeline: org.apache.spark.ml.Pipeline = pipeline_a2c28b01aff6


In [7]:
/*
  Step #6 : Train, test, validate and predict
  Fits vAR_pipeline on the training split, scores the test split, prints five sample
  predictions, the RMSE against the MakeFlag label, and the learned tree.
*/
println(s"Train, Test , Validate and Predict started by = ${DateTime.now(DateTimeZone.UTC)}")

// Fit the pipeline on the training split.
val vAR_model = vAR_pipeline.fit(vAR_trainingData)

// Score the held-out test split.
val vAR_predictions = vAR_model.transform(vAR_testData)

// Show a handful of (prediction, label, features) rows.
vAR_predictions
  .select(col("prediction"), col("MakeFlag"), col("features"))
  .show(5)

// Compare the prediction column with the true label using RMSE.
val vAR_evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("MakeFlag")
  .setPredictionCol("prediction")

val vAR_rmse = vAR_evaluator.evaluate(vAR_predictions)
println("Root Mean Squared Error (RMSE) on test data = " + vAR_rmse)

// Stage 1 of the fitted pipeline is the tree (stage 0 is the feature indexer).
val vAR_treeModel = vAR_model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
println("Learned regression tree model:\n " + vAR_treeModel.toDebugString)

println(s"Train, Test , Validate and Predict ended by = ${DateTime.now(DateTimeZone.UTC)}")

Train, Test , Validate and Predict started by = 2021-05-11T17:44:56.143Z
+----------+--------+--------------------+
|prediction|MakeFlag|            features|
+----------+--------+--------------------+
|       0.0|       0|[103860.0,1.0,7.0...|
|       0.0|       0|[103860.0,1.0,7.0...|
|       0.0|       0|[103862.0,1.0,32....|
|       0.0|       0|[103870.0,1.0,24....|
|       0.0|       0|[103871.0,1.0,7.0...|
+----------+--------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 0.0
Learned regression tree model:
 DecisionTreeRegressionModel: uid=dtr_0d298550b640, depth=0, numNodes=1, numFeatures=12
  Predict: 0.0

Train, Test , Validate and Predict ended by = 2021-05-11T17:45:01.969Z


vAR_model: org.apache.spark.ml.PipelineModel = pipeline_a2c28b01aff6
vAR_predictions: org.apache.spark.sql.DataFrame = [TransactionID: int, ProductID: int ... 14 more fields]
vAR_evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_a596eb160f01, metricName=rmse, throughOrigin=false
vAR_rmse: Double = 0.0
vAR_treeModel: org.apache.spark.ml.regression.DecisionTreeRegressionModel = DecisionTreeRegressionModel: uid=dtr_0d298550b640, depth=0, numNodes=1, numFeatures=12


In [8]:
/*
  Step #7 : Apply repartition count
  Reports the current partition count of vAR_df, repartitions it into
  vAR_repart_count_set partitions, and prints the new partition count.
*/
println(s"Partition Applied default by System  = ${vAR_df.rdd.getNumPartitions}")
println(s"Repartition started by = ${DateTime.now(DateTimeZone.UTC)}")

// Repartition so that data-parallel performance can be compared with the default layout.
val vAR_repart_count_set = 4
println(s"Repartition count set as = ${vAR_repart_count_set}")
val vAR_repart_df = vAR_df.repartition(vAR_repart_count_set)

// NOTE(review): no Spark action runs on vAR_repart_df between the two timestamps, so the
// interval presumably covers plan construction only, not the shuffle itself - confirm.
println(vAR_repart_df.rdd.getNumPartitions)
println(s"Repartition ended by = ${DateTime.now(DateTimeZone.UTC)}")

Partition Applied default by System  = 1
Repartition started by = 2021-05-11T17:45:02.734Z
Repartition count set as = 4
4
Repartition ended by = 2021-05-11T17:45:03.285Z


vAR_repart_count_set: Int = 4
vAR_repart_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [TransactionID: int, ProductID: int ... 11 more fields]


In [9]:
// Assemble the feature columns of the repartitioned data into a single "features" vector
// column. MakeFlag is not part of the feature list.
println("After partition Vectorize the feature columns")
val vAR_repart_assembler = new VectorAssembler()
  .setOutputCol("features")
  .setInputCols(Array(
    "TransactionID", "Quantity", "ActualCost", "CustomerID", "TotalDue", "LineTotal",
    "FinishedGoodsFlag", "SalesReasonID", "AverageRate", "EndOfDayRate", "SalesLastYear",
    "ProductID"))
val vAR_repart_output = vAR_repart_assembler.transform(vAR_repart_df)

After partition Vectorize the feature columns


vAR_repart_assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_63d960061d2f, handleInvalid=error, numInputCols=12
vAR_repart_output: org.apache.spark.sql.DataFrame = [TransactionID: int, ProductID: int ... 12 more fields]


In [10]:
// Split the repartitioned, assembled data with weights 0.7 / 0.3 and a fixed seed; the
// first part is the training set and the second the test set.
println("After partition Split the loaded data into training and test")
val Array(vAR_repart_training, vAR_repart_test) =
  vAR_repart_output.randomSplit(weights = Array(0.7, 0.3), seed = 1234L)

After partition Split the loaded data into training and test


vAR_repart_training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [TransactionID: int, ProductID: int ... 12 more fields]
vAR_repart_test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [TransactionID: int, ProductID: int ... 12 more fields]


In [11]:
// Build the pipeline stages for the repartitioned data: a fitted VectorIndexer followed
// by a DecisionTreeRegressor on the MakeFlag label.
println(s"After partition Data prep started by = ${DateTime.now(DateTimeZone.UTC)}")

println("After partition Index the feature column data")
// The indexer is fitted on the whole repartitioned data set (vAR_repart_output), not only
// on the training split, and writes its result to "indexedFeatures".
val vAR_repart_featureIndexer = new VectorIndexer()
  .setMaxCategories(13)
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .fit(vAR_repart_output)

println("After partition Instantiate the DecisionTreeRegressor algorithm")
val vAR_repart_dectree = new DecisionTreeRegressor()
  .setFeaturesCol("indexedFeatures")
  .setLabelCol("MakeFlag")

println("After partition Setup the Pipeline")
// Stage 0: feature indexer, stage 1: decision tree.
val vAR_repart_pipeline = new Pipeline()
  .setStages(Array(vAR_repart_featureIndexer, vAR_repart_dectree))

println(s"After partition Data prep ended by = ${DateTime.now(DateTimeZone.UTC)}")

After partition Data prep started by = 2021-05-11T17:45:05.244Z
After partition Index the feature column data
After partition Instantiate the DecisionTreeRegressor algorithm
After partition Setup the Pipeline
After partition Data prep ended by = 2021-05-11T17:45:06.216Z


vAR_repart_featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = VectorIndexerModel: uid=vecIdx_42e83d54d5c4, numFeatures=12, handleInvalid=error
vAR_repart_dectree: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_b2df5a0ea5c6
vAR_repart_pipeline: org.apache.spark.ml.Pipeline = pipeline_7fc82dd7ef6d


In [12]:
// Fit the pipeline on the repartitioned training split and score the repartitioned test
// split, printing a UTC timestamp before and after.
// NOTE(review): the timed window here contains only fit + transform, while the Step #6
// window also contains show/evaluate/toDebugString - the two intervals are therefore not
// directly comparable; confirm how the timings are meant to be compared.
println(s"After partition Train, Test after partition started by = ${DateTime.now(DateTimeZone.UTC)}")

// Fit the pipeline on the training split.
val vAR_repart_model = vAR_repart_pipeline.fit(vAR_repart_training)

// Score the held-out test split.
val vAR_repart_predictions = vAR_repart_model.transform(vAR_repart_test)

println(s"After partition Train, Test ended by = ${DateTime.now(DateTimeZone.UTC)}")

After partition Train, Test after partition started by = 2021-05-11T17:45:06.785Z
After partition Train, Test ended by = 2021-05-11T17:45:08.797Z


vAR_repart_model: org.apache.spark.ml.PipelineModel = pipeline_7fc82dd7ef6d
vAR_repart_predictions: org.apache.spark.sql.DataFrame = [TransactionID: int, ProductID: int ... 14 more fields]


In [13]:
// Show a handful of (prediction, label, features) rows from the repartitioned run.
vAR_repart_predictions
  .select(col("prediction"), col("MakeFlag"), col("features"))
  .show(5)

// Compare the prediction column with the true label using RMSE.
val vAR_repart_evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("MakeFlag")
  .setPredictionCol("prediction")
val vAR_repart_rmse = vAR_repart_evaluator.evaluate(vAR_repart_predictions)
println("After partition Root Mean Squared Error (RMSE) on test data = " + vAR_repart_rmse)

// Stage 1 of the fitted pipeline is the tree (stage 0 is the feature indexer).
val vAR_repart_treeModel = vAR_repart_model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
println("After partition Learned regression tree model:\n " + vAR_repart_treeModel.toDebugString)


+----------+--------+--------------------+
|prediction|MakeFlag|            features|
+----------+--------+--------------------+
|       0.0|       0|[103868.0,1.0,2.0...|
|       0.0|       0|[103868.0,1.0,2.0...|
|       0.0|       0|[103870.0,1.0,24....|
|       0.0|       0|[103868.0,1.0,2.0...|
|       0.0|       0|[103869.0,1.0,4.0...|
+----------+--------+--------------------+
only showing top 5 rows

After partition Root Mean Squared Error (RMSE) on test data = 0.0
After partition Learned regression tree model:
 DecisionTreeRegressionModel: uid=dtr_b2df5a0ea5c6, depth=0, numNodes=1, numFeatures=12
  Predict: 0.0



vAR_repart_evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_672beeb84bb9, metricName=rmse, throughOrigin=false
vAR_repart_rmse: Double = 0.0
vAR_repart_treeModel: org.apache.spark.ml.regression.DecisionTreeRegressionModel = DecisionTreeRegressionModel: uid=dtr_b2df5a0ea5c6, depth=0, numNodes=1, numFeatures=12
