In [27]:
import findspark

findspark.init("/opt/manual/spark")

from pyspark.sql import SparkSession, functions as F
import pandas as pd
from pyspark.sql.types import *

# Create (or reuse) the notebook's Spark session: submitted to YARN, with the
# number of shuffle partitions lowered to 2.
spark = SparkSession.builder \
    .appName("Flo Segmentation with Unsupervised Learning") \
    .master("yarn") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

In [28]:
# Load the pipe-delimited customer file; the first line is a header and column
# types are inferred by Spark.
df = (
    spark.read
    .options(header=True, delimiter="|", inferSchema=True)
    .csv("/user/train/datasets/flo100k.csv")
)

In [29]:
# Mark df as persisted so the actions in the following cells can reuse it.
df.persist()

DataFrame[master_id: string, order_channel: string, platform_type: string, last_order_channel: string, first_order_date: timestamp, last_order_date: timestamp, last_order_date_online: timestamp, last_order_date_offline: timestamp, order_num_total_ever_online: double, order_num_total_ever_offline: double, customer_value_total_ever_offline: double, customer_value_total_ever_online: double, interested_in_categories_12: string, online_product_group_amount_top_name_12: string, offline_product_group_name_12: string, last_order_date_new: string, store_type: string]

In [30]:
# Preview the first five rows as a pandas frame (only 5 rows reach the driver).
df.limit(5).toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,online_product_group_amount_top_name_12,offline_product_group_name_12,last_order_date_new,store_type
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,,1.0,212.98,0.0,,,,2019-02-23,A
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,,1.0,199.98,0.0,,,,2019-12-01,A
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,,1.0,140.49,0.0,[ERKEK],,ERKEK,2020-07-24,A
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,,0.0,174.99,,,,2018-12-31,A
4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,,0.0,283.95,[],,,2021-05-05,A


In [31]:
# Total number of rows; df_count is reused as the denominator of the
# missing-value percentages computed later.
df_count = df.count()
print(df_count)

100000


In [32]:
# Number of columns in the raw frame.
len(df.columns)

17

In [33]:
# Show the column types that inferSchema assigned.
df.printSchema()

root
 |-- master_id: string (nullable = true)
 |-- order_channel: string (nullable = true)
 |-- platform_type: string (nullable = true)
 |-- last_order_channel: string (nullable = true)
 |-- first_order_date: timestamp (nullable = true)
 |-- last_order_date: timestamp (nullable = true)
 |-- last_order_date_online: timestamp (nullable = true)
 |-- last_order_date_offline: timestamp (nullable = true)
 |-- order_num_total_ever_online: double (nullable = true)
 |-- order_num_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_online: double (nullable = true)
 |-- interested_in_categories_12: string (nullable = true)
 |-- online_product_group_amount_top_name_12: string (nullable = true)
 |-- offline_product_group_name_12: string (nullable = true)
 |-- last_order_date_new: string (nullable = true)
 |-- store_type: string (nullable = true)



In [34]:
# Report, for every column that has missing data, the number of missing rows
# and their share of all rows (df_count). A value counts as missing when it is
# NULL or, for string columns, equal to "" or "NA".
#
# All counts are produced by ONE aggregation instead of one filter+count job
# per column. The ""/"NA" comparison is applied to string columns only: those
# literals cannot equal a timestamp or double value, and the implicit cast they
# would require may raise under ANSI SQL mode.
def _missing_condition(name, dtype):
    """Return a boolean Column that is true where column `name` is missing."""
    condition = F.col(name).isNull()
    if dtype == "string":
        condition = condition | (F.col(name) == "") | (F.col(name) == "NA")
    return condition


# count() skips NULLs, so counting when(cond, 1) counts the rows where cond holds.
missing_counts = df.select(
    [
        F.count(F.when(_missing_condition(name, dtype), 1)).alias("c{}".format(idx))
        for idx, (name, dtype) in enumerate(df.dtypes)
    ]
).first()

for (name, dtype), null_count in zip(df.dtypes, missing_counts):
    if null_count > 0:
        print("{} {} type null values: {}  % {}".format(name, dtype, null_count, null_count/df_count*100))

last_order_date_online timestamp type null values: 70784  % 70.784
last_order_date_offline timestamp type null values: 21703  % 21.703
order_num_total_ever_online double type null values: 70784  % 70.784
order_num_total_ever_offline double type null values: 21703  % 21.703
interested_in_categories_12 string type null values: 56590  % 56.589999999999996
online_product_group_amount_top_name_12 string type null values: 88295  % 88.295
offline_product_group_name_12 string type null values: 77209  % 77.209


In [37]:
from pyspark.sql.functions import countDistinct
# Is master_id a unique key? Compare the number of distinct ids with the row
# count. The distinct count has to be pulled out of the one-row result first:
# comparing the DataFrame object itself with an int is always False, whatever
# the data contains.
df.select(countDistinct("master_id")).first()[0] == df.count()

False

In [38]:
# Row counts for every (platform_type, order_channel) combination.
df.groupBy("platform_type", "order_channel").count().show()

+-------------+-------------+-----+
|platform_type|order_channel|count|
+-------------+-------------+-----+
|  OmniChannel|      Offline| 4793|
|       Online|      Ios App| 3008|
|       Online|       Mobile| 6451|
|      Offline|      Offline|65991|
|       Online|  Android App| 8728|
|  OmniChannel|  Android App| 3261|
|       Online|      Desktop| 3253|
|  OmniChannel|      Ios App|  956|
|  OmniChannel|      Desktop| 1498|
|  OmniChannel|       Mobile| 2061|
+-------------+-------------+-----+



In [43]:
# Missing order counts mean "no orders on that channel": replace them with 0 so
# the sums below are not NULL, then add the combined online + offline totals.
df = (
    df.fillna({"order_num_total_ever_online":0, "order_num_total_ever_offline":0})
    .withColumn(
        "order_num_total",
        F.col("order_num_total_ever_online") + F.col("order_num_total_ever_offline"),
    )
    .withColumn(
        "customer_value_total",
        F.col("customer_value_total_ever_offline") + F.col("customer_value_total_ever_online"),
    )
)

In [44]:
# Preview again to check the filled order counts and the two new total columns.
df.limit(5).toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,online_product_group_amount_top_name_12,offline_product_group_name_12,last_order_date_new,store_type,order_num_total,customer_value_total
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,0.0,1.0,212.98,0.0,,,,2019-02-23,A,1.0,212.98
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,0.0,1.0,199.98,0.0,,,,2019-12-01,A,1.0,199.98
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,0.0,1.0,140.49,0.0,[ERKEK],,ERKEK,2020-07-24,A,1.0,140.49
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,0.0,0.0,174.99,,,,2018-12-31,A,1.0,174.99
4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,0.0,0.0,283.95,[],,,2021-05-05,A,2.0,283.95


In [45]:
# Per order channel: number of master_id values, mean total order count and
# mean total customer value.
df.groupBy("order_channel").agg({"master_id":"count", "order_num_total":"mean", "customer_value_total":"mean"}).show()

+-------------+----------------+-------------------------+--------------------+
|order_channel|count(master_id)|avg(customer_value_total)|avg(order_num_total)|
+-------------+----------------+-------------------------+--------------------+
|      Offline|           70784|       218.16394849129728|  1.6003475361663653|
|      Ios App|            3964|        568.4640312815283|   3.377648839556004|
|       Mobile|            8512|        391.7418761748012|   2.798637218045113|
|  Android App|           11989|        532.8462840937639|    3.50971724080407|
|      Desktop|            4751|       376.91355504103785|   2.538623447695222|
+-------------+----------------+-------------------------+--------------------+



In [46]:
# Per platform type: number of master_id values, mean total order count and
# mean total customer value.
df.groupBy("platform_type").agg({"master_id":"count", "order_num_total":"mean", "customer_value_total":"mean"}).show()

+-------------+----------------+-------------------------+--------------------+
|platform_type|count(master_id)|avg(customer_value_total)|avg(order_num_total)|
+-------------+----------------+-------------------------+--------------------+
|      Offline|           65991|        215.7303068601296|  1.5837917291751906|
|  OmniChannel|           12569|        500.7937512928769|  3.4947092051873656|
|       Online|           21440|        404.7896576492903|  2.6207089552238805|
+-------------+----------------+-------------------------+--------------------+



In [57]:
# Keep only the customer id and the three inputs needed for the RFM features.
rfm = df.select(
    "master_id",
    "last_order_date_new",
    "order_num_total",
    "customer_value_total",
)
rfm.limit(5).toPandas()

Unnamed: 0,master_id,last_order_date_new,order_num_total,customer_value_total
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,2019-02-23,1.0,212.98
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,2019-12-01,1.0,199.98
2,602897a6-cdac-11ea-b31f-000d3a38a36f,2020-07-24,1.0,140.49
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,2018-12-31,1.0,174.99
4,80664354-adf0-11eb-8f64-000d3a299ebf,2021-05-05,2.0,283.95


In [59]:
# Latest value of last_order_date_new in the data. The column is a string, so
# this is the lexicographic maximum.
last_order_date = rfm.agg(F.max("last_order_date_new")).first()[0]
print(last_order_date)

2021-05-30


In [62]:
from pyspark.sql.functions import expr
# Recency = number of days between a fixed reference date and the customer's
# last order.
# NOTE(review): the reference date is hard-coded, and last_order_date computed
# in the previous cell is not used here - confirm this is intentional.
recency_days = expr("datediff('2023-8-1', last_order_date_new)")
rfm = rfm.withColumn("Recency", recency_days)
rfm.show(5)

+--------------------+-------------------+---------------+--------------------+-------+
|           master_id|last_order_date_new|order_num_total|customer_value_total|Recency|
+--------------------+-------------------+---------------+--------------------+-------+
|b3ace094-a17f-11e...|         2019-02-23|            1.0|              212.98|   1620|
|c57d7c4c-a950-11e...|         2019-12-01|            1.0|              199.98|   1339|
|602897a6-cdac-11e...|         2020-07-24|            1.0|              140.49|   1103|
|388e4c4e-af86-11e...|         2018-12-31|            1.0|              174.99|   1674|
|80664354-adf0-11e...|         2021-05-05|            2.0|              283.95|    818|
+--------------------+-------------------+---------------+--------------------+-------+
only showing top 5 rows



In [64]:
# Give the order-count and customer-value totals their RFM names.
rfm = (
    rfm
    .withColumnRenamed('order_num_total', 'Frequency')
    .withColumnRenamed('customer_value_total', 'Monetary')
)
rfm.show(5)

+--------------------+-------------------+---------+--------+-------+
|           master_id|last_order_date_new|Frequency|Monetary|Recency|
+--------------------+-------------------+---------+--------+-------+
|b3ace094-a17f-11e...|         2019-02-23|      1.0|  212.98|   1620|
|c57d7c4c-a950-11e...|         2019-12-01|      1.0|  199.98|   1339|
|602897a6-cdac-11e...|         2020-07-24|      1.0|  140.49|   1103|
|388e4c4e-af86-11e...|         2018-12-31|      1.0|  174.99|   1674|
|80664354-adf0-11e...|         2021-05-05|      2.0|  283.95|    818|
+--------------------+-------------------+---------+--------+-------+
only showing top 5 rows



In [65]:
# The raw date has been turned into Recency, so the source column is dropped.
rfm = rfm.drop("last_order_date_new")
rfm.show(5)

+--------------------+---------+--------+-------+
|           master_id|Frequency|Monetary|Recency|
+--------------------+---------+--------+-------+
|b3ace094-a17f-11e...|      1.0|  212.98|   1620|
|c57d7c4c-a950-11e...|      1.0|  199.98|   1339|
|602897a6-cdac-11e...|      1.0|  140.49|   1103|
|388e4c4e-af86-11e...|      1.0|  174.99|   1674|
|80664354-adf0-11e...|      2.0|  283.95|    818|
+--------------------+---------+--------+-------+
only showing top 5 rows



In [66]:
from pyspark.ml.feature import VectorAssembler

# Columns that are combined into the feature vector.
rfm_col = ["Frequency", "Monetary", "Recency"]

# Pack the RFM columns into one vector column; handleInvalid="skip" is set so
# rows the assembler considers invalid are skipped.
assembler = VectorAssembler(
    inputCols=rfm_col,
    outputCol="unscaled_features",
    handleInvalid="skip",
)

In [67]:
from pyspark.ml.feature import StandardScaler

# Scale the assembled vector (default scaler settings) into the "features"
# column.
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

In [68]:
from pyspark.ml import Pipeline

# Feature pipeline: assemble the RFM columns, then scale them.
pipeline_obj = Pipeline(stages=[assembler, scaler])

In [76]:
# Fit the feature pipeline on the RFM table and apply it to the same table;
# transformed_df is the input of every clustering step that follows.
pipeline_model = pipeline_obj.fit(rfm)
transformed_df = pipeline_model.transform(rfm)

In [77]:
from pyspark.ml.clustering import KMeans
def compute_kmeans_model(df, k, seed=142):
    """Fit a K-Means model with `k` clusters on `df`.

    Args:
        df: DataFrame to cluster. No features column is set explicitly, so the
            estimator's default features column is used.
        k: Number of clusters.
        seed: Random seed for the estimator. Defaults to 142, the value that
            was previously hard-coded, so existing calls are unaffected.

    Returns:
        The fitted K-Means model.
    """
    kmeans_estimator = KMeans(k=k, seed=seed)
    return kmeans_estimator.fit(df)

In [79]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator()

# Scores for k = 2..10, in that order. The list is created here because no
# other cell defines it: without this line the loop fails with a NameError on
# a fresh kernel, and re-running the cell would keep appending to old results.
score_list = []

# Fit one model per candidate k and score its cluster assignment with the
# evaluator.
for k in range(2,11):
    kmeans_model = compute_kmeans_model(transformed_df, k)

    transformed = kmeans_model.transform(transformed_df)

    score = evaluator.evaluate(transformed)

    score_list.append(score)

    print("k: {}, score: {}".format(k, score))

k: 2, score: 0.4403139929816383
k: 3, score: 0.6550334881045901
k: 4, score: 0.6658979549823084
k: 5, score: 0.6777415637605636
k: 6, score: 0.5104147694092941
k: 7, score: 0.6780801245958821
k: 8, score: 0.6494848988252783
k: 9, score: 0.5039729078362091
k: 10, score: 0.5132605636800075


In [80]:
# Confirm which metric the scores above are expressed in.
evaluator.getMetricName()

'silhouette'

In [82]:
# Final estimator: 5 clusters, same seed as in the k search.
kmeans = KMeans(k=5, seed=142)

In [83]:
# Fit the final estimator on the scaled RFM features.
kmeans_model = kmeans.fit(transformed_df)

In [84]:
# Apply the fitted model; the cluster assignment is read from the "prediction"
# column in the next cell.
transformed = kmeans_model.transform(transformed_df)

In [86]:
# NOTE(review): this import rebinds Python's built-in `sum` to the Spark
# function for the rest of the session (and lists `count` twice); only count,
# mean and desc are used in this cell.
from pyspark.sql.functions import sum, count, desc, mean, count

# Profile each cluster: size and mean Monetary / Recency / Frequency, with the
# highest-spending cluster first.
(
    transformed.groupBy("prediction")
    .agg(
        count("Monetary").alias("count"),
        mean("Monetary").alias("avg_monetary"),
        mean("Recency").alias("avg_recency"),
        mean("Frequency").alias("avg_frequency"),
    )
    .sort(desc("avg_monetary"))
    .show()
)

+----------+-----+------------------+------------------+------------------+
|prediction|count|      avg_monetary|       avg_recency|     avg_frequency|
+----------+-----+------------------+------------------+------------------+
|         1|    3|          37469.96|1327.6666666666667| 306.3333333333333|
|         3|   17|13068.847058823529| 960.0588235294117| 91.23529411764706|
|         4| 4072|1544.8937328094296|  980.840373280943| 9.520383104125736|
|         0|41794| 307.9166727281978| 964.9902856869408|2.0373020050724984|
|         2|54114|179.52782311424278|1406.5692427098347| 1.445928964778061|
+----------+-----+------------------+------------------+------------------+

