In [5]:
import findspark

In [6]:
# Point findspark at the local Spark installation so that `pyspark` becomes importable.
# NOTE(review): the path is hard-coded and machine-specific; consider reading it from SPARK_HOME.
findspark.init("/opt/manual/spark")

In [7]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd
##from pyspark.sql.functions import col,datediff,lit,mean, min, max, sum, to_date,to_utc_timestamp, unix_timestamp
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [8]:
# Create (or reuse) the Spark session for this notebook; it runs on YARN with
# two shuffle partitions.
spark = (
    SparkSession.builder
    .appName("KMeans")
    .master("yarn")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

In [9]:
# Lift pandas' display limits so converted Spark frames are rendered in full
# (every column and every row).
for option_name in ('display.max_columns', 'display.max_rows'):
    pd.set_option(option_name, None)

In [11]:
##! hdfs dfs -put /home/train/datasets/flo100k_data.csv /user/train/datasets/

In [12]:
# Load the pipe-separated customer file from HDFS; the first line is the header
# and column types are inferred by Spark.
df = spark.read.csv(
    "hdfs://localhost:9000/user/train/datasets/flo100k_data.csv",
    header=True,
    inferSchema=True,
    sep="|",
)

## Preprocessing Data

In [13]:
# Preview the first five rows as a pandas frame (only these rows are collected to the driver).
df.limit(5).toPandas()


Unnamed: 0,_c0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,last_order_date_new
0,0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,,1.0,212.98,0.0,,2019-02-23
1,1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,,1.0,199.98,0.0,,2019-12-01
2,2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,,1.0,140.49,0.0,[ERKEK],2020-07-24
3,3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,,0.0,174.99,,2018-12-31
4,4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,,0.0,283.95,[],2021-05-05


In [14]:
# Total number of rows; reused by the null-check cell to compute percentages.
df_count = df.count()
# Show the column types that schema inference produced.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- master_id: string (nullable = true)
 |-- order_channel: string (nullable = true)
 |-- platform_type: string (nullable = true)
 |-- last_order_channel: string (nullable = true)
 |-- first_order_date: timestamp (nullable = true)
 |-- last_order_date: timestamp (nullable = true)
 |-- last_order_date_online: timestamp (nullable = true)
 |-- last_order_date_offline: timestamp (nullable = true)
 |-- order_num_total_ever_online: double (nullable = true)
 |-- order_num_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_online: double (nullable = true)
 |-- interested_in_categories_12: string (nullable = true)
 |-- last_order_date_new: string (nullable = true)



In [15]:
# Number of columns in the raw frame.
len(df.columns)

15

In [16]:
# (column name, type name) pairs; the same structure the later cells iterate over.
df.dtypes

[('_c0', 'int'),
 ('master_id', 'string'),
 ('order_channel', 'string'),
 ('platform_type', 'string'),
 ('last_order_channel', 'string'),
 ('first_order_date', 'timestamp'),
 ('last_order_date', 'timestamp'),
 ('last_order_date_online', 'timestamp'),
 ('last_order_date_offline', 'timestamp'),
 ('order_num_total_ever_online', 'double'),
 ('order_num_total_ever_offline', 'double'),
 ('customer_value_total_ever_offline', 'double'),
 ('customer_value_total_ever_online', 'double'),
 ('interested_in_categories_12', 'string'),
 ('last_order_date_new', 'string')]

### Null Check

In [17]:
# Null check for every column, computed in ONE Spark job instead of one
# filter+count job per column. The row total is gathered in the same
# aggregation, so this cell does not depend on a count taken in another cell.
missing_exprs = [F.count(F.lit(1)).alias("__total_rows")]
for col_name, col_type in df.dtypes:
    is_missing = F.col(col_name).isNull()
    if col_type == "string":
        # Only string columns can actually contain ""; comparing numeric or
        # timestamp columns with "" relies on an implicit cast and adds nothing.
        is_missing = is_missing | (F.col(col_name) == "")
    missing_exprs.append(F.sum(is_missing.cast("int")).alias(col_name))

missing_counts = df.agg(*missing_exprs).first().asDict()
total_rows = missing_counts.pop("__total_rows")

for col_name, col_type in df.dtypes:
    # The sum over an empty frame is NULL (None); treat that as zero missing values,
    # which also keeps the percentage below from dividing by zero.
    null_count = missing_counts[col_name] or 0

    if null_count > 0:
        print("{}  {} type null values: {} % {}".format(col_name, col_type, null_count, (null_count/total_rows * 100)))

last_order_date_online  timestamp type null values: 70784 % 70.784
last_order_date_offline  timestamp type null values: 21703 % 21.703
order_num_total_ever_online  double type null values: 70784 % 70.784
order_num_total_ever_offline  double type null values: 21703 % 21.703
interested_in_categories_12  string type null values: 56590 % 56.589999999999996


In [18]:
# Count distinct customer ids; compare with the row count to spot duplicated customers.
df.select("master_id").distinct().count()

100000

In [19]:
# A missing order count is replaced with 0 for both the online and the
# offline counter, in a single fill.
df = df.na.fill(
    value=0,
    subset=["order_num_total_ever_online", "order_num_total_ever_offline"],
)

### Grouping Attributes by Types, Handling Categorical Attributes

In [20]:
# Columns deliberately left out of the modelling features (identifiers, raw
# dates and the free-text category list).
discarted_cols = [
    '_c0',
    'master_id',
    'first_order_date',
    'last_order_date',
    'last_order_date_online',
    'last_order_date_offline',
    'last_order_date_new',
    'interested_in_categories_12',
]

# No label column is declared.
label_col = []

# Filled by the classification cell below, based on each column's type.
categoric_cols = []
numeric_cols = []

In [21]:
# Split the remaining columns by type: string columns are categorical,
# everything else is numeric. Both lists are rebuilt from scratch here, so
# re-running this cell cannot append duplicate names (which would also break
# the column-count verification further down).
excluded_cols = set(discarted_cols + label_col)
categoric_cols = [
    name for name, dtype in df.dtypes
    if name not in excluded_cols and dtype == 'string'
]
numeric_cols = [
    name for name, dtype in df.dtypes
    if name not in excluded_cols and dtype != 'string'
]

In [22]:
# Categorical columns and how many there are (one value per line).
print(categoric_cols, len(categoric_cols), sep="\n")

['order_channel', 'platform_type', 'last_order_channel']
3


In [23]:
# Numeric columns and how many there are (one value per line).
print(numeric_cols, len(numeric_cols), sep="\n")

['order_num_total_ever_online', 'order_num_total_ever_offline', 'customer_value_total_ever_offline', 'customer_value_total_ever_online']
4


In [24]:
# Excluded columns and how many there are (one value per line).
print(discarted_cols, len(discarted_cols), sep="\n")

['_c0', 'master_id', 'first_order_date', 'last_order_date', 'last_order_date_online', 'last_order_date_offline', 'last_order_date_new', 'interested_in_categories_12']
8


In [25]:
# Label columns and how many there are (one value per line).
print(label_col, len(label_col), sep="\n")

[]
0


### Examine Categoric Columns

In [26]:
# Print the frequency of every distinct value, one table per categorical column.
for col_name in categoric_cols:
    value_counts = df.select(col_name).groupBy(col_name).count()
    value_counts.show()

+-------------+-----+
|order_channel|count|
+-------------+-----+
|      Offline|70784|
|      Ios App| 3964|
|       Mobile| 8512|
|      Desktop| 4751|
|  Android App|11989|
+-------------+-----+

+-------------+-----+
|platform_type|count|
+-------------+-----+
|      Offline|65991|
|  OmniChannel|12569|
|       Online|21440|
+-------------+-----+

+------------------+-----+
|last_order_channel|count|
+------------------+-----+
|           Offline|73766|
|           Ios App| 3347|
|            Mobile| 7630|
|           Desktop| 4604|
|       Android App|10653|
+------------------+-----+



### Verifying Cols

In [27]:
# Sanity check: every column of df must sit in exactly one of the four groups.
accounted_for = (
    len(categoric_cols) + len(numeric_cols) + len(discarted_cols) + len(label_col)
)
print("Columns are OK" if len(df.columns) == accounted_for else "Check columns AGAIN!")

Columns are OK


### Data Analysis

In [30]:
# Total orders per customer = offline orders + online orders.
df = df.withColumn(
    "order_number_total",
    F.col("order_num_total_ever_offline") + F.col("order_num_total_ever_online"),
)

In [29]:
# Total spend per customer = online spend + offline spend.
df = df.withColumn(
    "customer_value_total",
    F.col("customer_value_total_ever_online") + F.col("customer_value_total_ever_offline"),
)
# Show a sample of rows including the two new total columns.
df.limit(40).toPandas()

Unnamed: 0,_c0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,last_order_date_new,order_number_total,customer_value_total
0,0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,0.0,1.0,212.98,0.0,,2019-02-23,1.0,212.98
1,1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,0.0,1.0,199.98,0.0,,2019-12-01,1.0,199.98
2,2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,0.0,1.0,140.49,0.0,[ERKEK],2020-07-24,1.0,140.49
3,3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,0.0,0.0,174.99,,2018-12-31,1.0,174.99
4,4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,0.0,0.0,283.95,[],2021-05-05,2.0,283.95
5,5,47511f36-aeb4-11e9-a2fc-000d3a38a36f,Android App,Online,Android App,2018-11-11 11:26:51,2018-11-11 11:26:51,2018-11-11 11:26:51,NaT,1.0,0.0,0.0,139.98,,2018-11-11,1.0,139.98
6,6,77f7c318-3407-11eb-9a15-000d3a38a36f,Android App,Online,Android App,2020-12-01 20:38:41,2020-12-01 20:38:41,2020-12-01 20:38:41,NaT,1.0,0.0,0.0,269.9,[AKTIFSPOR],2020-12-01,1.0,269.9
7,7,399d6dd2-ecf1-11ea-9369-000d3a38a36f,Offline,Offline,Offline,2020-09-02 10:51:09,2020-09-02 10:51:09,NaT,2020-09-02 10:51:09,0.0,1.0,95.73,0.0,"[KADIN, AKTIFSPOR]",2020-09-02,1.0,95.73
8,8,b3d4a6f2-a368-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-03-29 16:53:07,2020-09-24 17:37:25,NaT,2020-09-24 17:37:25,0.0,3.0,207.94,0.0,"[ERKEK, KADIN]",2020-09-24,3.0,207.94
9,9,42fdccd4-f1d1-11ea-bde9-000d3a38a36f,Desktop,Online,Desktop,2020-10-18 22:59:55,2020-10-18 22:59:55,2020-10-18 22:59:55,NaT,1.0,0.0,0.0,134.95,[],2020-10-18,1.0,134.95


In [31]:
# Confirm that no NULL offline order counts remain after the fill (expects an empty table).
offline_orders = F.col('order_num_total_ever_offline')
df.filter(offline_orders.isNull()).select(offline_orders).show()

+----------------------------+
|order_num_total_ever_offline|
+----------------------------+
+----------------------------+



In [32]:
# Confirm that no NULL online order counts remain after the fill (expects an empty table).
online_orders = F.col('order_num_total_ever_online')
df.filter(online_orders.isNull()).select(online_orders).show()

+---------------------------+
|order_num_total_ever_online|
+---------------------------+
+---------------------------+



In [33]:
# Customers per order channel (largest first), followed by the average
# total order count and the average total spend per channel.
by_order_channel = df.groupBy("order_channel")
by_order_channel.count().orderBy(F.col("count").desc()).show()
for metric in ("order_number_total", "customer_value_total"):
    by_order_channel.avg(metric).show()


+-------------+-----+
|order_channel|count|
+-------------+-----+
|      Offline|70784|
|  Android App|11989|
|       Mobile| 8512|
|      Desktop| 4751|
|      Ios App| 3964|
+-------------+-----+

+-------------+-----------------------+
|order_channel|avg(order_number_total)|
+-------------+-----------------------+
|      Offline|     1.6003475361663653|
|      Ios App|      3.377648839556004|
|       Mobile|      2.798637218045113|
|      Desktop|      2.538623447695222|
|  Android App|       3.50971724080407|
+-------------+-----------------------+

+-------------+-------------------------+
|order_channel|avg(customer_value_total)|
+-------------+-------------------------+
|      Offline|        218.1639484912975|
|      Ios App|        568.4640312815283|
|      Desktop|       376.91355504103785|
|  Android App|        532.8462840937639|
|       Mobile|        391.7418761748011|
+-------------+-------------------------+



In [34]:
# Customers per platform type (largest first), followed by the average
# total order count and the average total spend per platform type.
by_platform_type = df.groupBy("platform_type")
by_platform_type.count().orderBy(F.col("count").desc()).show()
for metric in ("order_number_total", "customer_value_total"):
    by_platform_type.avg(metric).show()

+-------------+-----+
|platform_type|count|
+-------------+-----+
|      Offline|65991|
|       Online|21440|
|  OmniChannel|12569|
+-------------+-----+

+-------------+-----------------------+
|platform_type|avg(order_number_total)|
+-------------+-----------------------+
|  OmniChannel|     3.4947092051873656|
|      Offline|     1.5837917291751906|
|       Online|     2.6207089552238805|
+-------------+-----------------------+

+-------------+-------------------------+
|platform_type|avg(customer_value_total)|
+-------------+-------------------------+
|      Offline|       215.73030686012984|
|  OmniChannel|        500.7937512928769|
|       Online|       404.78965764929046|
+-------------+-------------------------+



### RFM

In [35]:
# Keep only the customer id and the three inputs the RFM features are derived from.
rfm_source_cols = ["master_id", "last_order_date_new", "order_number_total", "customer_value_total"]
rfm_df = df.select(*rfm_source_cols)

In [36]:
# Reference date against which recency is measured.
# NOTE(review): presumably chosen shortly after the newest order in the data — confirm.
ANALYSIS_DATE = "2021-06-02"

# Build the RFM frame directly from df in one select, so that this cell can be
# re-run safely (the step-by-step version failed on a second run because
# last_order_date_new had already been dropped from rfm_df).
#   Recency   = days between ANALYSIS_DATE and the customer's last order date
#   Frequency = total number of orders
#   Monetary  = total customer spend
rfm_df = df.select(
    "master_id",
    F.col("order_number_total").alias("Frequency"),
    F.col("customer_value_total").alias("Monetary"),
    F.datediff(F.lit(ANALYSIS_DATE), F.col("last_order_date_new")).alias("Recency"),
)

In [37]:
# Preview the RFM frame: master_id plus Frequency, Monetary and Recency.
rfm_df.limit(5).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,830
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,549
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,313
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,884
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,28


### Feature Engineering

In [41]:
# Replace non-positive Monetary values with 1; every other value (including
# NULL, for which the comparison is not true) is kept unchanged.
non_positive_spend = F.col("Monetary") <= 0
rfm_df = rfm_df.withColumn(
    "Monetary",
    F.when(non_positive_spend, 1).otherwise(F.col("Monetary")),
)

In [43]:
# Feature columns = every column after the first one. This is a positional slice:
# it relies on master_id being the first column of rfm_df (as in the preview above).
features = rfm_df.columns[1:]

In [62]:
from pyspark.ml.feature import VectorAssembler

# Pack the feature columns into one vector column, then keep only the
# customer id and that vector.
assembler = VectorAssembler(inputCols=features, outputCol="unscaled_features")
assembled_data = assembler.transform(rfm_df).select('master_id', 'unscaled_features')

In [64]:
from pyspark.ml.feature import StandardScaler
# Scaler that reads the assembled vector and writes the 'features' column
# (all other StandardScaler settings are left at their defaults).
scaler = (
    StandardScaler()
    .setInputCol('unscaled_features')
    .setOutputCol('features')
)
# Fit the scaling statistics on the assembled data, then apply them to the same data.
data_scale = scaler.fit(assembled_data)
scaled_data = data_scale.transform(assembled_data)

### Pipeline

In [65]:
from pyspark.ml import Pipeline
# Chain assembling and scaling into one pipeline, fit it on the RFM frame and
# apply it to the same frame.
pipeline_obj = Pipeline(stages=[assembler, scaler])
pipeline_model = pipeline_obj.fit(rfm_df)
pipeline_df = pipeline_model.transform(rfm_df)
# Preview the result, which now carries the raw and the scaled feature vectors.
pipeline_df.limit(5).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency,unscaled_features,features
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,830,"[1.0, 212.98, 830.0]","[0.3277012075046896, 0.4598811451772491, 3.251..."
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,549,"[1.0, 199.98, 549.0]","[0.3277012075046896, 0.43181064612896175, 2.15..."
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,313,"[1.0, 140.49, 313.0]","[0.3277012075046896, 0.3033557239456838, 1.226..."
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,884,"[1.0, 174.99, 884.0]","[0.3277012075046896, 0.3778505098815233, 3.463..."
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,28,"[2.0, 283.95, 28.0]","[0.6554024150093792, 0.6131244772893224, 0.109..."


### K-Means Model

In [103]:
from pyspark.ml.clustering import KMeans
def compute_kmeans_model(rfm_df, k, seed=42):
    """Fit a K-Means model with ``k`` clusters on the ``features`` column.

    Args:
        rfm_df: DataFrame containing a vector column named ``features``.
        k: Number of clusters to fit.
        seed: Seed for the random centroid initialisation. A fixed value makes
            the fit repeatable between runs, so the scores of a k sweep and a
            later refit with the chosen k are comparable. Pass ``None`` to
            leave the seed unset and use the library's own default.

    Returns:
        The fitted model, as returned by ``KMeans.fit``.
    """
    kmeansObject = KMeans() \
        .setK(k) \
        .setFeaturesCol("features")

    if seed is not None:
        # NOTE(review): PySpark's default seed is believed to be derived from a
        # Python hash that can differ between interpreter sessions — confirm.
        kmeansObject = kmeansObject.setSeed(seed)

    return kmeansObject.fit(rfm_df)

### Evaluator

In [104]:
from pyspark.ml.evaluation import ClusteringEvaluator
# Evaluator built with library defaults only.
# NOTE(review): presumably this is the silhouette score read from the 'features' and
# 'prediction' columns — confirm against the PySpark ClusteringEvaluator documentation.
evaluator = ClusteringEvaluator()

In [106]:
# Fit one model per candidate k (2 through 10), assign clusters to the same
# data and print the evaluator's score for each k.
candidate_ks = range(2, 11)
for k in candidate_ks:
    kmeans_model = compute_kmeans_model(pipeline_df, k)
    transformed_df = kmeans_model.transform(pipeline_df)
    score = evaluator.evaluate(transformed_df)
    print("k: {}, score: {}" .format(k, score))

k: 2, score: 0.7571631251678737
k: 3, score: 0.6559715046000123
k: 4, score: 0.6806248838694781
k: 5, score: 0.6662493724566684
k: 6, score: 0.532569710909801
k: 7, score: 0.5348584726699458
k: 8, score: 0.5532034595835308
k: 9, score: 0.5497715659455591
k: 10, score: 0.5445172495425756


In [107]:
# Refit the final model with k=2, the k with the highest score in the sweep above.
kmeans_model = compute_kmeans_model(pipeline_df, 2)

In [111]:
# Assign every customer to a cluster; the result gains a 'prediction' column.
transformed_df = kmeans_model.transform(pipeline_df)

In [123]:
# Preview the clustered customers together with their feature vectors.
transformed_df.limit(5).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency,unscaled_features,features,prediction
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,830,"[1.0, 212.98, 830.0]","[0.3277012075046896, 0.4598811451772491, 3.251...",0
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,549,"[1.0, 199.98, 549.0]","[0.3277012075046896, 0.43181064612896175, 2.15...",0
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,313,"[1.0, 140.49, 313.0]","[0.3277012075046896, 0.3033557239456838, 1.226...",0
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,884,"[1.0, 174.99, 884.0]","[0.3277012075046896, 0.3778505098815233, 3.463...",0
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,28,"[2.0, 283.95, 28.0]","[0.6554024150093792, 0.6131244772893224, 0.109...",0


## Statistical summary

In [121]:
# Per-cluster averages of the three RFM measures, ordered by cluster id.
cluster_means = [F.avg(metric) for metric in ('Recency', 'Monetary', 'Frequency')]
(
    transformed_df
    .groupBy('prediction')
    .agg(*cluster_means)
    .sort(F.col('prediction'))
    .show()
)

+----------+------------------+------------------+-----------------+
|prediction|      avg(Recency)|     avg(Monetary)|   avg(Frequency)|
+----------+------------------+------------------+-----------------+
|         0| 443.9831130854056|202.49118453961572|1.520648243225144|
|         1|161.57891195693963| 1063.729280084586|6.572952710495963|
+----------+------------------+------------------+-----------------+

