## Importing the dataset from the external csv file using the spark schema

In [42]:
# start session of spark using spark session to process the bigdata in parallel instead of python pandas
# print the dataschema of the dataset
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml-bank').getOrCreate()
df = spark.read.csv('sampled.csv', header = True, inferSchema = True)
df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- user_total_orders: integer (nullable = true)
 |-- user_total_items: integer (nullable = true)
 |-- total_distinct_items: integer (nullable = true)
 |-- user_average_days_between_orders: double (nullable = true)
 |-- user_average_basket: double (nullable = true)
 |-- dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- days_since_prior_order: integer (nullable = true)
 |-- days_since_ratio: double (nullable = true)
 |-- aisle_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- product_orders: integer (nullable = true)
 |-- product_reorders: double (nullable = true)
 |-- product_reorder_rate: double (nullable = true)
 |-- z: integer (nullable = true)
 |-- UP_orders: integer (nullable = true)
 |-- UP_orders_ratio: double (nullable = true)
 |-- UP_last_order_id: integer (nullable = true)
 |-- UP_average_pos_in_cart: double (nullable = t

In [43]:
## Test the data exported using the spark is valid with pandas and describtion of the data set 
## for better understanding
import pandas as pd
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
order_id,1187899.0,1187899.0,1187899.0,1187899.0,1187899.0
product_id,17122.0,196.0,26405.0,46149.0,14084.0
user_total_orders,11.0,11.0,11.0,11.0,11.0
user_total_items,59.0,59.0,59.0,59.0,59.0
total_distinct_items,18.0,18.0,18.0,18.0,18.0
user_average_days_between_orders,17.27273,17.27273,17.27273,17.27273,17.27273
user_average_basket,5.363636,5.363636,5.363636,5.363636,5.363636
dow,4.0,4.0,4.0,4.0,4.0
order_hour_of_day,8.0,8.0,8.0,8.0,8.0
days_since_prior_order,14.0,14.0,14.0,14.0,14.0


## Data set summary using the mean maximum mininum and standard deviation

In [44]:
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
order_id,200000,1713884.290375,978079.994815277,988,3419642
product_id,200000,25497.863965,14207.50981904984,1,49683
user_total_orders,200000,25.87698,22.208253018013558,4,100
user_total_items,200000,295.68659,296.05106676947287,3,2343
total_distinct_items,200000,109.60159,74.66360547644189,1,448
dow,200000,2.800755,2.171662959709692,0,6
order_hour_of_day,200000,13.41037,4.174296777526455,0,23
days_since_prior_order,200000,14.462315,10.230197609307995,0,30
aisle_id,200000,70.982795,38.03233152567916,1,134


In [45]:
df=df.fillna(0)

## Creating the stage for preprocessing , creation of feature vector space and label from the existing dataframe

In [68]:
# Stages are used to process the data in the sequential format to avoid the parallel processing 
# due to data inconsistancy. And each stages are added to the list of stages array subsequently.
stages = []
from pyspark.ml.feature import VectorAssembler


# Ignoring the features which are not required to create the feature vector space
ignore = ['order_id','labels','dow']
assembler = VectorAssembler(
    inputCols=[x for x in df.columns if x not in ignore],
    outputCol='features')

stages += [assembler]

### Preprocessing the data to valid form to feed the model

In [69]:
# Convert label into label indices using the StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

# Using string indexer to create the labels
label_stringIdx = StringIndexer(inputCol="labels", outputCol="label")
stages += [label_stringIdx]

### Creating the LogisticRegression

In [70]:
from pyspark.ml.classification import LogisticRegression
  
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(df)
preppedDataDF = pipelineModel.transform(df)

In [71]:
# Fit model to prepped data
lrModel = LogisticRegression().fit(preppedDataDF)

# ROC for training data
display(lrModel, preppedDataDF, "ROC")

LogisticRegressionModel: uid = LogisticRegression_5a5323113b32, numClasses = 2, numFeatures = 22

DataFrame[order_id: int, product_id: int, user_total_orders: int, user_total_items: int, total_distinct_items: int, user_average_days_between_orders: double, user_average_basket: double, dow: int, order_hour_of_day: int, days_since_prior_order: int, days_since_ratio: double, aisle_id: int, department_id: int, product_orders: int, product_reorders: double, product_reorder_rate: double, z: int, UP_orders: int, UP_orders_ratio: double, UP_last_order_id: int, UP_average_pos_in_cart: double, UP_reorder_rate: double, UP_orders_since_last: int, UP_delta_hour_vs_last: int, labels: int, features: vector, label: double]

'ROC'

In [72]:
# Keep relevant columns
selectedcols = ["label", "features"]
dataset = preppedDataDF.select(selectedcols)
display(dataset)

DataFrame[label: double, features: vector]

In [73]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

140102
59898


In [74]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [75]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [76]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.8020520972333169

### RandomForest Classifier

In [82]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(trainingData)

In [83]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = rfModel.transform(testData)

In [84]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.7942094863657846

In [85]:
evaluator.getMetricName()

'areaUnderROC'

### DecisionTree Classifier 

In [89]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Train model with Training Data
dtModel = dt.fit(trainingData)

In [93]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

numNodes =  5
depth =  2


In [94]:
display(dtModel)

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_436df87d7f7a) of depth 2 with 5 nodes

In [95]:
# Make predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)

In [96]:
predictions.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [97]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

0.31958503271457467

### LogisticRegression      - Score 80%
### RandomForest Classifier - Score 79%
### DecisionTree Classifier - Score 32%

### Best model is LogisticRegression

### Cross validation for LogisticRegression

In [77]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [78]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing

In [79]:
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

In [80]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

0.8011044610624181