### Backorder Prediction for Retailer, Logistic Regression vs Gradient Boosted Trees vs Random Forests

### The training data file contains historical data for the 8 weeks prior to the week we are trying to predict. The data was taken as weekly snapshots at the start of each week. Columns are defined as follows:

sku - Random ID for the product;
national_inv - Current inventory level for the part;
lead_time - Transit time for product (if available);
in_transit_qty - Amount of product in transit from source;
forecast_3_month - Forecast sales for the next 3 months;
forecast_6_month - Forecast sales for the next 6 months;
forecast_9_month - Forecast sales for the next 9 months;
sales_1_month - Sales quantity for the prior 1 month time period;
sales_3_month - Sales quantity for the prior 3 month time period;
sales_6_month - Sales quantity for the prior 6 month time period;
sales_9_month - Sales quantity for the prior 9 month time period;
min_bank - Minimum recommended amount to stock;
potential_issue - Source issue for part identified;
pieces_past_due - Parts overdue from source;
perf_6_month_avg - Source performance for prior 6 month period;
perf_12_month_avg - Source performance for prior 12 month period;
local_bo_qty - Amount of stock orders overdue;
deck_risk - Part risk flag;
oe_constraint - Part risk flag;
ppap_risk - Part risk flag;
stop_auto_buy - Part risk flag;
rev_stop - Part risk flag;
went_on_backorder - Product actually went on backorder. This is the target value.

In [1]:
### Find Spark and set its location

import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7/')

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('backorder').getOrCreate()

In [4]:
from pyspark.ml.classification import LogisticRegression

In [5]:
train = spark.read.csv("Backorder_Dataset_small.csv", inferSchema = True, header = True)

In [6]:
train.printSchema()

root
 |-- sku: string (nullable = true)
 |-- national_inv: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- in_transit_qty: integer (nullable = true)
 |-- forecast_3_month: integer (nullable = true)
 |-- forecast_6_month: integer (nullable = true)
 |-- forecast_9_month: integer (nullable = true)
 |-- sales_1_month: integer (nullable = true)
 |-- sales_3_month: integer (nullable = true)
 |-- sales_6_month: integer (nullable = true)
 |-- sales_9_month: integer (nullable = true)
 |-- min_bank: integer (nullable = true)
 |-- potential_issue: string (nullable = true)
 |-- pieces_past_due: integer (nullable = true)
 |-- perf_6_month_avg: double (nullable = true)
 |-- perf_12_month_avg: double (nullable = true)
 |-- local_bo_qty: integer (nullable = true)
 |-- deck_risk: string (nullable = true)
 |-- oe_constraint: string (nullable = true)
 |-- ppap_risk: string (nullable = true)
 |-- stop_auto_buy: string (nullable = true)
 |-- rev_stop: string (nullable = true)
 |-- 

In [6]:
### DoubleType is used to cast selected numeric columns to 'double'
### StringIndexer encodes the qualitative 'went_on_backorder' field as a numeric label

from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer

In [8]:
### Count records per 'went_on_backorder' value to reveal 'null' values

train.groupBy('went_on_backorder').count().show()

+-----------------+------+
|went_on_backorder| count|
+-----------------+------+
|             null|     1|
|               No|239387|
|              Yes|  2688|
+-----------------+------+



In [7]:
### Drop the one record with 'null' value

train = train.dropna(subset = 'went_on_backorder')
train.groupBy('went_on_backorder').count().show()

+-----------------+------+
|went_on_backorder| count|
+-----------------+------+
|               No|239387|
|              Yes|  2688|
+-----------------+------+
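
The counts above show a heavy class imbalance. As a quick worked check (a sketch that uses only the two counts printed above; the variable names are illustrative), the share of backorders and the accuracy of a trivial model that always answers 'No' can be computed as follows:

```python
# Class counts copied from the groupBy output above.
no_count = 239387
yes_count = 2688

total = no_count + yes_count
positive_rate = yes_count / float(total)      # share of records that went on backorder
majority_baseline = no_count / float(total)   # accuracy of always predicting 'No'

print("total records: %d" % total)
print("backorder rate: %.4f" % positive_rate)
print("always-'No' accuracy: %.4f" % majority_baseline)
```

On the full dataset this baseline is about 98.9%, so accuracy figures for any model are more informative when compared against it rather than against 50%.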



In [8]:
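### Create 'backorder' column that encodes the qualitative 'went_on_backorder' field as a number.
### StringIndexer orders labels by frequency, so 'No' becomes 0.0 and 'Yes' becomes 1.0.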

indexer = StringIndexer(inputCol="went_on_backorder", outputCol="backorder")
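
By default StringIndexer assigns indices by descending label frequency, so the most common label receives 0.0. The pure-Python sketch below mimics that rule on a toy list to show why 'No' maps to 0.0 and 'Yes' to 1.0 in this dataset. The helper `frequency_index` and the toy data are hypothetical and are not part of PySpark; the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter

def frequency_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically in this sketch.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

toy_labels = ["No"] * 8 + ["Yes"] * 2
mapping = frequency_index(toy_labels)
encoded = [mapping[label] for label in toy_labels]
print(mapping)
```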

In [9]:
### Change data type for fields needed for the Logistic Regression model

train = train.withColumn("national_inv", train["national_inv"].cast(DoubleType()))
train = train.withColumn("lead_time", train["lead_time"].cast(DoubleType()))
train = train.withColumn("in_transit_qty", train["in_transit_qty"].cast(DoubleType()))
train = train.withColumn("min_bank", train["min_bank"].cast(DoubleType()))
train = train.withColumn("local_bo_qty", train["local_bo_qty"].cast(DoubleType()))

In [12]:
### 'lead_time' has null values too.

train.groupBy('lead_time').count().show()

+---------+-----+
|lead_time|count|
+---------+-----+
|      8.0|98533|
|      0.0| 1515|
|      7.0|   30|
|     null|14724|
|     35.0|    5|
|     18.0|   32|
|      1.0|    3|
|     25.0|    1|
|      4.0|18508|
|     23.0|    2|
|     11.0|  152|
|     21.0|    7|
|     14.0| 1506|
|     22.0|   19|
|      3.0| 2349|
|     19.0|    4|
|     28.0|   12|
|      2.0|46917|
|     17.0|  537|
|     10.0| 2056|
+---------+-----+
only showing top 20 rows



In [10]:
### Calculate mean (average) value for 'lead_time' to replace 'null' value.

from pyspark.sql.functions import mean
meanValue = train.select(mean(train['lead_time']))

In [11]:
### The actual mean (average) value for filling in 'null' values (collected once and reused)
Value = meanValue.collect()[0][0]
Value

In [12]:
### Use meanValue to fill in 'null' in "lead_time".
train = train.na.fill(Value, subset=['lead_time'])

In [13]:
### Now show what 'lead_time' looks like once 'null' has been set to average (mean) value.

train = train.withColumn("lead_time", train["lead_time"].cast(DoubleType()))
train.groupBy('lead_time').count().show()

+-----------------+-----+
|        lead_time|count|
+-----------------+-----+
|              8.0|98533|
|              0.0| 1515|
|              7.0|   30|
|             35.0|    5|
|             18.0|   32|
|7.923017712699746|14724|
|              1.0|    3|
|             25.0|    1|
|              4.0|18508|
|             23.0|    2|
|             11.0|  152|
|             21.0|    7|
|             14.0| 1506|
|             22.0|   19|
|              3.0| 2349|
|             19.0|    4|
|             28.0|   12|
|              2.0|46917|
|             17.0|  537|
|             10.0| 2056|
+-----------------+-----+
only showing top 20 rows
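
Mean imputation averages the non-null values only and substitutes that average for each null. A minimal pure-Python sketch of the same idea on a toy column (the function `fill_with_mean` and the toy values are illustrative, not a Spark API):

```python
def fill_with_mean(values):
    """Replace None entries with the mean of the non-None entries."""
    present = [v for v in values if v is not None]
    if not present:
        return list(values)  # nothing to average; leave the column unchanged
    mean_value = sum(present) / float(len(present))
    return [mean_value if v is None else v for v in values]

toy_lead_time = [8.0, 2.0, None, 4.0, None, 2.0]
filled = fill_with_mean(toy_lead_time)
print(filled)
```

One side effect is visible in the output above: all 14724 imputed rows share the single value 7.923017712699746, which adds an artificial spike to the 'lead_time' distribution.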



In [14]:
### Import Pearson Correlation to check relationship between features.

from pyspark.sql.functions import corr

In [18]:
### Feature relationships

train.select(corr('lead_time','national_inv')).show()
train.select(corr('in_transit_qty','lead_time')).show()
train.select(corr('min_bank','in_transit_qty')).show()
train.select(corr('lead_time','min_bank')).show()
train.select(corr('min_bank','local_bo_qty')).show()
train.select(corr('min_bank','perf_6_month_avg')).show()
train.select(corr('in_transit_qty','perf_12_month_avg')).show()
train.select(corr('in_transit_qty','sales_9_month')).show()
train.select(corr('sales_9_month','forecast_9_month')).show()
train.select(corr('sales_6_month','forecast_6_month')).show()
train.select(corr('sales_9_month','min_bank')).show()
train.select(corr('min_bank','pieces_past_due')).show()
train.select(corr('sales_9_month','local_bo_qty')).show()

+-----------------------------+
|corr(lead_time, national_inv)|
+-----------------------------+
|         0.003360543456619082|
+-----------------------------+

+-------------------------------+
|corr(in_transit_qty, lead_time)|
+-------------------------------+
|           -0.00807130850689...|
+-------------------------------+

+------------------------------+
|corr(min_bank, in_transit_qty)|
+------------------------------+
|            0.7229275709491002|
+------------------------------+

+-------------------------+
|corr(lead_time, min_bank)|
+-------------------------+
|     -0.00693979549451...|
+-------------------------+

+----------------------------+
|corr(min_bank, local_bo_qty)|
+----------------------------+
|        0.048200196504585985|
+----------------------------+

+--------------------------------+
|corr(min_bank, perf_6_month_avg)|
+--------------------------------+
|            -0.00345017634879...|
+--------------------------------+

+----------------------------
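
`corr` computes the Pearson correlation coefficient: the covariance of the two columns divided by the product of their standard deviations. A small self-contained sketch of that formula follows; the function name `pearson` and the toy data are assumptions made for illustration.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length >= 2")
    n = float(len(xs))
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return float("nan")  # undefined for a constant column (choice made in this sketch)
    return cov / math.sqrt(var_x * var_y)

toy_min_bank = [0.0, 1.0, 2.0, 3.0, 4.0]
toy_in_transit = [1.0, 3.0, 5.0, 7.0, 9.0]   # exactly linear in toy_min_bank
r = pearson(toy_min_bank, toy_in_transit)
print(r)
```

Values near 0, such as corr(lead_time, national_inv) above, indicate almost no linear relationship, while 0.72 for min_bank and in_transit_qty indicates a fairly strong one.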

In [15]:
from pyspark.ml.feature import VectorAssembler

In [16]:
### Assemble features vector for Logistic Regression

assembler = VectorAssembler(inputCols = ["national_inv", "lead_time", "in_transit_qty", "min_bank", "local_bo_qty",
                                        "perf_6_month_avg", "perf_12_month_avg"],
                           outputCol = "features")

In [17]:
### Transform 'train' DataFrame with new feature vector

output = assembler.transform(train)

In [18]:
### Apply the StringIndexer to add the numeric 'backorder' label derived from 'went_on_backorder'

outputIndexed = indexer.fit(output).transform(output)

In [25]:
### Show features with quantitative transformation completed

outputIndexed.select("features", "backorder").show(truncate = False)

+------------------------------------------------+---------+
|features                                        |backorder|
+------------------------------------------------+---------+
|[62.0,7.923017712699746,0.0,1.0,0.0,-99.0,-99.0]|0.0      |
|[9.0,7.923017712699746,0.0,1.0,0.0,-99.0,-99.0] |0.0      |
|[17.0,8.0,0.0,0.0,0.0,0.92,0.95]                |0.0      |
|[9.0,2.0,0.0,0.0,0.0,0.78,0.75]                 |0.0      |
|[2.0,8.0,0.0,0.0,0.0,0.54,0.71]                 |0.0      |
|[15.0,2.0,0.0,0.0,0.0,0.37,0.68]                |0.0      |
|(7,[1,5,6],[7.923017712699746,-99.0,-99.0])     |0.0      |
|[28.0,7.923017712699746,0.0,0.0,0.0,-99.0,-99.0]|0.0      |
|[2.0,7.923017712699746,0.0,0.0,0.0,-99.0,-99.0] |0.0      |
|[2.0,7.923017712699746,0.0,0.0,0.0,-99.0,-99.0] |0.0      |
|[20.0,7.923017712699746,0.0,1.0,0.0,-99.0,-99.0]|0.0      |
|(7,[1,5,6],[7.923017712699746,-99.0,-99.0])     |0.0      |
|[13.0,7.923017712699746,0.0,0.0,0.0,-99.0,-99.0]|0.0      |
|[208.0,16.0,0.0,0.0,0.0
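
Rows printed as `(7,[1,5,6],[...])` in the output above are sparse vectors: the first number is the vector size, the first list holds the positions of the non-zero entries, and the second list holds their values. The sketch below expands that notation into a dense list (the helper `sparse_to_dense` is hypothetical and written only for illustration):

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse description into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# The sparse row shown in the output above.
row = sparse_to_dense(7, [1, 5, 6], [7.923017712699746, -99.0, -99.0])
print(row)
```

Read this way, a row such as `(7,[],[])` is simply a feature vector of seven zeros.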

In [19]:
### 'Data' will be split between training and test.

data = outputIndexed.select("features", "backorder")

In [20]:
### Split training vs test data

train_data, test_data = data.randomSplit([0.9, 0.1])

In [28]:
### Logistic Regression Model

lr = LogisticRegression(labelCol = "backorder")

In [29]:
lrModel = lr.fit(train_data)

In [30]:
trainingModel = lrModel.summary

In [31]:
trainingModel.predictions.show(3)

+---------+---------+--------------------+--------------------+----------+
| features|backorder|       rawPrediction|         probability|prediction|
+---------+---------+--------------------+--------------------+----------+
|(7,[],[])|      0.0|[3.91004493495093...|[0.98035409551558...|       0.0|
|(7,[],[])|      0.0|[3.91004493495093...|[0.98035409551558...|       0.0|
|(7,[],[])|      0.0|[3.91004493495093...|[0.98035409551558...|       0.0|
+---------+---------+--------------------+--------------------+----------+
only showing top 3 rows



In [32]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [33]:
### Evaluate the Logistic Regression model on the held-out test_data, which was not used for fitting.

predictionAndLables = lrModel.evaluate(test_data)

In [34]:
predictionAndLables.predictions.show(3)

+-------------+---------+--------------------+--------------------+----------+
|     features|backorder|       rawPrediction|         probability|prediction|
+-------------+---------+--------------------+--------------------+----------+
|    (7,[],[])|      0.0|[3.91004493495093...|[0.98035409551558...|       0.0|
|(7,[0],[1.0])|      0.0|[3.91281781497300...|[0.98040742995279...|       0.0|
|(7,[0],[5.0])|      0.0|[3.92390933506128...|[0.98061935225882...|       0.0|
+-------------+---------+--------------------+--------------------+----------+
only showing top 3 rows



In [35]:
### Binary classification evaluator, since the label is either '0.0' or '1.0'

churn_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                           labelCol='backorder')

In [36]:
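### Area under the ROC curve is ~0.5, the same as a random guess.
### The evaluator above was given the hard 0/1 'prediction' column as rawPredictionCol, and the model predicts 0.0
### for nearly every row, so this value does not reflect how well the raw scores rank the records.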

auc = churn_eval.evaluate(predictionAndLables.predictions)
auc

0.49997911096256686
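
An AUC of almost exactly 0.5 is what results when the evaluator receives hard 0/1 predictions that are nearly all 0.0, because the ranking information in the scores is lost. The pure-Python sketch below contrasts AUC computed from scores with AUC computed from thresholded predictions. The data are toy numbers, and `auc` is a hypothetical helper that counts correctly ordered positive/negative pairs.

```python
def auc(scores, labels):
    """Area under the ROC curve via pairwise comparison (ties count as 0.5)."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Toy data: positives tend to score higher, but every score is below 0.5.
labels = [0, 0, 0, 0, 0, 0, 1, 1]
scores = [0.01, 0.02, 0.02, 0.03, 0.05, 0.10, 0.08, 0.20]
hard_predictions = [1.0 if s >= 0.5 else 0.0 for s in scores]  # all 0.0

auc_from_scores = auc(scores, labels)
auc_from_hard = auc(hard_predictions, labels)
print(auc_from_scores, auc_from_hard)
```

In PySpark the corresponding change would be to point the evaluator at the `rawPrediction` column, which is the default for `rawPredictionCol`, instead of `prediction`. The resulting value for this dataset is not shown here because it was not part of the original run.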

### Use Gradient Boosted Trees

In [37]:
from pyspark.ml.classification import GBTClassifier

In [38]:
### Set up GBT classifier model

gbt = GBTClassifier(labelCol="backorder", featuresCol="features", maxIter=10)

In [39]:
### FIT on 'train_data', which already includes the quantitative 'backorder' label and has no 'null' values.
### The data has also been split between 'training' and 'testing'.

model = gbt.fit(train_data)

In [40]:
### TRANSFORM

predictions = model.transform(test_data)

In [41]:
predictions.select("prediction", "backorder", "features").show(2)

+----------+---------+-------------+
|prediction|backorder|     features|
+----------+---------+-------------+
|       0.0|      0.0|    (7,[],[])|
|       0.0|      0.0|(7,[0],[1.0])|
+----------+---------+-------------+
only showing top 2 rows



In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [43]:
### Select (prediction, true label) and compute test error
### Evaluate model on accuracy

evaluator = MulticlassClassificationEvaluator(labelCol="backorder", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0117258


In [44]:
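### Accuracy is ~98.8%. Note that only ~1.1% of records are backorders (2688 of 242075), so always predicting 'No'
### would already score ~98.9%; a 50% random guess is not the right baseline for this imbalanced data.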

accuracy

0.9882741535920727

### Random Forest

In [21]:
from pyspark.ml.classification import RandomForestClassifier

In [22]:
rfc = RandomForestClassifier(labelCol='backorder',featuresCol='features')

In [23]:
rfcModel = rfc.fit(train_data)

In [24]:
rfcPredictions = rfcModel.transform(test_data)

In [25]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
### Accuracy metrics

In [26]:
accEvaluator = MulticlassClassificationEvaluator(labelCol="backorder", predictionCol="prediction", metricName="accuracy")

In [27]:
rfcAcc = accEvaluator.evaluate(rfcPredictions)

In [28]:
print('A random forest ensemble had an accuracy of: {0:2.2f}%'.format(rfcAcc*100))
print('-'*80)

A random forest ensemble had an accuracy of: 98.68%
--------------------------------------------------------------------------------


In [None]:
### F1 metrics

In [29]:
accEvaluatorF1 = MulticlassClassificationEvaluator(labelCol="backorder", predictionCol="prediction", metricName="f1")

In [30]:
rfcAccF1 = accEvaluatorF1.evaluate(rfcPredictions)

In [31]:
print('A random forest ensemble had an F1 score of: {0:2.2f}%'.format(rfcAccF1*100))
print('-'*80)

A random forest ensemble had an F1 score of: 98.03%
--------------------------------------------------------------------------------
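
In `MulticlassClassificationEvaluator`, the `f1` metric is a weighted average of the per-class F1 scores, weighted by class frequency, so on imbalanced data it is dominated by the majority class. The sketch below illustrates this with made-up confusion counts. All numbers and helper names are hypothetical and are not results from this notebook.

```python
def f1_from_counts(tp, fp, fn):
    """F1 score for one class from true positives, false positives, false negatives."""
    if tp == 0:
        return 0.0
    precision = tp / float(tp + fp)
    recall = tp / float(tp + fn)
    return 2 * precision * recall / (precision + recall)

# Hypothetical test set: 990 'No' (class 0) and 10 'Yes' (class 1).
# The model finds only 1 of the 10 backorders and raises 1 false alarm.
tp1, fp1, fn1 = 1, 1, 9          # counts for class 1 ('Yes')
tp0, fp0, fn0 = 989, 9, 1        # counts for class 0 ('No')

f1_yes = f1_from_counts(tp1, fp1, fn1)
f1_no = f1_from_counts(tp0, fp0, fn0)
# Weight each class by its share of the true labels.
weighted_f1 = (990 * f1_no + 10 * f1_yes) / 1000.0
print(f1_yes, f1_no, weighted_f1)
```

In this toy case the weighted F1 is above 0.98 even though the F1 for the 'Yes' class alone is below 0.2, which is why a per-class view is useful when backorders are the class of interest.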


#### Gradient Boosted Trees have the highest score at 98.8% accuracy