In [1]:
# ! hdfs dfs -put /home/train/Downloads/flo100k.csv/ /user/train/datasets/flo100k.csv

In [2]:
# List the HDFS dataset directory to confirm the input CSV (tflo100k.csv) is present.
! hdfs dfs -ls /user/train/datasets

Found 6 items
-rw-r--r--   1 train supergroup       4556 2020-09-23 20:56 /user/train/datasets/Advertising.csv
-rw-r--r--   1 train supergroup     674856 2024-06-23 15:11 /user/train/datasets/Churn_Modelling.csv
drwxr-xr-x   - train supergroup          0 2020-11-19 21:02 /user/train/datasets/churn-telecom
drwxr-xr-x   - train supergroup          0 2020-11-21 11:16 /user/train/datasets/retail_db
-rw-r--r--   1 train supergroup   19589922 2024-06-24 09:42 /user/train/datasets/tflo100k.csv
-rw-r--r--   1 train supergroup     460676 2024-06-21 19:41 /user/train/datasets/train.csv


In [3]:
import findspark

In [4]:
# Register the Spark installation at /opt/manual/spark with findspark so that
# `pyspark` can be imported in the next cell.
findspark.init("/opt/manual/spark")

In [5]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd

In [6]:
# Create (or reuse) the Spark session on YARN. Shuffle partitions are kept at 2
# because the data set is small (100k rows, see the count below).
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Flo Clustering")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

In [7]:
# The export is pipe-delimited with a header row; Spark infers the column types.
df = (
    spark.read
    .options(delimiter='|', header=True, inferSchema=True)
    .csv("/user/train/datasets/tflo100k.csv")
)

In [8]:
# Cache the raw DataFrame: it is scanned repeatedly below (row count, one
# null-count query per column, aggregations). `df` is later reassigned to cleaned
# versions built on top of this one, which can reuse the cached data.
df.persist()

DataFrame[master_id: string, order_channel: string, platform_type: string, last_order_channel: string, first_order_date: timestamp, last_order_date: timestamp, last_order_date_online: timestamp, last_order_date_offline: timestamp, order_num_total_ever_online: double, order_num_total_ever_offline: double, customer_value_total_ever_offline: double, customer_value_total_ever_online: double, interested_in_categories_12: string, online_product_group_amount_top_name_12: string, offline_product_group_name_12: string, last_order_date_new: string, store_type: string]

In [9]:
# Preview: only 5 rows are collected to the driver and rendered as a pandas DataFrame.
df.limit(5).toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,online_product_group_amount_top_name_12,offline_product_group_name_12,last_order_date_new,store_type
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,,1.0,212.98,0.0,,,,2019-02-23,A
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,,1.0,199.98,0.0,,,,2019-12-01,A
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,,1.0,140.49,0.0,[ERKEK],,ERKEK,2020-07-24,A
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,,0.0,174.99,,,,2018-12-31,A
4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,,0.0,283.95,[],,,2021-05-05,A


In [10]:
# Total number of rows; reused below to express null counts as percentages.
df_count = df.count()
print("{}".format(df_count))

100000


In [11]:
# Number of columns (one schema field per column).
len(df.schema.fields)

17

In [12]:
# Show column names, inferred types and nullability
# (`df.dtypes` returns the name/type pairs as a list instead).
df.printSchema()

root
 |-- master_id: string (nullable = true)
 |-- order_channel: string (nullable = true)
 |-- platform_type: string (nullable = true)
 |-- last_order_channel: string (nullable = true)
 |-- first_order_date: timestamp (nullable = true)
 |-- last_order_date: timestamp (nullable = true)
 |-- last_order_date_online: timestamp (nullable = true)
 |-- last_order_date_offline: timestamp (nullable = true)
 |-- order_num_total_ever_online: double (nullable = true)
 |-- order_num_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_online: double (nullable = true)
 |-- interested_in_categories_12: string (nullable = true)
 |-- online_product_group_amount_top_name_12: string (nullable = true)
 |-- offline_product_group_name_12: string (nullable = true)
 |-- last_order_date_new: string (nullable = true)
 |-- store_type: string (nullable = true)



# NULL CHECK

In [13]:
def null_count(df, col_name):
    """Count the rows of `df` whose value in `col_name` is missing.

    A value is treated as missing when it is SQL NULL. For string columns the
    placeholder strings '' and 'NA' are counted as missing as well.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        DataFrame to inspect.
    col_name : str
        Name of the column to check.

    Returns
    -------
    int
        Number of rows with a missing value in `col_name`.
    """
    column = F.col(col_name)
    is_missing = column.isNull()

    # Only string columns can contain the '' / 'NA' placeholders. Comparing a
    # timestamp or double column with a string literal relies on an implicit
    # cast, which can raise instead of returning NULL when ANSI mode is enabled.
    if dict(df.dtypes).get(col_name) == "string":
        is_missing = is_missing | (column == '') | (column == 'NA')

    return df.select(col_name).filter(is_missing).count()

In [14]:
# Report every column that has missing values, with the count and its share of all rows.
for col_name, _ in df.dtypes:
    nc = null_count(df, col_name)

    if nc > 0:
        print("{} has {} missing values ({:.3f} %)".format(col_name, nc, nc / df_count * 100))

last_order_date_online has 70784 % 70.784
last_order_date_offline has 21703 % 21.703
order_num_total_ever_online has 70784 % 70.784
order_num_total_ever_offline has 21703 % 21.703
interested_in_categories_12 has 56590 % 56.589999999999996
online_product_group_amount_top_name_12 has 88295 % 88.295
offline_product_group_name_12 has 77209 % 77.209


# DATA ANALYSIS

In [15]:
# master_id is a unique key when its number of distinct values equals the
# total number of rows in the data set.
from pyspark.sql.functions import countDistinct

df.agg(countDistinct("master_id")).show()



+-------------------------+
|count(DISTINCT master_id)|
+-------------------------+
|                   100000|
+-------------------------+



In [16]:
# Number of customers for each (platform_type, order_channel) combination.
df.groupBy("platform_type", "order_channel").count().show()

+-------------+-------------+-----+
|platform_type|order_channel|count|
+-------------+-------------+-----+
|  OmniChannel|      Offline| 4793|
|       Online|      Ios App| 3008|
|       Online|       Mobile| 6451|
|      Offline|      Offline|65991|
|       Online|  Android App| 8728|
|  OmniChannel|  Android App| 3261|
|       Online|      Desktop| 3253|
|  OmniChannel|      Ios App|  956|
|  OmniChannel|      Desktop| 1498|
|  OmniChannel|       Mobile| 2061|
+-------------+-------------+-----+



In [17]:
# Before deriving new variables, replace missing order counts / order values
# with 0, then keep only rows where all four measures are non-negative.
measure_cols = [
    "order_num_total_ever_online",
    "order_num_total_ever_offline",
    "customer_value_total_ever_offline",
    "customer_value_total_ever_online",
]

df = df.na.fill(value=0, subset=measure_cols)

non_negative = F.col(measure_cols[0]) >= 0
for measure in measure_cols[1:]:
    non_negative = non_negative & (F.col(measure) >= 0)

df = df.filter(non_negative)

In [18]:
# Omnichannel customers shop on both online and offline platforms; the totals
# below combine the online and offline figures for every customer.
#   order_num_total      : total number of orders (online + offline)
#   customer_value_total : total amount spent     (online + offline)
df = (
    df
    .withColumn(
        "order_num_total",
        F.col("order_num_total_ever_online") + F.col("order_num_total_ever_offline"),
    )
    .withColumn(
        "customer_value_total",
        F.col("customer_value_total_ever_online") + F.col("customer_value_total_ever_offline"),
    )
)

In [19]:
# Number of customers, average number of orders and average total spend per order channel.
# Explicit aggregate expressions (rather than a dict) keep the output column
# order fixed; the dict form's order depends on a Java map's iteration order.
df.groupBy("order_channel").agg(
    F.count("master_id"),
    F.avg("customer_value_total"),
    F.avg("order_num_total"),
).show()


+-------------+----------------+-------------------------+--------------------+
|order_channel|count(master_id)|avg(customer_value_total)|avg(order_num_total)|
+-------------+----------------+-------------------------+--------------------+
|      Offline|           70784|       218.16394849129728|  1.6003475361663653|
|      Ios App|            3964|        568.4640312815283|   3.377648839556004|
|       Mobile|            8512|        391.7418761748012|   2.798637218045113|
|  Android App|           11989|        532.8462840937639|    3.50971724080407|
|      Desktop|            4751|       376.91355504103785|   2.538623447695222|
+-------------+----------------+-------------------------+--------------------+



In [20]:
# Number of customers, average number of orders and average total spend per platform type.
# Explicit aggregate expressions (rather than a dict) keep the output column
# order fixed; the dict form's order depends on a Java map's iteration order.
df.groupBy("platform_type").agg(
    F.count("master_id"),
    F.avg("customer_value_total"),
    F.avg("order_num_total"),
).show()

+-------------+----------------+-------------------------+--------------------+
|platform_type|count(master_id)|avg(customer_value_total)|avg(order_num_total)|
+-------------+----------------+-------------------------+--------------------+
|      Offline|           65991|        215.7303068601296|  1.5837917291751906|
|  OmniChannel|           12569|        500.7937512928769|  3.4947092051873656|
|       Online|           21440|        404.7896576492903|  2.6207089552238805|
+-------------+----------------+-------------------------+--------------------+



# RFM Metrics: Recency, Frequency, Monetary

In [21]:
# Base table for RFM: customer id, last order date, total orders and total spend.
rfm = df.select("master_id", "last_order_date_new", "order_num_total", "customer_value_total")

rfm.show(5)

+--------------------+-------------------+---------------+--------------------+
|           master_id|last_order_date_new|order_num_total|customer_value_total|
+--------------------+-------------------+---------------+--------------------+
|b3ace094-a17f-11e...|         2019-02-23|            1.0|              212.98|
|c57d7c4c-a950-11e...|         2019-12-01|            1.0|              199.98|
|602897a6-cdac-11e...|         2020-07-24|            1.0|              140.49|
|388e4c4e-af86-11e...|         2018-12-31|            1.0|              174.99|
|80664354-adf0-11e...|         2021-05-05|            2.0|              283.95|
+--------------------+-------------------+---------------+--------------------+
only showing top 5 rows



In [22]:
# Timestamp of the most recent order in the whole data set;
# Recency below is measured from two days after this date.
last_order_date = df.agg(F.max("last_order_date")).first()[0]

last_order_date.date()

datetime.date(2021, 5, 30)

In [24]:
# Recency = number of days between each customer's last purchase and the
# analysis date, defined as two days after the most recent purchase in the data.
# The analysis date is derived from `last_order_date` (previous cell) instead of
# being hard-coded, so the cell stays correct if the data set is refreshed.
from datetime import timedelta
from pyspark.sql.functions import expr

RECENCY_OFFSET_DAYS = 2
analysis_date = last_order_date.date() + timedelta(days=RECENCY_OFFSET_DAYS)

rfm = rfm.withColumn(
    "Recency",
    expr("datediff('{}', last_order_date_new)".format(analysis_date.isoformat())),
)

rfm.show(5)

+--------------------+-------------------+---------------+--------------------+-------+
|           master_id|last_order_date_new|order_num_total|customer_value_total|Recency|
+--------------------+-------------------+---------------+--------------------+-------+
|b3ace094-a17f-11e...|         2019-02-23|            1.0|              212.98|    829|
|c57d7c4c-a950-11e...|         2019-12-01|            1.0|              199.98|    548|
|602897a6-cdac-11e...|         2020-07-24|            1.0|              140.49|    312|
|388e4c4e-af86-11e...|         2018-12-31|            1.0|              174.99|    883|
|80664354-adf0-11e...|         2021-05-05|            2.0|              283.95|     27|
+--------------------+-------------------+---------------+--------------------+-------+
only showing top 5 rows



In [25]:
# Rename to the standard RFM terms with `withColumnRenamed`:
#   order_num_total      -> Frequency
#   customer_value_total -> Monetary
rfm_renames = {"order_num_total": "Frequency", "customer_value_total": "Monetary"}
for old_name, new_name in rfm_renames.items():
    rfm = rfm.withColumnRenamed(old_name, new_name)

rfm.show(5)

+--------------------+-------------------+---------+--------+-------+
|           master_id|last_order_date_new|Frequency|Monetary|Recency|
+--------------------+-------------------+---------+--------+-------+
|b3ace094-a17f-11e...|         2019-02-23|      1.0|  212.98|    829|
|c57d7c4c-a950-11e...|         2019-12-01|      1.0|  199.98|    548|
|602897a6-cdac-11e...|         2020-07-24|      1.0|  140.49|    312|
|388e4c4e-af86-11e...|         2018-12-31|      1.0|  174.99|    883|
|80664354-adf0-11e...|         2021-05-05|      2.0|  283.95|     27|
+--------------------+-------------------+---------+--------+-------+
only showing top 5 rows



In [26]:
# last_order_date_new was only needed to compute Recency, so it is removed.
columns_to_drop = ["last_order_date_new"]
rfm = rfm.drop(*columns_to_drop)

rfm.show(5)

+--------------------+---------+--------+-------+
|           master_id|Frequency|Monetary|Recency|
+--------------------+---------+--------+-------+
|b3ace094-a17f-11e...|      1.0|  212.98|    829|
|c57d7c4c-a950-11e...|      1.0|  199.98|    548|
|602897a6-cdac-11e...|      1.0|  140.49|    312|
|388e4c4e-af86-11e...|      1.0|  174.99|    883|
|80664354-adf0-11e...|      2.0|  283.95|     27|
+--------------------+---------+--------+-------+
only showing top 5 rows



# Creating Model

## Vector Assembler

In [27]:
# Feature columns for clustering; this order defines the layout of the feature vectors.
rfm_col = ["Recency", "Frequency", "Monetary"]

In [28]:
from pyspark.ml.feature import VectorAssembler

# Pack the Recency/Frequency/Monetary columns into one vector column.
# handleInvalid="skip" drops rows whose feature values are invalid (e.g. null).
assembler = VectorAssembler(
    inputCols=rfm_col,
    outputCol="unscaled_features",
    handleInvalid="skip",
)

## Standard Scaler

In [29]:
from pyspark.ml.feature import StandardScaler

scaler =  StandardScaler() \
         .setInputCol("unscaled_features") \
         .setOutputCol("features")

## Pipeline

In [31]:
from pyspark.ml import Pipeline

# Assemble the feature vector first, then scale it.
pipeline_obj = Pipeline(stages=[assembler, scaler])

In [32]:
# Fit the assembler + scaler on the RFM table and produce the scaled features.
pipeline_model = pipeline_obj.fit(rfm)

# transformed_df feeds 9 KMeans fits, 9 silhouette evaluations and the final
# model below; cache it so the cleaning / RFM / scaling steps are not
# recomputed for every one of those jobs.
transformed_df = pipeline_model.transform(rfm).persist()

transformed_df.show(5)

+--------------------+---------+--------+-------+------------------+--------------------+
|           master_id|Frequency|Monetary|Recency| unscaled_features|            features|
+--------------------+---------+--------+-------+------------------+--------------------+
|b3ace094-a17f-11e...|      1.0|  212.98|    829|[829.0,1.0,212.98]|[3.24793811935259...|
|c57d7c4c-a950-11e...|      1.0|  199.98|    548|[548.0,1.0,199.98]|[2.14700855175539...|
|602897a6-cdac-11e...|      1.0|  140.49|    312|[312.0,1.0,140.49]|[1.22238443092642...|
|388e4c4e-af86-11e...|      1.0|  174.99|    883|[883.0,1.0,174.99]|[3.45950465547447...|
|80664354-adf0-11e...|      2.0|  283.95|     27| [27.0,2.0,283.95]|[0.10578326806094...|
+--------------------+---------+--------+-------+------------------+--------------------+
only showing top 5 rows



## Finding the Optimal Number of Clusters

In [41]:
from pyspark.ml.clustering import KMeans

def compute_KMeans_Model(df, k, seed=142):
    """Fit a k-means model on the ``features`` column of `df`.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Data containing a vector column named ``features`` (the default input
        column of ``KMeans``; produced by the scaling pipeline above).
    k : int
        Number of clusters.
    seed : int, optional
        Random seed for centroid initialisation. Defaults to 142, the value used
        throughout this notebook, so repeated runs give the same clusters.

    Returns
    -------
    pyspark.ml.clustering.KMeansModel
        The fitted model.
    """
    return KMeans(k=k, seed=seed).fit(df)

## Evaluate Model

In [42]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Score k = 2..10 with ClusteringEvaluator's default metric (silhouette; higher is better).
evaluator = ClusteringEvaluator()
score_list = []

for k in range(2, 11):
    kmeans_model = compute_KMeans_Model(transformed_df, k)
    transformed = kmeans_model.transform(transformed_df)

    score = evaluator.evaluate(transformed)
    score_list.append(score)
    print(" k: {},  score:{}".format(k, score))

 k: 2,  score:0.4403139929816374
 k: 3,  score:0.655033488104594
 k: 4,  score:0.6658979549823151
 k: 5,  score:0.6777415637605754
 k: 6,  score:0.5104147694093096
 k: 7,  score:0.6780801245958924
 k: 8,  score:0.6494848988252858
 k: 9,  score:0.5039729078361186
 k: 10,  score:0.5132605636800474


# Final Model

In [43]:
# Final model with k=5. Its silhouette score (0.6777) is practically tied with
# the highest score in the search above (k=7, 0.6781) while giving fewer segments.
# NOTE(review): confirm that k=5 over k=7 is the intended choice.
kmeans = KMeans(seed=142, k=5)

In [44]:
# Fit the final k=5 model on the scaled RFM features.
kmeans_model = kmeans.fit(transformed_df)

In [45]:
# Assign every customer to a cluster; the `prediction` column holds the cluster id.
transformed = kmeans_model.transform(transformed_df)

transformed.show(20)

+--------------------+---------+------------------+-------+--------------------+--------------------+----------+
|           master_id|Frequency|          Monetary|Recency|   unscaled_features|            features|prediction|
+--------------------+---------+------------------+-------+--------------------+--------------------+----------+
|b3ace094-a17f-11e...|      1.0|            212.98|    829|  [829.0,1.0,212.98]|[3.24793811935259...|         2|
|c57d7c4c-a950-11e...|      1.0|            199.98|    548|  [548.0,1.0,199.98]|[2.14700855175539...|         2|
|602897a6-cdac-11e...|      1.0|            140.49|    312|  [312.0,1.0,140.49]|[1.22238443092642...|         0|
|388e4c4e-af86-11e...|      1.0|            174.99|    883|  [883.0,1.0,174.99]|[3.45950465547447...|         2|
|80664354-adf0-11e...|      2.0|            283.95|     27|   [27.0,2.0,283.95]|[0.10578326806094...|         0|
|47511f36-aeb4-11e...|      1.0|            139.98|    933|  [933.0,1.0,139.98]|[3.6553995963280

# Descriptive Statistics

In [52]:
# Size and average Recency / Frequency / Monetary of each cluster, highest average spend first.
# Uses the `F.` namespace instead of `from pyspark.sql.functions import sum, ...`:
# that import shadows Python's built-in `sum` for the rest of the notebook.
(
    transformed.groupBy("prediction")
    .agg(
        F.count("Monetary").alias("count"),
        F.mean("Monetary").alias("avg_monetary"),
        F.mean("Recency").alias("avg_recency"),
        F.mean("Frequency").alias("avg_frequency"),
    )
    .orderBy(F.desc("avg_monetary"))
    .show()
)

    

+----------+-----+------------------+------------------+------------------+
|prediction|count|      avg_monetary|       avg_recency|     avg_frequency|
+----------+-----+------------------+------------------+------------------+
|         1|    3|          37469.96| 536.6666666666666| 306.3333333333333|
|         3|   17|13068.847058823529|169.05882352941177| 91.23529411764706|
|         4| 4072|1544.8937328094296|189.84037328094303| 9.520383104125736|
|         0|41794| 307.9166727281978|173.99028568694072|2.0373020050724984|
|         2|54114|179.52782311424278| 615.5692427098348| 1.445928964778061|
+----------+-----+------------------+------------------+------------------+

