In [1]:
import findspark

findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession
import pandas as pd
# Disable warnings, set Matplotlib inline plotting and load Pandas package
import warnings
warnings.filterwarnings('ignore')
import matplotlib.pyplot as plt
%matplotlib inline

In [3]:
# Importing DataSets

In [4]:
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
pd.set_option('display.width', 400)
pd.set_option('display.max_columns', 30)
spark = SparkSession.builder.appName('Capstone Project V4').getOrCreate()
orders= spark.read.csv(r"D:\TN\Capstone\Datasets\orders.csv", header = True, inferSchema = True)

In [5]:
orders.show(3)

+--------+-------+--------+------------+---------+-----------------+----------------------+
|order_id|user_id|eval_set|order_number|order_dow|order_hour_of_day|days_since_prior_order|
+--------+-------+--------+------------+---------+-----------------+----------------------+
| 2539329|      1|   prior|           1|        2|                8|                  null|
| 2398795|      1|   prior|           2|        3|                7|                  15.0|
|  473747|      1|   prior|           3|        3|               12|                  21.0|
+--------+-------+--------+------------+---------+-----------------+----------------------+
only showing top 3 rows



In [6]:
df = orders

In [7]:
print ("Rows     : " ,df.count())
print ("Columns  : " ,len(df.columns))

Rows     :  3421083
Columns  :  7


In [8]:
cols = ['user_id','order_number','days_since_prior_order']
df_1 =df.select(*cols)
df_1.show(3)

+-------+------------+----------------------+
|user_id|order_number|days_since_prior_order|
+-------+------------+----------------------+
|      1|           1|                  null|
|      1|           2|                  15.0|
|      1|           3|                  21.0|
+-------+------------+----------------------+
only showing top 3 rows



In [9]:
#Data cleaning: removing days since prior order = 0.0 and removing first purchase as they have no days since prior order
df = df.where(df["days_since_prior_order"].isNotNull())
df = df.where(df["days_since_prior_order"]!=0)
print("")
print("After dropping rows with null values")
print(df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas().transpose())



After dropping rows with null values
                        0
order_id                0
user_id                 0
eval_set                0
order_number            0
order_dow               0
order_hour_of_day       0
days_since_prior_order  0


In [10]:
'''RFE Segmentation
Recency -> days since last purchase
Frequency -> Average time between each purchase
Engagement -> Visit Duration'''

'RFE Segmentation\nRecency -> days since last purchase\nFrequency -> Average time between each purchase\nEngagement -> Visit Duration'

In [11]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col


window = Window.partitionBy(df_1['user_id']).orderBy(df_1['order_number'].desc())
df_2=df_1.select('*', rank().over(window).alias('rank')) .filter(col('rank') <= 1).orderBy (df_1['user_id'])

df_2 = df_2.withColumnRenamed('days_since_prior_order', 'recency')
df_2 = df_2.withColumnRenamed('order_number', 'total_orders')
df_2 = df_2.drop("order_number","rank")



In [12]:
df_2.where(df['user_id']==1).show(3)

+-------+------------+-------+
|user_id|total_orders|recency|
+-------+------------+-------+
|      1|          11|   14.0|
+-------+------------+-------+




df_4=df_1.withColumn("frequency", F.avg('days_since_prior_order').over(window))

In [13]:
from pyspark.sql.functions import sum
df_3=df_1.groupBy('user_id').agg(sum('days_since_prior_order').alias('Visit_duration')).orderBy(df_1['user_id'])
df_3.show(5)

+-------+--------------+
|user_id|Visit_duration|
+-------+--------------+
|      1|         190.0|
|      2|         228.0|
|      3|         144.0|
|      4|          85.0|
|      5|          46.0|
+-------+--------------+
only showing top 5 rows



In [14]:
df=df_2.join(df_3, on=['user_id'],how='inner')


In [15]:
df.where(df['user_id']==1).show(3)

+-------+------------+-------+--------------+
|user_id|total_orders|recency|Visit_duration|
+-------+------------+-------+--------------+
|      1|          11|   14.0|         190.0|
+-------+------------+-------+--------------+



In [16]:
window1 = Window.partitionBy(df_1['user_id'])

df_5=df_1.select('*', F.avg('days_since_prior_order').over(window1).alias('Frequency')).orderBy(df['user_id'])

In [17]:
#last 10 visits frequency average

In [18]:
df_4=df_1.select('*', rank().over(window).alias('rank')) .filter(col('rank') <= 10).orderBy (df_1['user_id'])
df_4 = df_4.drop("order_number","rank")
df_4=df_4.withColumn('frequency', F.round(F.avg('days_since_prior_order').over(window1)))

In [19]:
cols = ['user_id','order_number']
df_4.orderBy(*cols, ascending = True).show()

+-------+----------------------+---------+
|user_id|days_since_prior_order|frequency|
+-------+----------------------+---------+
|      1|                  15.0|     19.0|
|      1|                  21.0|     19.0|
|      1|                  29.0|     19.0|
|      1|                  28.0|     19.0|
|      1|                  19.0|     19.0|
|      1|                  20.0|     19.0|
|      1|                  14.0|     19.0|
|      1|                   0.0|     19.0|
|      1|                  30.0|     19.0|
|      1|                  14.0|     19.0|
|      2|                  13.0|     20.0|
|      2|                  14.0|     20.0|
|      2|                  27.0|     20.0|
|      2|                   8.0|     20.0|
|      2|                   6.0|     20.0|
|      2|                  30.0|     20.0|
|      2|                  28.0|     20.0|
|      2|                  30.0|     20.0|
|      2|                  13.0|     20.0|
|      2|                  30.0|     20.0|
+-------+--

In [20]:
df_4 = df_4.drop("days_since_prior_order").dropDuplicates()


In [21]:
df = df_4.join(df, on = 'user_id', how='inner')

In [22]:
df.where(df["user_id"]==1).show(5)

+-------+---------+------------+-------+--------------+
|user_id|frequency|total_orders|recency|Visit_duration|
+-------+---------+------------+-------+--------------+
|      1|     19.0|          11|   14.0|         190.0|
+-------+---------+------------+-------+--------------+



from pyspark.sql.functions import avg
freq=df_1.groupBy('user_id').(avg('days_since_prior_order').alias('Frequency')).orderBy(df_1['user_id'])


df_e=df_1.select('*', ('days_since_prior_order').over(window1).alias('Visit_duration'))

In [23]:
print(df.describe().toPandas())

  summary            user_id           frequency        total_orders             recency      Visit_duration
0   count             206209              206209              206209              206209              206209
1    mean           103105.0  15.568432997589824  16.590367054784224  17.061781978478145   173.2843765306073
2  stddev  59527.55516705405   7.030657928294545  16.654773501154427  10.672178387505193  100.99896693408382
3     min                  1                 0.0                   4                 0.0                 0.0
4     max             206209                30.0                 100                30.0               365.0


In [24]:

r_quartile = df.approxQuantile("Recency", [0.25, 0.5, 0.75], 0)
f_quartile = df.approxQuantile("Frequency", [0.25, 0.5, 0.75], 0)
v_quartile = df.approxQuantile("Visit_duration", [0.25, 0.5, 0.75], 0)

In [25]:
from pyspark.sql.functions import when,concat
df = df.withColumn("r_quartile", when(col("Recency") >= r_quartile[2] , 1).when(col("Recency") >= r_quartile[1] , 2).when(col("Recency") >= r_quartile[0] , 3).otherwise(4))
df = df.withColumn("f_quartile", when(col("Frequency") >= r_quartile[2] , 1).when(col("Frequency") >= r_quartile[1] , 2).when(col("Frequency") >= r_quartile[0] , 3).otherwise(4))
df = df.withColumn("e_quartile", when(col("Visit_duration") >= r_quartile[2] , 1).when(col("Visit_duration") >= r_quartile[1] , 2).when(col("Visit_duration") >= r_quartile[0] , 3).otherwise(4))


df = df.withColumn("RFE_Score", concat(col("r_quartile"), col("f_quartile"), col("e_quartile")))


In [26]:
df.dtypes

[('user_id', 'int'),
 ('frequency', 'double'),
 ('total_orders', 'int'),
 ('recency', 'double'),
 ('Visit_duration', 'double'),
 ('r_quartile', 'int'),
 ('f_quartile', 'int'),
 ('e_quartile', 'int'),
 ('RFE_Score', 'string')]

In [27]:
df.show(5)

+-------+---------+------------+-------+--------------+----------+----------+----------+---------+
|user_id|frequency|total_orders|recency|Visit_duration|r_quartile|f_quartile|e_quartile|RFE_Score|
+-------+---------+------------+-------+--------------+----------+----------+----------+---------+
|    148|     10.0|           8|   27.0|          69.0|         2|         3|         1|      231|
|    463|     19.0|           8|   25.0|         133.0|         2|         2|         1|      221|
|    471|     11.0|           7|   10.0|          65.0|         3|         3|         1|      331|
|    496|      2.0|          83|    2.0|         280.0|         4|         4|         1|      441|
|    833|     20.0|          12|   22.0|         233.0|         2|         2|         1|      221|
+-------+---------+------------+-------+--------------+----------+----------+----------+---------+
only showing top 5 rows



In [28]:
df = df.dropDuplicates()

In [29]:
df.dtypes


[('user_id', 'int'),
 ('frequency', 'double'),
 ('total_orders', 'int'),
 ('recency', 'double'),
 ('Visit_duration', 'double'),
 ('r_quartile', 'int'),
 ('f_quartile', 'int'),
 ('e_quartile', 'int'),
 ('RFE_Score', 'string')]

In [30]:
# Converting label recency to categorical column
def purchase_interval(r):
    if r <= 10:
        return "Class0"
    elif (r > 10) & (r <= 20):
        return "Class1"
    elif (r > 20) & (r <= 30):
        return "Class2"
    elif r > 30:
        return "Class3"

ol_val = udf(purchase_interval, StringType())

df = df.withColumn("purchase_interval",F.lit(ol_val(df.recency)))

In [31]:
#target column identification
target_col = df["purchase_interval"]
print("Target column is ",target_col)
drptar = df.drop("purchase_interval","recency")


Target column is  Column<b'purchase_interval'>


In [32]:
num_cols = [t[0] for t in drptar.dtypes if t[1] == 'int' or t[1] == 'double']
#num_cols = df._get_numeric_data().columns
print("numerical columns: ",num_cols)
cat_cols = [t[0] for t in drptar.dtypes if t[1] == 'string']
print("categorical columns: ",cat_cols)

numerical columns:  ['user_id', 'frequency', 'total_orders', 'Visit_duration', 'r_quartile', 'f_quartile', 'e_quartile']
categorical columns:  ['RFE_Score']


Data Preprocessing using StringIndexer, OneHotEncoder and VectorAssembler. First the input categorical columns are indexed using stringIndexer and then converted to corresponding numeric category using OneHotEncoderEstimator and assembled into a vector dataframe format using Vector Assembler.

In [33]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoderEstimator
categoricalColumns = cat_cols
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
    
label_stringIdx = StringIndexer(inputCol = 'purchase_interval', outputCol = 'label')
stages += [label_stringIdx]
#numericCols = ['tenure', 'MonthlyCharges', 'TotalCharges']
numericCols = num_cols
assemblerInputs = [a + "classVec" for a in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


In [34]:
columns = df.columns
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + columns
df = df.select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- frequency: double (nullable = true)
 |-- total_orders: integer (nullable = true)
 |-- recency: double (nullable = true)
 |-- Visit_duration: double (nullable = true)
 |-- r_quartile: integer (nullable = false)
 |-- f_quartile: integer (nullable = false)
 |-- e_quartile: integer (nullable = false)
 |-- RFE_Score: string (nullable = false)
 |-- purchase_interval: string (nullable = true)



In [35]:
train, test = df.randomSplit([0.8, 0.2], seed=12345)

In [38]:
from pyspark.ml.classification import NaiveBayes
from sklearn.metrics import f1_score, recall_score, precision_score
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
nbModel = nb.fit(train)
Train_predictions = nbModel.transform(train)
predictions = nbModel.transform(test)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
print('Training accuracy: ' + str(evaluator.evaluate(Train_predictions)))
print("Test Accuracy: " + str(evaluator.evaluate(predictions)))



Training accuracy: 0.5771401364717283
Test Accuracy: 0.5734632092520514


In [39]:
from sklearn import metrics


In [40]:
predictions_pandas = predictions.toPandas()


In [41]:
# Print the confusion matrix
print(metrics.confusion_matrix(predictions_pandas.label, predictions_pandas.prediction))


[[12449  3270  1625]
 [ 4726  9313  1525]
 [ 3381  2733  2518]]


In [42]:
# Print the precision and recall for each label
print(metrics.classification_report(predictions_pandas.label, predictions_pandas.prediction, digits=4))

             precision    recall  f1-score   support

        0.0     0.6056    0.7178    0.6569     17344
        1.0     0.6081    0.5984    0.6032     15564
        2.0     0.4442    0.2917    0.3522      8632

avg / total     0.5730    0.5845    0.5735     41540



In [44]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label",featuresCol="features",numTrees = 10,maxBins = 32)
# Train model with Training Data
RFmodel = rf.fit(train)
predictions = RFmodel.transform(test)
Train_predictions = RFmodel.transform(train)
evaluator = MulticlassClassificationEvaluator()
print('Training accuracy: ' + str(evaluator.evaluate(Train_predictions)))
print("Test Accuracy: " + str(evaluator.evaluate(predictions)))
predictions_pandas = predictions.toPandas()
# Print the confusion matrix
print(metrics.confusion_matrix(predictions_pandas.label, predictions_pandas.prediction))
# Print the precision and recall, among other metrics
print(metrics.classification_report(predictions_pandas.label, predictions_pandas.prediction, digits=4))

Training accuracy: 0.7145423990192628
Test Accuracy: 0.7099387904215853
[[17281     0    63]
 [    0 15564     0]
 [ 3998  4452   182]]
             precision    recall  f1-score   support

        0.0     0.8121    0.9964    0.8949     17344
        1.0     0.7776    1.0000    0.8749     15564
        2.0     0.7429    0.0211    0.0410      8632

avg / total     0.7848    0.7951    0.7099     41540



In [52]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=40)
lrModel = lr.fit(train)
Train_predictions = lrModel.transform(train)
predictions = lrModel.transform(test)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
print('Training accuracy: ' + str(evaluator.evaluate(Train_predictions)))
print("Test Accuracy: " + str(evaluator.evaluate(predictions)))

Training accuracy: 0.7998183688399056
Test Accuracy: 0.7972353490206296


In [58]:
Train_predictions_pandas = Train_predictions.toPandas()
# Print the confusion matrix
print(metrics.confusion_matrix(Train_predictions_pandas.label, Train_predictions_pandas.prediction))
# Print the precision and recall, among other metrics
print(metrics.classification_report(Train_predictions_pandas.label, Train_predictions_pandas.prediction, digits=4))

[[61972     0  6530]
 [    0 58681  3798]
 [ 7547 12967 13174]]
             precision    recall  f1-score   support

        0.0     0.8914    0.9047    0.8980     68502
        1.0     0.8190    0.9392    0.8750     62479
        2.0     0.5605    0.3911    0.4607     33688

avg / total     0.7963    0.8127    0.7998    164669



In [54]:
predictions_pandas = predictions.toPandas()
# Print the confusion matrix
print(metrics.confusion_matrix(predictions_pandas.label, predictions_pandas.prediction))
# Print the precision and recall, among other metrics
print(metrics.classification_report(predictions_pandas.label, predictions_pandas.prediction, digits=4))

[[15639     0  1705]
 [    0 14626   938]
 [ 1933  3310  3389]]
             precision    recall  f1-score   support

        0.0     0.8900    0.9017    0.8958     17344
        1.0     0.8155    0.9397    0.8732     15564
        2.0     0.5618    0.3926    0.4622      8632

avg / total     0.7939    0.8102    0.7972     41540



In [55]:

print("class0: ",train.filter(train['label']==0).count())
print("class1: ",train.filter(train['label']==1).count())
print("class2: ",train.filter(train['label']==2).count())

class0:  68502
class1:  62479
class2:  33688


There is an imbalance in dataset
So, in case of logistic regression, i hope adding classweights in weightcol parameter would increase class2 prediction accuracy.

In [None]:
#BalancingRatio= numNegatives/dataset_size
#print('BalancingRatio: '+ str(BalancingRatio))

In [None]:
#HyperParameter Tuning
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder().addGrid(lr.aggregationDepth,[2,5,10]).addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0]).addGrid(lr.fitIntercept,[False, True]).addGrid(lr.maxIter,[10, 100, 1000]).addGrid(lr.regParam,[0.01, 0.5, 2.0]).build()
# Creating 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# fitting the model
cvModel = cv.fit(train)
Train_predictions=cvModel.transform(train)
predictions=cvModel.transform(test)
print('Training accuracy: ' + str(evaluator.evaluate(Train_predictions)))
print("Test Accuracy: " + str(evaluator.evaluate(predictions)))