### IMPORT LIBRARIES AND OPEN SPARK SESSION

In [1]:
import findspark
import pandas as pd
import warnings

In [2]:
# Point findspark at the local Spark installation so that `pyspark` is importable below.
findspark.init("/opt/manual/spark")

In [3]:
# Silence FutureWarning messages to keep cell outputs readable.
warnings.filterwarnings("ignore", category=FutureWarning)

In [4]:
# List the HDFS dataset directory to confirm the input file is present.
! hdfs dfs -ls /user/train/datasets

Found 5 items
-rw-r--r--   1 train supergroup       4556 2020-09-23 20:56 /user/train/datasets/Advertising.csv
drwxr-xr-x   - train supergroup          0 2023-09-12 21:47 /user/train/datasets/House_Price
drwxr-xr-x   - train supergroup          0 2020-11-19 21:02 /user/train/datasets/churn-telecom
-rw-r--r--   1 train supergroup   19422619 2023-09-13 12:01 /user/train/datasets/flo100k_data.csv
drwxr-xr-x   - train supergroup          0 2020-11-21 11:16 /user/train/datasets/retail_db


In [5]:
# One-time upload of the dataset to HDFS, kept commented out for reference
# (the listing above shows the file is already in place):
#! hdfs dfs -put ~/datasets/flo100k_data.csv /user/train/datasets

In [5]:
import pyspark
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, functions as F
# Create (or reuse) a Spark session that runs on YARN, then expose its
# SparkContext as `sc` for the version check below.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Clustering_KNN")
    .getOrCreate()
)
sc = spark.sparkContext

In [6]:
# Show the Spark version the session is running on.
sc.version

'3.0.0'

### READ DATA

In [7]:
# Load the pipe-delimited customer file from HDFS, taking column names from
# the header row and letting Spark infer the column types.
df = (
    spark.read
    .options(header=True, inferSchema=True, sep="|")
    .csv("/user/train/datasets/flo100k_data.csv")
)

In [8]:
# Mark df for caching so the repeated actions below can reuse it instead of
# re-reading the CSV each time.
df.persist()

DataFrame[_c0: int, master_id: string, order_channel: string, platform_type: string, last_order_channel: string, first_order_date: timestamp, last_order_date: timestamp, last_order_date_online: timestamp, last_order_date_offline: timestamp, order_num_total_ever_online: double, order_num_total_ever_offline: double, customer_value_total_ever_offline: double, customer_value_total_ever_online: double, interested_in_categories_12: string, last_order_date_new: string]

### EXPLORE DATA

In [9]:
# Bring only the first 5 rows to the driver as a pandas frame for a readable preview.
df.limit(5).toPandas()

Unnamed: 0,_c0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,last_order_date_new
0,0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,,1.0,212.98,0.0,,2019-02-23
1,1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,,1.0,199.98,0.0,,2019-12-01
2,2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,,1.0,140.49,0.0,[ERKEK],2020-07-24
3,3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,,0.0,174.99,,2018-12-31
4,4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,,0.0,283.95,[],2021-05-05


In [10]:
# Total row count; kept in df_count because the null-percentage report below reuses it.
df_count = df.count()
print(df_count)

100000


In [11]:
# Number of columns in the raw data (still includes the `_c0` column at this point).
len(df.columns)

15

In [12]:
# Inspect the inferred column types.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- master_id: string (nullable = true)
 |-- order_channel: string (nullable = true)
 |-- platform_type: string (nullable = true)
 |-- last_order_channel: string (nullable = true)
 |-- first_order_date: timestamp (nullable = true)
 |-- last_order_date: timestamp (nullable = true)
 |-- last_order_date_online: timestamp (nullable = true)
 |-- last_order_date_offline: timestamp (nullable = true)
 |-- order_num_total_ever_online: double (nullable = true)
 |-- order_num_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_online: double (nullable = true)
 |-- interested_in_categories_12: string (nullable = true)
 |-- last_order_date_new: string (nullable = true)



In [13]:
# `_c0` is the unnamed first column of the CSV; in the preview it just counts
# rows (0, 1, 2, ...), so it is dropped.
df = df.drop("_c0")

### NULL CHECK

In [14]:
def null_count(dataframe, col_name):
    """Return how many rows of ``dataframe`` have a missing value in ``col_name``.

    A value is treated as missing when it is null, the literal string "NA",
    or the empty string.

    Parameters
    ----------
    dataframe : pyspark.sql.DataFrame
        Frame to inspect.
    col_name : str
        Name of the column to check.

    Returns
    -------
    int
        Number of rows whose ``col_name`` value is missing.
    """
    column = F.col(col_name)
    is_missing = column.isNull() | (column == "NA") | (column == "")
    return dataframe.where(is_missing).count()

In [15]:
# Inline version of the missing-value check (null, "NA" or empty string) for
# one column; the null_count call in the next cell should give the same number.
df.where(
    F.col("interested_in_categories_12").isNull()
    | F.col("interested_in_categories_12").isin("NA", "")
).count()

56590

In [16]:
# Same check through the helper; the result matches the inline version above.
null_count(df, "interested_in_categories_12")

56590

In [17]:
# Count missing values (null, "NA" or empty string -- the same rule as
# null_count) for every column in a single Spark job, instead of launching
# one job per column.
missing_exprs = [
    F.count(
        F.when(
            F.col(name).isNull() | (F.col(name) == "NA") | (F.col(name) == ""),
            F.lit(1),
        )
    ).alias(name)
    for name in df.columns
]
missing_by_col = df.agg(*missing_exprs).first()

# Report only the columns that actually contain missing values, as a count
# and as a percentage of all rows (df_count).
for col_name, nc in zip(df.columns, missing_by_col):
    if nc > 0:
        print("{} has {} null value and % {}".format(col_name, nc, (nc/df_count)*100))

last_order_date_online has 70784 null value and % 70.784
last_order_date_offline has 21703 null value and % 21.703
order_num_total_ever_online has 70784 null value and % 70.784
order_num_total_ever_offline has 21703 null value and % 21.703
interested_in_categories_12 has 56590 null value and % 56.589999999999996


### IS MASTER ID UNIQUE?

In [18]:
# master_id is unique when de-duplicating on it leaves the row count unchanged.
df.dropDuplicates(["master_id"]).count() == df.count()

True

### DATA UNDERSTANDING

In [19]:
# Customer counts for every (platform_type, order_channel) combination.
(
    df
    .groupBy("platform_type", "order_channel")
    .count()
    .show()
)

+-------------+-------------+-----+
|platform_type|order_channel|count|
+-------------+-------------+-----+
|       Online|       Mobile| 6451|
|       Online|  Android App| 8728|
|  OmniChannel|      Offline| 4793|
|      Offline|      Offline|65991|
|  OmniChannel|  Android App| 3261|
|       Online|      Ios App| 3008|
|  OmniChannel|      Ios App|  956|
|       Online|      Desktop| 3253|
|  OmniChannel|       Mobile| 2061|
|  OmniChannel|      Desktop| 1498|
+-------------+-------------+-----+



### CREATE NEW COLUMNS FROM DF COLUMNS

In [20]:
# A missing order count means no orders on that channel, so treat it as 0;
# otherwise the online + offline sum below would be null.
df = df.na.fill(
    0,
    subset=["order_num_total_ever_online", "order_num_total_ever_offline"],
)

In [21]:
# Combine the online and offline figures into per-customer totals.
df1 = (
    df
    .withColumn(
        "order_num_total",
        F.col("order_num_total_ever_online") + F.col("order_num_total_ever_offline"),
    )
    .withColumn(
        "customer_value_total",
        F.col("customer_value_total_ever_online") + F.col("customer_value_total_ever_offline"),
    )
)

In [22]:
# Preview the two new total columns on the first 3 rows.
df1.limit(3).toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,last_order_date_new,order_num_total,customer_value_total
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,0.0,1.0,212.98,0.0,,2019-02-23,1.0,212.98
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,0.0,1.0,199.98,0.0,,2019-12-01,1.0,199.98
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,0.0,1.0,140.49,0.0,[ERKEK],2020-07-24,1.0,140.49


### ANALYSIS OF ORDER CHANNEL AND PLATFORM

In [28]:
# Number of customers per order channel.
(
    df1
    .groupBy("order_channel")
    .agg(F.count("master_id"))
    .show()
)

+-------------+----------------+
|order_channel|count(master_id)|
+-------------+----------------+
|  Android App|           11989|
|       Mobile|            8512|
|      Ios App|            3964|
|      Desktop|            4751|
|      Offline|           70784|
+-------------+----------------+



In [29]:
# Average total number of orders per customer, by order channel.
(
    df1
    .groupBy("order_channel")
    .agg(F.mean("order_num_total"))
    .show()
)

+-------------+--------------------+
|order_channel|avg(order_num_total)|
+-------------+--------------------+
|  Android App|    3.50971724080407|
|       Mobile|   2.798637218045113|
|      Ios App|   3.377648839556004|
|      Desktop|   2.538623447695222|
|      Offline|  1.6003475361663653|
+-------------+--------------------+



In [30]:
# Total customer value by order channel.
(
    df1
    .groupBy("order_channel")
    .agg(F.sum("customer_value_total"))
    .show()
)

+-------------+-------------------------+
|order_channel|sum(customer_value_total)|
+-------------+-------------------------+
|  Android App|        6388294.100000136|
|       Mobile|        3334506.849999907|
|      Ios App|        2253391.419999978|
|      Desktop|       1790716.2999999707|
|      Offline|     1.5442516930008002E7|
+-------------+-------------------------+



In [31]:
# The three order-channel metrics above, combined into one table.
(
    df1
    .groupBy("order_channel")
    .agg(
        F.count("master_id"),
        F.mean("order_num_total"),
        F.sum("customer_value_total"),
    )
    .show()
)

+-------------+----------------+--------------------+-------------------------+
|order_channel|count(master_id)|avg(order_num_total)|sum(customer_value_total)|
+-------------+----------------+--------------------+-------------------------+
|  Android App|           11989|    3.50971724080407|        6388294.100000136|
|       Mobile|            8512|   2.798637218045113|        3334506.849999907|
|      Ios App|            3964|   3.377648839556004|        2253391.419999978|
|      Desktop|            4751|   2.538623447695222|       1790716.2999999707|
|      Offline|           70784|  1.6003475361663653|     1.5442516930008002E7|
+-------------+----------------+--------------------+-------------------------+



In [32]:
# Number of customers per platform type.
(
    df1
    .groupBy("platform_type")
    .agg(F.count("master_id"))
    .show()
)

+-------------+----------------+
|platform_type|count(master_id)|
+-------------+----------------+
|  OmniChannel|           12569|
|       Online|           21440|
|      Offline|           65991|
+-------------+----------------+



In [33]:
# Average total number of orders per customer, by platform type.
(
    df1
    .groupBy("platform_type")
    .agg(F.mean("order_num_total"))
    .show()
)

+-------------+--------------------+
|platform_type|avg(order_num_total)|
+-------------+--------------------+
|  OmniChannel|  3.4947092051873656|
|       Online|  2.6207089552238805|
|      Offline|  1.5837917291751906|
+-------------+--------------------+



In [34]:
# Total customer value by platform type.
(
    df1
    .groupBy("platform_type")
    .agg(F.sum("customer_value_total"))
    .show()
)

+-------------+-------------------------+
|platform_type|sum(customer_value_total)|
+-------------+-------------------------+
|  OmniChannel|         6294476.66000017|
|       Online|        8678690.260000788|
|      Offline|     1.4236258680006828E7|
+-------------+-------------------------+



In [35]:
# The three platform-type metrics above, combined into one table.
(
    df1
    .groupBy("platform_type")
    .agg(
        F.count("master_id"),
        F.mean("order_num_total"),
        F.sum("customer_value_total"),
    )
    .show()
)

+-------------+----------------+--------------------+-------------------------+
|platform_type|count(master_id)|avg(order_num_total)|sum(customer_value_total)|
+-------------+----------------+--------------------+-------------------------+
|  OmniChannel|           12569|  3.4947092051873656|         6294476.66000017|
|       Online|           21440|  2.6207089552238805|        8678690.260000788|
|      Offline|           65991|  1.5837917291751906|     1.4236258680006828E7|
+-------------+----------------+--------------------+-------------------------+



### CALCULATION OF RFM METRICS FOR K-MEANS

In [23]:
# Keep only the customer id and the columns needed to derive
# Recency, Frequency and Monetary.
columns = [
    "master_id",
    "last_order_date_new",
    "order_num_total",
    "customer_value_total",
]
rfm = df1.select(*columns)

In [24]:
# Preview the reduced frame.
rfm.limit(5).toPandas()

Unnamed: 0,master_id,last_order_date_new,order_num_total,customer_value_total
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,2019-02-23,1.0,212.98
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,2019-12-01,1.0,199.98
2,602897a6-cdac-11ea-b31f-000d3a38a36f,2020-07-24,1.0,140.49
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,2018-12-31,1.0,174.99
4,80664354-adf0-11eb-8f64-000d3a299ebf,2021-05-05,2.0,283.95


In [25]:
# Most recent order date in the data; used as the reference point for Recency.
# NOTE(review): last_order_date_new is a string column, so this is a
# lexicographic max -- it matches the chronological max only while the values
# stay in yyyy-MM-dd form, as they are in the previews above.
last_date_order = rfm.select(F.max("last_order_date_new")).first()[0]

# Alternative: rfm.orderBy("last_order_date_new", ascending = False).first()["last_order_date_new"]

print(last_date_order)

2021-05-30


In [26]:
# Recency = days from each customer's last order to the most recent order date
# in the data, shifted by RECENCY_OFFSET_DAYS so the latest customer gets 2
# rather than 0. Built with the Column API instead of interpolating the date
# into a SQL string; the string date values are cast to dates by datediff.
RECENCY_OFFSET_DAYS = 2
rfm1 = rfm.withColumn(
    "Recency",
    F.datediff(F.lit(last_date_order), F.col("last_order_date_new")) + RECENCY_OFFSET_DAYS,
)

In [27]:
# Confirm that Recency was added as an integer column.
rfm1.printSchema()

root
 |-- master_id: string (nullable = true)
 |-- last_order_date_new: string (nullable = true)
 |-- order_num_total: double (nullable = false)
 |-- customer_value_total: double (nullable = true)
 |-- Recency: integer (nullable = true)



In [28]:
# Spot-check the Recency values on the first 3 rows.
rfm1.limit(3).toPandas()

Unnamed: 0,master_id,last_order_date_new,order_num_total,customer_value_total,Recency
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,2019-02-23,1.0,212.98,829
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,2019-12-01,1.0,199.98,548
2,602897a6-cdac-11ea-b31f-000d3a38a36f,2020-07-24,1.0,140.49,312


In [29]:
# Use the conventional RFM names: total orders -> Frequency, total value -> Monetary.
rfm1 = (
    rfm1
    .withColumnRenamed("order_num_total", "Frequency")
    .withColumnRenamed("customer_value_total", "Monetary")
)

In [30]:
# The raw date is no longer needed once Recency has been derived from it.
rfm2 = rfm1.drop("last_order_date_new")

In [31]:
# Final RFM table: one row per customer with Frequency, Monetary and Recency.
rfm2.limit(5).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,829
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,548
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,312
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,883
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,27


### VECTOR ASSEMBLER

In [32]:
# Feature columns are every column except the customer identifier.
assembler_cols = [name for name in rfm2.columns if name != "master_id"]

In [33]:
from pyspark.ml.feature import VectorAssembler

In [34]:
# Pack the RFM columns into a single vector column; rows with invalid
# (e.g. null) feature values are skipped rather than raising an error.
assembler = VectorAssembler(
    inputCols=assembler_cols,
    outputCol="unscaled_features",
    handleInvalid="skip",
)

### FEATURE SCALER

In [35]:
from pyspark.ml.feature import StandardScaler

In [36]:
# Scale the assembled vector with StandardScaler's default settings so that
# no single RFM dimension dominates the K-Means distance.
scaler = StandardScaler(
    inputCol="unscaled_features",
    outputCol="features",
)

### PIPELINE

In [37]:
from pyspark.ml import Pipeline

In [38]:
# Chain feature assembly and scaling so they are fitted and applied together.
pipeline_obj = Pipeline(stages=[assembler, scaler])

In [39]:
# Fit the pipeline stages (assembler, then scaler) on the RFM table.
pipeline_model = pipeline_obj.fit(rfm2)

In [40]:
# Apply the fitted pipeline; adds the `unscaled_features` and `features` columns.
pipeline_df = pipeline_model.transform(rfm2)

In [41]:
# Compare raw and scaled feature vectors on the first 5 rows.
pipeline_df.limit(5).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency,unscaled_features,features
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,829,"[1.0, 212.98, 829.0]","[0.3277012075046896, 0.4598810576445157, 3.247..."
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,548,"[1.0, 199.98, 548.0]","[0.3277012075046896, 0.43181056393910344, 2.14..."
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,312,"[1.0, 140.49, 312.0]","[0.3277012075046896, 0.3033556662056438, 1.222..."
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,883,"[1.0, 174.99, 883.0]","[0.3277012075046896, 0.3778504379623148, 3.459..."
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,27,"[2.0, 283.95, 27.0]","[0.6554024150093792, 0.613124360588601, 0.1057..."


### K-MEANS MODEL AND OPTIMUM CLUSTER DETERMINATION

In [42]:
from pyspark.ml.clustering import KMeans

def compute_kmeans_model(df, k, seed=142, features_col="features"):
    """Fit a K-Means model with ``k`` clusters on ``df``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input data containing the vector column named by ``features_col``.
    k : int
        Number of clusters to fit.
    seed : int, default 142
        Random seed for centroid initialisation; fixed by default so that
        repeated runs are comparable.
    features_col : str, default "features"
        Name of the feature-vector column. The default matches the output
        column of the scaler defined in this notebook.

    Returns
    -------
    pyspark.ml.clustering.KMeansModel
        The fitted model.
    """
    kmeans = KMeans(k=k, seed=seed, featuresCol=features_col)
    return kmeans.fit(df)

In [43]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Evaluator with default settings; per the PySpark docs this computes the
# silhouette score from the `features` and `prediction` columns.
evaluator = ClusteringEvaluator()

In [44]:
# Sweep k from 2 to 10: fit a model for each k, assign clusters, and print the
# evaluator score so the best-scoring k can be chosen.
for k in range(2,11):
    kmeans_model = compute_kmeans_model(pipeline_df, k)
    
    transformed_df = kmeans_model.transform(pipeline_df)
    
    score = evaluator.evaluate(transformed_df)
    
    print("k: {}, score: {}" .format(k, score))

k: 2, score: 0.4403260308625532
k: 3, score: 0.6549567983919242
k: 4, score: 0.6705380055471301
k: 5, score: 0.506336181525216
k: 6, score: 0.5124135578791428
k: 7, score: 0.5471947591534061
k: 8, score: 0.5710920011281075
k: 9, score: 0.5310175131579818
k: 10, score: 0.5571794995718166


In [45]:
# Refit with k=4, the value with the highest score in the sweep above.
kmeans_model = compute_kmeans_model(pipeline_df, 4)

### PREDICTION

In [46]:
# Assign every customer to a cluster; adds the `prediction` column.
transformed_df = kmeans_model.transform(pipeline_df)

In [47]:
# Preview the cluster assignments for the first 10 customers.
transformed_df.limit(10).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency,unscaled_features,features,prediction
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,829,"[1.0, 212.98, 829.0]","[0.3277012075046896, 0.4598810576445157, 3.247...",3
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,548,"[1.0, 199.98, 548.0]","[0.3277012075046896, 0.43181056393910344, 2.14...",3
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,312,"[1.0, 140.49, 312.0]","[0.3277012075046896, 0.3033556662056438, 1.222...",2
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,883,"[1.0, 174.99, 883.0]","[0.3277012075046896, 0.3778504379623148, 3.459...",3
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,27,"[2.0, 283.95, 27.0]","[0.6554024150093792, 0.613124360588601, 0.1057...",2
5,47511f36-aeb4-11e9-a2fc-000d3a38a36f,1.0,139.98,933,"[1.0, 139.98, 933.0]","[0.3277012075046896, 0.302254439144893, 3.6553...",3
6,77f7c318-3407-11eb-9a15-000d3a38a36f,1.0,269.9,182,"[1.0, 269.9, 182.0]","[0.3277012075046896, 0.58278663469929, 0.71305...",2
7,399d6dd2-ecf1-11ea-9369-000d3a38a36f,1.0,95.73,272,"[1.0, 95.73, 272.0]","[0.3277012075046896, 0.20670679710916282, 1.06...",2
8,b3d4a6f2-a368-11e9-a2fc-000d3a38a36f,3.0,207.94,250,"[3.0, 207.94, 250.0]","[0.9831036225140688, 0.44899834316180204, 0.97...",2
9,42fdccd4-f1d1-11ea-bde9-000d3a38a36f,1.0,134.95,226,"[1.0, 134.95, 226.0]","[0.3277012075046896, 0.291393317349645, 0.8854...",2


### DESCRIPTIVE STATISTICS

In [48]:
# Per-cluster mean, sample standard deviation, max and min of Frequency.
(
    transformed_df
    .groupBy("prediction")
    .agg(*[stat("Frequency") for stat in (F.mean, F.stddev_samp, F.max, F.min)])
    .show()
)

+----------+------------------+----------------------+--------------+--------------+
|prediction|    avg(Frequency)|stddev_samp(Frequency)|max(Frequency)|min(Frequency)|
+----------+------------------+----------------------+--------------+--------------+
|         1|226.83333333333334|    104.16221323813481|         377.0|          89.0|
|         3| 1.447839403668032|    0.8938539521651988|          10.0|           1.0|
|         2| 2.090466397182233|    1.3865696565489902|          10.0|           1.0|
|         0|10.410821643286573|     7.283638849228587|         126.0|           1.0|
+----------+------------------+----------------------+--------------+--------------+



In [49]:
# Per-cluster mean, sample standard deviation, max and min of Monetary.
(
    transformed_df
    .groupBy("prediction")
    .agg(*[stat("Monetary") for stat in (F.mean, F.stddev_samp, F.max, F.min)])
    .show()
)

+----------+------------------+---------------------+-------------+-------------+
|prediction|     avg(Monetary)|stddev_samp(Monetary)|max(Monetary)|min(Monetary)|
+----------+------------------+---------------------+-------------+-------------+
|         1| 31003.60666666667|   12517.356352959947|     51178.23|     17005.69|
|         3|179.76939407365893|   145.10751558956022|      1415.02|          0.0|
|         2| 317.2782289672691|   239.24831915230445|      1689.94|          0.0|
|         0|1677.1947953048953|   1074.3630748492496|     18905.75|       199.98|
+----------+------------------+---------------------+-------------+-------------+



In [50]:
# Per-cluster mean, sample standard deviation, max and min of Recency.
(
    transformed_df
    .groupBy("prediction")
    .agg(*[stat("Recency") for stat in (F.mean, F.stddev_samp, F.max, F.min)])
    .show()
)

+----------+------------------+--------------------+------------+------------+
|prediction|      avg(Recency)|stddev_samp(Recency)|max(Recency)|min(Recency)|
+----------+------------------+--------------------+------------+------------+
|         1|             343.5|  262.99866919815395|         657|          24|
|         3| 615.3469131702277|  134.83233257633717|        1097|         366|
|         2|173.50268302484457|   115.6755916356068|         521|           2|
|         0|191.19982822788435|  204.17005644138567|        1089|           2|
+----------+------------------+--------------------+------------+------------+

