## 1. Read in Raw Transactional Data 

#### This dataset is a log of every item a customer has purchased, with one row per item, including financial / profit information 

In [2]:
val transactions = spark.read.format("csv")
                        .option("header", "true")
                        .option("inferSchema", "true")
                        .load("/Users/alynch/Downloads/MOCK_DATA.csv")
transactions.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- order_number: string (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- item_id: string (nullable = true)
 |-- item colour: string (nullable = true)
 |-- item size: string (nullable = true)
 |-- flag_coupon_used: boolean (nullable = true)
 |-- item_full_price: double (nullable = true)
 |-- item_sale_price: double (nullable = true)
 |-- item_cost_price: double (nullable = true)
 |-- shipping_cost: double (nullable = true)
 |-- profit: double (nullable = true)
 |-- country: string (nullable = true)
 |-- gender: string (nullable = true)



transactions = [customer_id: string, order_number: string ... 12 more fields]


[customer_id: string, order_number: string ... 12 more fields]

In [3]:
transactions.show(5)

+--------------------+--------------------+-------------------+--------------------+-----------+---------+----------------+---------------+---------------+---------------+-------------+------+-------+------+
|         customer_id|        order_number|         order_date|             item_id|item colour|item size|flag_coupon_used|item_full_price|item_sale_price|item_cost_price|shipping_cost|profit|country|gender|
+--------------------+--------------------+-------------------+--------------------+-----------+---------+----------------+---------------+---------------+---------------+-------------+------+-------+------+
|1bd7c236ee84a9a1d...|2d76e544c70646e22...|2018-05-26 00:00:00|2521d69949def3e60...|     Violet|        L|           false|           41.7|          45.45|          -0.91|        -2.18|  0.84|     AR|     M|
|02c0b659d2110a255...|7bdad59db3a90d67a...|2018-04-24 00:00:00|7815849e9059cf629...|        Red|      2XL|            true|          31.38|          50.08|          -1.

In [4]:
import org.apache.spark.sql.functions._
transactions.select(min("order_date"), max("order_date")).show(truncate=false)

+-------------------+-------------------+
|min(order_date)    |max(order_date)    |
+-------------------+-------------------+
|2012-11-06 00:00:00|2018-10-30 00:00:00|
+-------------------+-------------------+



## 2. Feature Engineering & Label Generation

### - Aggregate the dataset into one row per order

In [5]:
import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}

def getOrderLevelAggregations(transactionData: Dataset[Row]) = {
    //aggregate the item-level transaction events into order level so that we could extract order_level features
    //such as average_spend in orders, etc.
    transactionData.groupBy("order_number").agg(
      first("order_date") as "order_date",
      first("customer_id") as "customer_id",
      sum("item_full_price") as "full_price_per_order",
      sum("item_sale_price") as "sale_price_per_order",
      sum("item_cost_price") as "item_cost_price_per_order",
      sum("profit") as "profit_per_order",
      max("flag_coupon_used") as "coupon_used_in_order",
      sum(when(col("flag_coupon_used") === true, col("item_full_price")).otherwise(0)) as "item_full_price_with_coupon_per_order",
      count("*") as "items_per_order"
    )
  }
val aggregatedOrderDF: DataFrame = getOrderLevelAggregations(transactions)
aggregatedOrderDF.printSchema()

root
 |-- order_number: string (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- full_price_per_order: double (nullable = true)
 |-- sale_price_per_order: double (nullable = true)
 |-- item_cost_price_per_order: double (nullable = true)
 |-- profit_per_order: double (nullable = true)
 |-- coupon_used_in_order: boolean (nullable = true)
 |-- item_full_price_with_coupon_per_order: double (nullable = true)
 |-- items_per_order: long (nullable = false)



aggregatedOrderDF = [order_number: string, order_date: timestamp ... 8 more fields]


getOrderLevelAggregations: (transactionData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])org.apache.spark.sql.DataFrame


[order_number: string, order_date: timestamp ... 8 more fields]

### - Split the feature and target time frames using a cut-off date 

In [33]:

val DateFormatterPattern: String = "yyyy-MM-dd"
val dateFormatter: DateTimeFormatter = DateTimeFormat.forPattern(DateFormatterPattern)

val cutOffDateString = "2017-11-01"
val cutOffDate = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC).parseDateTime(cutOffDateString).withTimeAtStartOfDay()

val featureTimeFrameFilter: Column = col("order_date").lt(dateFormatter.print(cutOffDate))
val labelTimeFrameFilter: Column = col("order_date").lt(dateFormatter.print(cutOffDate.plusMonths(12))) 
          .and(col("order_date").geq(dateFormatter.print(cutOffDate)))

DateFormatterPattern = yyyy-MM-dd
dateFormatter = org.joda.time.format.DateTimeFormatter@1edd9769
cutOffDateString = 2017-11-01
cutOffDate = 2017-11-01T00:00:00.000Z
featureTimeFrameFilter = (order_date < 2017-11-01)
labelTimeFrameFilter = ((order_date < 2018-11-01) AND (order_date >= 2017-11-01))


((order_date < 2018-11-01) AND (order_date >= 2017-11-01))

### - And then aggregate into one row per customer, aggregating features over many time frames
### - Make feature engineering easy by automatically generating filters & aggregations in a functional way

In [44]:
// Many time frames 

def getFilterByMonths(cutOffDate: DateTime, numOfMonths: Int, dateFormatter: DateTimeFormatter) = {
    val nMonthsBeforeFilter: Column = 
    col("order_date").lt(dateFormatter.print(cutOffDate))
    .and(col("order_date").geq(dateFormatter.print(cutOffDate.minusMonths(numOfMonths))))
    nMonthsBeforeFilter
  }

def getTimeFilters(cutOffDate: DateTime, dateFormatter: DateTimeFormatter) = {
    val filters = List(
      (getFilterByMonths(cutOffDate, 1, dateFormatter), "1M"),
      (getFilterByMonths(cutOffDate, 3, dateFormatter), "3M"),
      (getFilterByMonths(cutOffDate, 6, dateFormatter), "6M"),
      (getFilterByMonths(cutOffDate, 12, dateFormatter), "12M")
    )
    filters
  }

// with the time filter, make various aggregations
def getTimePeriodAggregations(timeFilter: Column, labelSuffix: String) = {
    //Note that these aggregations are applied on the transaction data after Order Level Aggregations
    List(
      sum(when(timeFilter, col("profit_per_order")).otherwise(0)) as s"SumProfit$labelSuffix", 
      avg(when(timeFilter, col("profit_per_order")).otherwise(0)) as  s"AvgProfit$labelSuffix", 
      sum(when(timeFilter, col("full_price_per_order")).otherwise(0)) as s"TotalSpendBeforeDiscount$labelSuffix", 
      sum(when(timeFilter, col("sale_price_per_order")).otherwise(0)) as s"TotalSpendBeforeCoupon$labelSuffix", 
      count(when(timeFilter, col("order_number"))) as s"NumberOrders$labelSuffix", 
      sum(when(timeFilter, col("items_per_order")).otherwise(0)) as s"NumberItems$labelSuffix", 
      avg(when(timeFilter, col("items_per_order")).otherwise(0)) as s"AvgNumberItems$labelSuffix", 
      sum(when(timeFilter, col("item_full_price_with_coupon_per_order")).otherwise(0)) as s"FullPriceWithCoupon$labelSuffix", 
      sum(when(timeFilter && col("coupon_used_in_order"), 1).otherwise(0)) as s"NumberOrdersWithCoupon$labelSuffix" 
    )
  }


getFilterByMonths: (cutOffDate: org.joda.time.DateTime, numOfMonths: Int, dateFormatter: org.joda.time.format.DateTimeFormatter)org.apache.spark.sql.Column
getTimeFilters: (cutOffDate: org.joda.time.DateTime, dateFormatter: org.joda.time.format.DateTimeFormatter)List[(org.apache.spark.sql.Column, String)]
getTimePeriodAggregations: (timeFilter: org.apache.spark.sql.Column, labelSuffix: String)List[org.apache.spark.sql.Column]


### Apply large list of filters in a functional way 

In [35]:
val filters: List[(Column, String)] = getTimeFilters(cutOffDate, dateFormatter)
val filterBasedAggregations = filters.map(filterWithLabel => 
                                          getTimePeriodAggregations(filterWithLabel._1, filterWithLabel._2)).flatten

val aggregationColumns = filterBasedAggregations ++ List(
      //the label value to predict for the next 12 months
      sum(when(labelTimeFrameFilter, col("profit_per_order")).otherwise(0)) as "sum_profit")


filterBasedAggregations = List(sum(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN profit_per_order ELSE 0 END) AS `SumProfit1M`, avg(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN profit_per_order ELSE 0 END) AS `AvgProfit1M`, sum(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN full_price_per_order ELSE 0 END) AS `TotalSpendBeforeDiscount1M`, sum(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN sale_price_per_order ELSE 0 END) AS `TotalSpendBeforeCoupon1M`, count(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN 1 END) AS `NumberOrders1M`, sum(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN items_per...


List(sum(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN profit_per_order ELSE 0 END) AS `SumProfit1M`, avg(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN profit_per_order ELSE 0 END) AS `AvgProfit1M`, sum(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN full_price_per_order ELSE 0 END) AS `TotalSpendBeforeDiscount1M`, sum(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN sale_price_per_order ELSE 0 END) AS `TotalSpendBeforeCoupon1M`, count(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN 1 END) AS `NumberOrders1M`, sum(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN items_per_order ELSE 0 END) AS `NumberItems1M`, avg(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN items_per_order ELSE 0 END) AS `AvgNumberItems1M`, sum(CASE WHEN ((order_date < 2017-11-01) AND (order_date >= 2017-10-01)) THEN item_full_price_wi

In [37]:
val featuresAndLabels = aggregatedOrderDF.groupBy("customer_id").agg(aggregationColumns.head, aggregationColumns.tail: _*)

featuresAndLabels = [customer_id: string, SumProfit1M: double ... 36 more fields]


[customer_id: string, SumProfit1M: double ... 36 more fields]

In [38]:
featuresAndLabels.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- SumProfit1M: double (nullable = true)
 |-- AvgProfit1M: double (nullable = true)
 |-- TotalSpendBeforeDiscount1M: double (nullable = true)
 |-- TotalSpendBeforeCoupon1M: double (nullable = true)
 |-- NumberOrders1M: long (nullable = false)
 |-- NumberItems1M: long (nullable = true)
 |-- AvgNumberItems1M: double (nullable = true)
 |-- FullPriceWithCoupon1M: double (nullable = true)
 |-- NumberOrdersWithCoupon1M: long (nullable = true)
 |-- SumProfit3M: double (nullable = true)
 |-- AvgProfit3M: double (nullable = true)
 |-- TotalSpendBeforeDiscount3M: double (nullable = true)
 |-- TotalSpendBeforeCoupon3M: double (nullable = true)
 |-- NumberOrders3M: long (nullable = false)
 |-- NumberItems3M: long (nullable = true)
 |-- AvgNumberItems3M: double (nullable = true)
 |-- FullPriceWithCoupon3M: double (nullable = true)
 |-- NumberOrdersWithCoupon3M: long (nullable = true)
 |-- SumProfit6M: double (nullable = true)
 |-- AvgProfit6M: doubl

In [39]:
featuresAndLabels.show(5)

|         customer_id|SumProfit1M|AvgProfit1M|TotalSpendBeforeDiscount1M|TotalSpendBeforeCoupon1M|NumberOrders1M|NumberItems1M|AvgNumberItems1M|FullPriceWithCoupon1M|NumberOrdersWithCoupon1M|SumProfit3M|AvgProfit3M|TotalSpendBeforeDiscount3M|TotalSpendBeforeCoupon3M|NumberOrders3M|NumberItems3M|AvgNumberItems3M|FullPriceWithCoupon3M|NumberOrdersWithCoupon3M|SumProfit6M|AvgProfit6M|TotalSpendBeforeDiscount6M|TotalSpendBeforeCoupon6M|NumberOrders6M|NumberItems6M|AvgNumberItems6M|FullPriceWithCoupon6M|NumberOrdersWithCoupon6M|SumProfit12M|AvgProfit12M|TotalSpendBeforeDiscount12M|TotalSpendBeforeCoupon12M|NumberOrders12M|NumberItems12M|AvgNumberItems12M|FullPriceWithCoupon12M|NumberOrdersWithCoupon12M|sum_profit|
+--------------------+-----------+-----------+--------------------------+------------------------+--------------+-------------+----------------+---------------------+------------------------+-----------+-----------+--------------------------+------------------------+--------------

## 3. Modelling 

In [45]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{Binarizer, StandardScaler, VectorAssembler}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.ml.evaluation.RegressionEvaluator

In [46]:
val labelName = "sum_profit"
val identifierName = "customer_id"
val featureList = featuresAndLabels.drop(labelName,identifierName, "label").columns

labelName = sum_profit
identifierName = customer_id
featureList = Array(SumProfit1M, AvgProfit1M, TotalSpendBeforeDiscount1M, TotalSpendBeforeCoupon1M, NumberOrders1M, NumberItems1M, AvgNumberItems1M, FullPriceWithCoupon1M, NumberOrdersWithCoupon1M, SumProfit3M, AvgProfit3M, TotalSpendBeforeDiscount3M, TotalSpendBeforeCoupon3M, NumberOrders3M, NumberItems3M, AvgNumberItems3M, FullPriceWithCoupon3M, NumberOrdersWithCoupon3M, SumProfit6M, AvgProfit6M, TotalSpendBeforeDiscount6M, TotalSpendBeforeCoupon6M, NumberOrders6M, NumberItems6M, AvgNumberItems6M, FullPriceWithCoupon6M, NumberOrdersWithCoupon6M, SumProfit12M, AvgProfit12M, TotalSpendBeforeDiscount12M, TotalSpendBeforeCoupon12M, NumberOrders12M, NumberItems12M, AvgNumberItems12M, FullPriceWithCoupon12M, ...


[SumProfit1M, AvgProfit1M, TotalSpendBeforeDiscount1M, TotalSpendBeforeCoupon1M, NumberOrders1M, NumberItems1M, AvgNumberItems1M, FullPriceWithCoupon1M, NumberOrdersWithCoupon1M, SumProfit3M, AvgProfit3M, TotalSpendBeforeDiscount3M, TotalSpendBeforeCoupon3M, NumberOrders3M, NumberItems3M, AvgNumberItems3M, FullPriceWithCoupon3M, NumberOrdersWithCoupon3M, SumProfit6M, AvgProfit6M, TotalSpendBeforeDiscount6M, TotalSpendBeforeCoupon6M, NumberOrders6M, NumberItems6M, AvgNumberItems6M, FullPriceWithCoupon6M, NumberOrdersWithCoupon6M, SumProfit12M, AvgProfit12M, TotalSpendBeforeDiscount12M, TotalSpendBeforeCoupon12M, NumberOrders12M, NumberItems12M, AvgNumberItems12M, FullPriceWithCoupon12M, NumberOrdersWithCoupon12M]

### Keep aside a random selection of data for testing / evaluation

In [12]:
val trainAndTestData = featuresAndLabels
                        .withColumn("label", col(labelName))
                        .randomSplit(Array(0.8, 0.2))

val (trainingData, testingData) = (trainAndTestData.apply(0), trainAndTestData.apply(1))

trainAndTestData = Array([customer_id: string, SumProfit1M: double ... 37 more fields], [customer_id: string, SumProfit1M: double ... 37 more fields])
trainingData = [customer_id: string, SumProfit1M: double ... 37 more fields]
testingData = [customer_id: string, SumProfit1M: double ... 37 more fields]


[customer_id: string, SumProfit1M: double ... 37 more fields]

### More Feature Grunt Work ... 
https://spark.apache.org/docs/latest/ml-features.html

In [13]:
val assembler = new VectorAssembler()
                    .setInputCols(featureList)
                    .setOutputCol("features")

assembler = vecAssembler_5d110a930870


vecAssembler_5d110a930870

In [14]:
val scaler = new StandardScaler()
                .setInputCol("features")
                .setOutputCol("scaledFeatures")
                .setWithStd(true)
                .setWithMean(true)

scaler = stdScal_00bb4d6c59a7


stdScal_00bb4d6c59a7

### Train a Regression Model 
### Search for best parameter values with grid search via cross validation

In [48]:
val lr = new LinearRegression()
      .setFitIntercept(true)
      .setFeaturesCol("scaledFeatures")
      .setTol(1e-6)

//val rf = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures")

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

//val paramGrid = new ParamGridBuilder().addGrid(rf.numTrees, Array(100.0,500.0,1000.0)).addGrid(rf.maxDepth(10.0,50.0)).build()

val pipeline = new Pipeline().setStages(Array(assembler, scaler, lr))

val evaluator = new RegressionEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("rmse")

val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(3)

lr = linReg_eb7370b1f1db
paramGrid = 
pipeline = pipeline_4eecc08ecc63
evaluator = regEval_12dea5d6496f


Array({
	linReg_eb7370b1f1db-elasticNetParam: 0.0,
	linReg_eb7370b1f1db-regParam: 0.1
}, {
	linReg_eb7370b1f1db-elasticNetParam: 0.5,
	linReg_eb7370b1f1db-regParam: 0.1
}, {
	linReg_eb7370b1f1db-elasticNetParam: 1.0,
	linReg_eb7370b1f1db-regParam: 0.1
}, {
	linReg_eb7370b1f1db-elasticNetParam: 0.0,
	linReg_eb7370b1f1db-regParam: 0.01
}, {
	linReg_eb7370b1f1db-elasticNetParam: 0.5,
	linReg_eb7370b1f1db-regParam: 0.01
}, {
	linReg_eb7370b1f1db-elasticNetParam: 1.0,
	linReg_eb7370b1f1db-regParam: 0.01
})
cv: org.apach...


regEval_12dea5d6496f

In [16]:
cv.fit(trainingData)



cv_5090eb6e4f0a

In [49]:
val cvModel: CrossValidatorModel = cv.fit(trainingData.sample(true, 0.1))



cvModel = cv_bbfccf5418af


cv_bbfccf5418af

In [50]:
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]

bestModel = pipeline_4eecc08ecc63


pipeline_4eecc08ecc63

## Evaluate Model with Testing Data

In [51]:
val testingDataWithPredictions = bestModel.transform(testingData)

testingDataWithPredictions = [customer_id: string, SumProfit1M: double ... 40 more fields]


[customer_id: string, SumProfit1M: double ... 40 more fields]

In [24]:
val evaluator = new RegressionEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("rmse")

val rmse = evaluator.evaluate(testingDataWithPredictions)



rmse = 0.39791193197135727


0.39791193197135727

In [52]:
val lrModel = bestModel.stages(2).asInstanceOf[LinearRegressionModel]

lrModel = linReg_eb7370b1f1db


linReg_eb7370b1f1db

### Check the model convergence & loss function 
### Performance metrics on Training Dataset

In [54]:
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show(10)
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
//trainingSummary.pValues  // available when WLS optimization used 

numIterations: 28
objectiveHistory: [0.49999999999999994,0.4983477426092327,0.49722523322619094,0.4970933541258635,0.4969322085177869,0.49691139432880715,0.49690856778646847,0.49688714675797224,0.4968804459065397,0.4968718311341217,0.4968710302411861,0.49687038198763855,0.4968695091093931,0.49686902451599535,0.4968683788670904,0.4968674846913278,0.4968671319333908,0.49686692914038916,0.4968667446210605,0.4968660546020295,0.49686561061699447,0.4968656097729217,0.4968656095005232,0.49686560939177343,0.49686560934688173,0.49686560933816515,0.4968656093367706,0.496865609336713]
|           residuals|
+--------------------+
|-0.09435569679100243|
|-0.09435569679100243|
|-0.09435569679100243|
|-0.09435569679100243|
|-0.09435569679100243|
|-0.09435569679100243|
|  1.4256443032089976|
|-0.09435569679100243|
|-0.09435569679100243|
|-0.09435569679100243|
+--------------------+
only showing top 10 rows

RMSE: 0.31017642178130067
r2: 0.008836665722128823


trainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@580abccd


org.apache.spark.ml.regression.LinearRegressionTrainingSummary@580abccd

In [55]:
val coefficients = lrModel.coefficients

coefficients = [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.00824990164504512,-0.008249901645045094,-0.0082499016450451,0.0,0.0]


[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.00824990164504512,-0.008249901645045094,-0.0082499016450451,0.0,0.0]

In [56]:
val intercept = lrModel.intercept

intercept = 0.08583333333333332


0.08583333333333332

In [57]:
val features = trainingData.drop("sum_profit", "label", "customer_id").columns ++  Array("intercept")

features = Array(SumProfit1M, AvgProfit1M, TotalSpendBeforeDiscount1M, TotalSpendBeforeCoupon1M, NumberOrders1M, NumberItems1M, AvgNumberItems1M, FullPriceWithCoupon1M, NumberOrdersWithCoupon1M, SumProfit3M, AvgProfit3M, TotalSpendBeforeDiscount3M, TotalSpendBeforeCoupon3M, NumberOrders3M, NumberItems3M, AvgNumberItems3M, FullPriceWithCoupon3M, NumberOrdersWithCoupon3M, SumProfit6M, AvgProfit6M, TotalSpendBeforeDiscount6M, TotalSpendBeforeCoupon6M, NumberOrders6M, NumberItems6M, AvgNumberItems6M, FullPriceWithCoupon6M, NumberOrdersWithCoupon6M, SumProfit12M, AvgProfit12M, TotalSpendBeforeDiscount12M, TotalSpendBeforeCoupon12M, NumberOrders12M, NumberItems12M, AvgNumberItems12M, FullPriceWithCoupon12M, NumberOrdersWithCoupon12M, intercept)


[SumProfit1M, AvgProfit1M, TotalSpendBeforeDiscount1M, TotalSpendBeforeCoupon1M, NumberOrders1M, NumberItems1M, AvgNumberItems1M, FullPriceWithCoupon1M, NumberOrdersWithCoupon1M, SumProfit3M, AvgProfit3M, TotalSpendBeforeDiscount3M, TotalSpendBeforeCoupon3M, NumberOrders3M, NumberItems3M, AvgNumberItems3M, FullPriceWithCoupon3M, NumberOrdersWithCoupon3M, SumProfit6M, AvgProfit6M, TotalSpendBeforeDiscount6M, TotalSpendBeforeCoupon6M, NumberOrders6M, NumberItems6M, AvgNumberItems6M, FullPriceWithCoupon6M, NumberOrdersWithCoupon6M, SumProfit12M, AvgProfit12M, TotalSpendBeforeDiscount12M, TotalSpendBeforeCoupon12M, NumberOrders12M, NumberItems12M, AvgNumberItems12M, FullPriceWithCoupon12M, NumberOrdersWithCoupon12M, intercept]

In [58]:
import org.apache.spark.ml.linalg.DenseVector
val coefficientsAndIntercept = coefficients.asInstanceOf[DenseVector].toArray ++ Array(intercept)

coefficientsAndIntercept = Array(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -0.00824990164504512, -0.008249901645045094, -0.0082499016450451, 0.0, 0.0, 0.08583333333333332)


[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -0.00824990164504512, -0.008249901645045094, -0.0082499016450451, 0.0, 0.0, 0.08583333333333332]

In [61]:
import scala.math
val featuresAndCoefficients  = features zip coefficientsAndIntercept 

featuresAndCoefficients = Array((SumProfit1M,0.0), (AvgProfit1M,0.0), (TotalSpendBeforeDiscount1M,0.0), (TotalSpendBeforeCoupon1M,0.0), (NumberOrders1M,0.0), (NumberItems1M,0.0), (AvgNumberItems1M,0.0), (FullPriceWithCoupon1M,0.0), (NumberOrdersWithCoupon1M,0.0), (SumProfit3M,0.0), (AvgProfit3M,0.0), (TotalSpendBeforeDiscount3M,0.0), (TotalSpendBeforeCoupon3M,0.0), (NumberOrders3M,0.0), (NumberItems3M,0.0), (AvgNumberItems3M,0.0), (FullPriceWithCoupon3M,0.0), (NumberOrdersWithCoupon3M,0.0), (SumProfit6M,0.0), (AvgProfit6M,0.0), (TotalSpendBeforeDiscount6M,0.0), (TotalSpendBeforeCoupon6M,0.0), (NumberOrders6M,0.0), (NumberItems6M,0.0), (AvgNumberItems6M,0.0), (FullPriceWithCoupon6M,0.0), (NumberOrdersWithCoupon6M,0.0), (SumProfit12M,0.0), (AvgPr...


[(SumProfit1M,0.0), (AvgProfit1M,0.0), (TotalSpendBeforeDiscount1M,0.0), (TotalSpendBeforeCoupon1M,0.0), (NumberOrders1M,0.0), (NumberItems1M,0.0), (AvgNumberItems1M,0.0), (FullPriceWithCoupon1M,0.0), (NumberOrdersWithCoupon1M,0.0), (SumProfit3M,0.0), (AvgProfit3M,0.0), (TotalSpendBeforeDiscount3M,0.0), (TotalSpendBeforeCoupon3M,0.0), (NumberOrders3M,0.0), (NumberItems3M,0.0), (AvgNumberItems3M,0.0), (FullPriceWithCoupon3M,0.0), (NumberOrdersWithCoupon3M,0.0), (SumProfit6M,0.0), (AvgProfit6M,0.0), (TotalSpendBeforeDiscount6M,0.0), (TotalSpendBeforeCoupon6M,0.0), (NumberOrders6M,0.0), (NumberItems6M,0.0), (AvgNumberItems6M,0.0), (FullPriceWithCoupon6M,0.0), (NumberOrdersWithCoupon6M,0.0), (SumProfit12M,0.0), (AvgProfit12M,0.0), (TotalSpendBeforeDiscount12M,0.0), (TotalSpendBeforeCoupon12M,0.0), (NumberOrders12M,-0.00824990164504512), (NumberItems12M,-0.008249901645045094), (AvgNumberItems12M,-0.0082499016450451), (FullPriceWithCoupon12M,0.0), (NumberOrdersWithCoupon12M,0.0), (intercept,

In [67]:
featuresAndCoefficients.sortBy{x=> -math.abs(x._2)}.foreach{ x => println(s"${x._1} \t\t\t  ${x._2}") }

intercept 			  0.08583333333333332
NumberOrders12M 			  -0.00824990164504512
AvgNumberItems12M 			  -0.0082499016450451
NumberItems12M 			  -0.008249901645045094
SumProfit1M 			  0.0
AvgProfit1M 			  0.0
TotalSpendBeforeDiscount1M 			  0.0
TotalSpendBeforeCoupon1M 			  0.0
NumberOrders1M 			  0.0
NumberItems1M 			  0.0
AvgNumberItems1M 			  0.0
FullPriceWithCoupon1M 			  0.0
NumberOrdersWithCoupon1M 			  0.0
SumProfit3M 			  0.0
AvgProfit3M 			  0.0
TotalSpendBeforeDiscount3M 			  0.0
TotalSpendBeforeCoupon3M 			  0.0
NumberOrders3M 			  0.0
NumberItems3M 			  0.0
AvgNumberItems3M 			  0.0
FullPriceWithCoupon3M 			  0.0
NumberOrdersWithCoupon3M 			  0.0
SumProfit6M 			  0.0
AvgProfit6M 			  0.0
TotalSpendBeforeDiscount6M 			  0.0
TotalSpendBeforeCoupon6M 			  0.0
NumberOrders6M 			  0.0
NumberItems6M 			  0.0
AvgNumberItems6M 			  0.0
FullPriceWithCoupon6M 			  0.0
NumberOrdersWithCoupon6M 			  0.0
SumProfit12M 			  0.0
AvgProfit12M 			  0.0
TotalSpendBeforeDiscount12M 			  0.0
TotalSp

### Great for end to end Production Machine Learning Systems
### Great for big data 
### No so great for tinkering : limited options for data visualisation  (R, python better)
### Limited statistical packages (R, python better)