# Unüberwachtes Lernen und Kundensegmentierung mit Spark

## Import the library and open a Spark session

In [1]:
import findspark

In [2]:
import os

# Locate the Spark installation: prefer the SPARK_HOME environment variable so the
# notebook runs on other machines, falling back to the original installation path.
findspark.init(os.environ.get("SPARK_HOME", "/opt/manual/spark"))

In [3]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd
import warnings

warnings.filterwarnings("ignore", category=FutureWarning)

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [4]:
# Local Spark session on all cores; only 2 shuffle partitions
# (presumably because the data set is small — 100k rows).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Clustering_KMeans")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

## Read Data

In [5]:
# Pipe-delimited CSV with a header row; column types are inferred from the data.
df = spark.read.csv(
    "/user/train/datasets/flo100k1.csv",
    sep="|",
    header=True,
    inferSchema=True,
)

In [6]:
# Keep df cached: it is reused by many actions below (counts, null checks, aggregations).
df.persist()

DataFrame[master_id: string, order_channel: string, platform_type: string, last_order_channel: string, first_order_date: timestamp, last_order_date: timestamp, last_order_date_online: timestamp, last_order_date_offline: timestamp, order_num_total_ever_online: double, order_num_total_ever_offline: double, customer_value_total_ever_offline: double, customer_value_total_ever_online: double, interested_in_categories_12: string, online_product_group_amount_top_name_12: string, offline_product_group_name_12: string, last_order_date_new: string, store_type: string]

## Data Exploration

In [7]:
# Preview the first 5 rows as a pandas DataFrame.
df.limit(5).toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,online_product_group_amount_top_name_12,offline_product_group_name_12,last_order_date_new,store_type
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,,1.0,212.98,0.0,,,,2019-02-23,A
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,,1.0,199.98,0.0,,,,2019-12-01,A
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,,1.0,140.49,0.0,[ERKEK],,ERKEK,2020-07-24,A
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,,0.0,174.99,,,,2018-12-31,A
4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,,0.0,283.95,[],,,2021-05-05,A


In [8]:
# Print the column names and the types inferred from the CSV.
df.printSchema()

root
 |-- master_id: string (nullable = true)
 |-- order_channel: string (nullable = true)
 |-- platform_type: string (nullable = true)
 |-- last_order_channel: string (nullable = true)
 |-- first_order_date: timestamp (nullable = true)
 |-- last_order_date: timestamp (nullable = true)
 |-- last_order_date_online: timestamp (nullable = true)
 |-- last_order_date_offline: timestamp (nullable = true)
 |-- order_num_total_ever_online: double (nullable = true)
 |-- order_num_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_offline: double (nullable = true)
 |-- customer_value_total_ever_online: double (nullable = true)
 |-- interested_in_categories_12: string (nullable = true)
 |-- online_product_group_amount_top_name_12: string (nullable = true)
 |-- offline_product_group_name_12: string (nullable = true)
 |-- last_order_date_new: string (nullable = true)
 |-- store_type: string (nullable = true)



`last_order_date_new` wird als String angezeigt, enthält aber ein Datum. Die Variable wird später zur Berechnung der `Recency` verwendet. Da `datediff` den String dabei implizit in ein Datum umwandelt, kann der Datentyp so bleiben.

In [9]:
# Total number of rows; reused below as the denominator for the null-value percentages.
df_count = df.count()
print(f"Anzahl der gesamten Beobachtungen: {df_count}")

Anzahl der gesamten Beobachtungen: 100000


In [10]:
# Number of columns in the raw data set.
num_variables = len(df.columns)
print("Anzahl der Variablen: {}".format(num_variables))

Anzahl der Variablen: 17


## Missing value check

In [11]:
# Look at raw values: missing categories appear either as null or as the string "[]".
df.select("interested_in_categories_12").show()

+---------------------------+
|interested_in_categories_12|
+---------------------------+
|                       null|
|                       null|
|                    [ERKEK]|
|                       null|
|                         []|
|                       null|
|                [AKTIFSPOR]|
|         [KADIN, AKTIFSPOR]|
|             [ERKEK, KADIN]|
|                         []|
|                    [KADIN]|
|                       null|
|                       null|
|                       null|
|        [AKTIFCOCUK, KADIN]|
|                       null|
|                         []|
|                [AKTIFSPOR]|
|                       null|
|                       null|
+---------------------------+
only showing top 20 rows



In [12]:
def null_count(df, col_name):
    """Count the rows of ``df`` whose value in ``col_name`` counts as missing.

    A value is treated as missing when it is null, or additionally
    * for string columns: one of the placeholder strings "NaN", "NaT", "[]" or "";
    * for float/double columns: NaN.

    The check is chosen per column type instead of comparing every column
    with string literals. Under Spark's default (non-ANSI) casting this gives
    the same counts as the plain string comparisons. It also avoids implicit
    string-to-number / string-to-timestamp casts, which raise errors when
    ``spark.sql.ansi.enabled`` is on.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        DataFrame to inspect.
    col_name : str
        Name of the column to check; must be one of ``df.columns``.

    Returns
    -------
    int
        Number of rows with a missing value in ``col_name``.
    """
    dtype = dict(df.dtypes)[col_name]
    column = F.col(col_name)

    is_missing = column.isNull()
    if dtype == "string":
        # Placeholder strings that mark missing values, e.g. "[]" for an empty category list.
        is_missing = is_missing | column.isin("NaN", "NaT", "[]", "")
    elif dtype in ("float", "double"):
        is_missing = is_missing | F.isnan(column)

    return df.select(col_name).filter(is_missing).count()

In [13]:
# Report every column of df that contains missing values, with its share of all rows.
for column_name, _dtype in df.dtypes:
    missing = null_count(df, column_name)
    if missing > 0:
        share = missing / df_count * 100
        print(f"{column_name} has {missing} null values, or {share:.2f}%.")

last_order_date_online has 70784 null values, or 70.78%.
last_order_date_offline has 21703 null values, or 21.70%.
order_num_total_ever_online has 70784 null values, or 70.78%.
order_num_total_ever_offline has 21703 null values, or 21.70%.
interested_in_categories_12 has 62371 null values, or 62.37%.
online_product_group_amount_top_name_12 has 88295 null values, or 88.30%.
offline_product_group_name_12 has 77209 null values, or 77.21%.


## Data Analysis

Ist die Variable `master_id` eindeutig (unique)?

In [14]:
# master_id is a unique key exactly when its distinct count equals the row count.
unique_count = df.select("master_id").distinct().count()
print("master_id is unique" if unique_count == df_count else "master_id is not unique")

master_id is unique


## Data Understanding

Die Beobachtungen werden nach `platform_type` und `order_channel` gruppiert, gezählt und absteigend nach Anzahl sortiert:

In [15]:
# Number of customers per (platform_type, order_channel), largest groups first.
(
    df.groupBy("platform_type", "order_channel")
    .count()
    .sort(F.col("count").desc())
    .show()
)

+-------------+-------------+-----+
|platform_type|order_channel|count|
+-------------+-------------+-----+
|      Offline|      Offline|65991|
|       Online|  Android App| 8728|
|       Online|       Mobile| 6451|
|  OmniChannel|      Offline| 4793|
|  OmniChannel|  Android App| 3261|
|       Online|      Desktop| 3253|
|       Online|      Ios App| 3008|
|  OmniChannel|       Mobile| 2061|
|  OmniChannel|      Desktop| 1498|
|  OmniChannel|      Ios App|  956|
+-------------+-------------+-----+



## Create new columns

Omnichannel-Kunden nutzen sowohl Online- als auch Offline-Plattformen für ihre Einkäufe.

In den Spalten `order_num_total_ever_online` und `order_num_total_ever_offline` werden fehlende Werte (`null`) durch 0 ersetzt, damit die Summe in der neuen Variable `NEW_order_num_total` nicht `null` ergibt. Zusätzlich wird der Datentyp von `double` auf `integer` geändert, da es sich um Anzahlen von Bestellungen handelt.

In [16]:
# Types of the two order-count columns before conversion (both double after inferSchema).
df.select("order_num_total_ever_online", "order_num_total_ever_offline").printSchema()

root
 |-- order_num_total_ever_online: double (nullable = true)
 |-- order_num_total_ever_offline: double (nullable = true)



In [17]:
# Replace missing order counts with 0, then cast each column from double to int.
df1 = df
for order_col in ("order_num_total_ever_online", "order_num_total_ever_offline"):
    df1 = (
        df1.withColumn(order_col, F.when(F.col(order_col).isNull(), 0).otherwise(F.col(order_col)))
        .withColumn(order_col, F.col(order_col).cast("int"))
    )

In [18]:
# Check: both order-count columns should now be of type integer.

df1.select("order_num_total_ever_online", "order_num_total_ever_offline").printSchema()

root
 |-- order_num_total_ever_online: integer (nullable = true)
 |-- order_num_total_ever_offline: integer (nullable = true)



In [19]:
# Note: this starts again from `df`, not from the integer-cast `df1` of the previous
# cell. That cast is therefore discarded and both columns stay double; only the
# null -> 0 replacement remains.
df1 = df.fillna(0, subset=["order_num_total_ever_online", "order_num_total_ever_offline"])

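**Alternative: Fehlende Werte durch den Mittelwert ersetzen.**

Hinweis: `order_num_total_ever_online` hat genauso viele fehlende Werte (70784) wie `last_order_date_online`, und `order_num_total_ever_offline` genauso viele (21703) wie `last_order_date_offline`. Ein fehlender Wert bedeutet daher vermutlich, dass über diesen Kanal nie bestellt wurde. In diesem Fall ist 0 inhaltlich passender als der Mittelwert.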

Zunächst werden die Mittelwerte für jede betroffene Spalte berechnet:

```python
from pyspark.sql.functions import expr

df.selectExpr("AVG(order_num_total_ever_offline)").show()
df.selectExpr("AVG(order_num_total_ever_online)").show()
df.selectExpr("AVG(customer_value_total_ever_offline)").show()
df.selectExpr("AVG(customer_value_total_ever_online)").show()
```

Anschließend werden die berechneten Mittelwerte für die fehlenden Werte verwendet. Fehlende Werte sind in Spark `null` (nicht der String `"NA"`), daher wird mit `isNull()` geprüft:

```python
df1 = df.withColumn(
    "customer_value_total_ever_online",
    F.when(F.col("customer_value_total_ever_online").isNull(), 120).otherwise(F.col("customer_value_total_ever_online"))
).withColumn(
    "customer_value_total_ever_offline",
    F.when(F.col("customer_value_total_ever_offline").isNull(), 172).otherwise(F.col("customer_value_total_ever_offline"))
).withColumn(
    "order_num_total_ever_online",
    F.when(F.col("order_num_total_ever_online").isNull(), 3).otherwise(F.col("order_num_total_ever_online"))
).withColumn(
    "order_num_total_ever_offline",
    F.when(F.col("order_num_total_ever_offline").isNull(), 2).otherwise(F.col("order_num_total_ever_offline"))
)
```
   
   
   
   

In [20]:
# Preview df1 after the null -> 0 replacement in the order-count columns.
df1.limit(5).toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,online_product_group_amount_top_name_12,offline_product_group_name_12,last_order_date_new,store_type
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,0.0,1.0,212.98,0.0,,,,2019-02-23,A
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,0.0,1.0,199.98,0.0,,,,2019-12-01,A
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,0.0,1.0,140.49,0.0,[ERKEK],,ERKEK,2020-07-24,A
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,0.0,0.0,174.99,,,,2018-12-31,A
4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,0.0,0.0,283.95,[],,,2021-05-05,A


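Die Spalten `order_num_total_ever_online` und `order_num_total_ever_offline` sind wieder vom Typ `double`. Der Grund: `df1` wird in Zelle 19 erneut aus `df` erzeugt (und nicht aus dem in Zelle 17 umgewandelten `df1`), wodurch die Umwandlung nach `integer` verloren geht. Für die folgende Summenbildung ist das unproblematisch, da die fehlenden Werte durch 0 ersetzt wurden.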

In [21]:
# Create new variables: total number of orders and total customer value
# summed over the online and offline channels.
df1 = (
    df1
    .withColumn("NEW_order_num_total",
                F.col("order_num_total_ever_online") + F.col("order_num_total_ever_offline"))
    .withColumn("NEW_customer_value_total",
                F.col("customer_value_total_ever_offline") + F.col("customer_value_total_ever_online"))
)

In [22]:
# Preview df1 including the two new NEW_* columns.
df1.limit(5).toPandas()

Unnamed: 0,master_id,order_channel,platform_type,last_order_channel,first_order_date,last_order_date,last_order_date_online,last_order_date_offline,order_num_total_ever_online,order_num_total_ever_offline,customer_value_total_ever_offline,customer_value_total_ever_online,interested_in_categories_12,online_product_group_amount_top_name_12,offline_product_group_name_12,last_order_date_new,store_type,NEW_order_num_total,NEW_customer_value_total
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,Offline,Offline,Offline,2019-02-23 12:59:17,2019-02-23 12:59:17,NaT,2019-02-23 12:59:17,0.0,1.0,212.98,0.0,,,,2019-02-23,A,1.0,212.98
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,Offline,OmniChannel,Offline,2019-12-01 16:48:09,2019-12-01 16:48:09,NaT,2019-12-01 16:48:09,0.0,1.0,199.98,0.0,,,,2019-12-01,A,1.0,199.98
2,602897a6-cdac-11ea-b31f-000d3a38a36f,Offline,Offline,Offline,2020-07-24 15:49:47,2020-07-24 15:49:47,NaT,2020-07-24 15:49:47,0.0,1.0,140.49,0.0,[ERKEK],,ERKEK,2020-07-24,A,1.0,140.49
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,Mobile,Online,Mobile,2018-12-31 07:22:07,2018-12-31 07:22:07,2018-12-31 07:22:07,NaT,1.0,0.0,0.0,174.99,,,,2018-12-31,A,1.0,174.99
4,80664354-adf0-11eb-8f64-000d3a299ebf,Desktop,Online,Desktop,2021-05-05 21:07:02,2021-05-05 22:39:36,2021-05-05 22:39:36,NaT,2.0,0.0,0.0,283.95,[],,,2021-05-05,A,2.0,283.95


Kontrolle: 

Es dürfen keine Nullwerte in den Spalten `order_num_total_ever_online`, `order_num_total_ever_offline`, `customer_value_total_ever_offline`, `customer_value_total_ever_online`, `NEW_order_num_total` und `NEW_customer_value_total` auftreten.

In [23]:
# Re-run the missing-value report on df1; the order-count and NEW_* columns should no longer appear.
for column_name, _dtype in df1.dtypes:
    missing = null_count(df1, column_name)
    if missing > 0:
        share = missing / df_count * 100
        print(f"{column_name} has {missing} null values, or {share:.2f}%.")

last_order_date_online has 70784 null values, or 70.78%.
last_order_date_offline has 21703 null values, or 21.70%.
interested_in_categories_12 has 62371 null values, or 62.37%.
online_product_group_amount_top_name_12 has 88295 null values, or 88.30%.
offline_product_group_name_12 has 77209 null values, or 77.21%.


## Analysis `order_channel`

Betrachtet werden die Anzahl der Kunden, die durchschnittliche Anzahl an Bestellungen sowie der durchschnittliche und gesamte Umsatz je Kanal (`order_channel`).

In [24]:
# Customers, average orders and average/total revenue per order channel.
(
    df1.groupBy("order_channel")
    .agg(
        F.count("master_id").alias("count(master_id)"),
        F.avg("NEW_order_num_total").alias("avg_NEW_order_num_total"),
        F.avg("NEW_customer_value_total").alias("avg_NEW_customer_value_total"),
        F.sum("NEW_customer_value_total").alias("sum_NEW_customer_value_total"),
    )
    .show()
)

+-------------+----------------+-----------------------+----------------------------+----------------------------+
|order_channel|count(master_id)|avg_NEW_order_num_total|avg_NEW_customer_value_total|sum_NEW_customer_value_total|
+-------------+----------------+-----------------------+----------------------------+----------------------------+
|      Offline|           70784|     1.6003475361663653|           218.1639484912399|        1.5442516930003924E7|
|      Ios App|            3964|      3.377648839556004|           568.4640312815313|            2253391.41999999|
|       Mobile|            8512|      2.798637218045113|           391.7418761748067|          3334506.8499999545|
|      Desktop|            4751|      2.538623447695222|          376.91355504104155|          1790716.2999999884|
|  Android App|           11989|       3.50971724080407|           532.8462840937418|            6388294.09999987|
+-------------+----------------+-----------------------+------------------------

## Analysis `platform_type`

Betrachtet werden die Anzahl der Kunden, die durchschnittliche Anzahl an Bestellungen sowie der durchschnittliche und gesamte Umsatz je Plattform (`platform_type`).

In [25]:
# Customers, average orders and average/total revenue per platform type.
(
    df1.groupBy("platform_type")
    .agg(
        F.count("master_id").alias("count(master_id)"),
        F.avg("NEW_order_num_total").alias("avg_NEW_order_num_total"),
        F.avg("NEW_customer_value_total").alias("avg_NEW_customer_value_total"),
        F.sum("NEW_customer_value_total").alias("sum_NEW_customer_value_total"),
    )
    .show()
)

+-------------+----------------+-----------------------+----------------------------+----------------------------+
|platform_type|count(master_id)|avg_NEW_order_num_total|avg_NEW_customer_value_total|sum_NEW_customer_value_total|
+-------------+----------------+-----------------------+----------------------------+----------------------------+
|      Offline|           65991|     1.5837917291751906|          215.73030686007337|        1.4236258680003101E7|
|  OmniChannel|           12569|     3.4947092051873656|           500.7937512928511|           6294476.659999846|
|       Online|           21440|     2.6207089552238805|           404.7896576492577|           8678690.260000085|
+-------------+----------------+-----------------------+----------------------------+----------------------------+



## Calculation of K-Means Metrics

### RFM

In [26]:
# Keep only the customer id and the raw inputs for the RFM metrics.
rfm = df1.select("master_id", "last_order_date_new", "NEW_order_num_total", "NEW_customer_value_total")

In [27]:
# Preview the RFM input columns.
rfm.limit(5).toPandas()

Unnamed: 0,master_id,last_order_date_new,NEW_order_num_total,NEW_customer_value_total
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,2019-02-23,1.0,212.98
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,2019-12-01,1.0,199.98
2,602897a6-cdac-11ea-b31f-000d3a38a36f,2020-07-24,1.0,140.49
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,2018-12-31,1.0,174.99
4,80664354-adf0-11eb-8f64-000d3a299ebf,2021-05-05,2.0,283.95


In [28]:
from pyspark.sql.functions import max

In [29]:
# Latest purchase timestamp in the data set; the reference date for the Recency calculation below.
# F.max is used explicitly: `max` was imported from pyspark.sql.functions in the previous
# cell, which shadows Python's built-in max.
last_order_date = df1.agg(F.max("last_order_date")).first()[0]

In [30]:
# Show the reference date used for Recency.
print(f"Das Datum des letzten Kaufs ist: {last_order_date}")

Das Datum des letzten Kaufs ist: 2021-05-30 23:07:54


### Recency

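Die `Recency`-Spalte wird berechnet, indem die Anzahl der Tage zwischen `last_order_date_new` und dem Datum des letzten Kaufs im Datensatz (`last_order_date`) ermittelt und um 2 Tage erhöht wird. Das entspricht einem Analysedatum 2 Tage nach dem letzten Kauf.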

In [31]:
from pyspark.sql.functions import expr

In [32]:
# Recency = days between a customer's last order date and the latest order date in the
# data, plus 2 (i.e. counted from a reference date 2 days after the last purchase).
rfm1 = rfm.withColumn(
    "Recency",
    F.datediff(F.lit(str(last_order_date)), F.col("last_order_date_new")) + 2,
)

rfm1.limit(5).toPandas()

Unnamed: 0,master_id,last_order_date_new,NEW_order_num_total,NEW_customer_value_total,Recency
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,2019-02-23,1.0,212.98,829
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,2019-12-01,1.0,199.98,548
2,602897a6-cdac-11ea-b31f-000d3a38a36f,2020-07-24,1.0,140.49,312
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,2018-12-31,1.0,174.99,883
4,80664354-adf0-11eb-8f64-000d3a299ebf,2021-05-05,2.0,283.95,27


## Frequency and Monetary

In [33]:
# Drop the raw date (now captured by Recency) and give the totals their RFM names.
rfm2 = rfm1.drop("last_order_date_new")
for old_name, new_name in [("NEW_order_num_total", "Frequency"),
                           ("NEW_customer_value_total", "Monetary")]:
    rfm2 = rfm2.withColumnRenamed(old_name, new_name)

In [34]:
# Preview the final RFM table: master_id, Frequency, Monetary, Recency.
rfm2.limit(5).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,829
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,548
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,312
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,883
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,27


## Model Preparation

### Vector Assembler

Da rfm_cols = [`Frequency`, `Monetary`, `Recency`] bereits numerische Variablen sind, sind OneHotEncoder und StringIndexer nicht nötig. Es kann direkt der VectorAssembler verwendet werden.

In [35]:
# Model features, named explicitly rather than sliced by position (rfm2.columns[1:]),
# so a change in rfm2's column order cannot silently alter the feature set.
rfm_cols = ["Frequency", "Monetary", "Recency"]

In [36]:
from pyspark.ml.feature import VectorAssembler

In [37]:
# Combine the RFM columns into one vector column. handleInvalid="skip" drops rows with
# invalid (e.g. null/NaN) inputs instead of failing (per the Spark ML docs).
assembler = VectorAssembler(
    inputCols=rfm_cols,
    outputCol="unscaled_features",
    handleInvalid="skip",
)

### Feature Scaler

In [38]:
from pyspark.ml.feature import StandardScaler

In [39]:
# Scale the assembled RFM vector; the output is named "features", which KMeans
# picks up via its default features column (per the Spark ML docs).
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

### Pipeline

In [40]:
from pyspark.ml import Pipeline

In [41]:
# Feature preparation pipeline: assemble the RFM vector, then scale it.
pipeline_obj = Pipeline(stages=[assembler, scaler])

In [42]:
# Fit the assembler + scaler pipeline on the RFM data (the scaler learns its statistics here).
pipeline_model = pipeline_obj.fit(rfm2)

In [43]:
# Add the "unscaled_features" and scaled "features" vector columns to rfm2.
pipeline_df = pipeline_model.transform(rfm2)

In [44]:
# Preview the unscaled and scaled feature vectors.
pipeline_df.limit(5).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency,unscaled_features,features
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,829,"[1.0, 212.98, 829.0]","[0.32770120750468895, 0.4598810576445158, 3.24..."
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,548,"[1.0, 199.98, 548.0]","[0.32770120750468895, 0.43181056393910355, 2.1..."
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,312,"[1.0, 140.49, 312.0]","[0.32770120750468895, 0.30335566620564386, 1.2..."
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,883,"[1.0, 174.99, 883.0]","[0.32770120750468895, 0.37785043796231493, 3.4..."
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,27,"[2.0, 283.95, 27.0]","[0.6554024150093779, 0.6131243605886011, 0.105..."


## K-Means clustering model

Ermittlung der optimalen Anzahl an Clustern: Für k = 2 bis 10 wird jeweils ein Modell trainiert und mit dem `ClusteringEvaluator` bewertet.

In [45]:
from pyspark.ml.clustering import KMeans

In [46]:
def compute_kmeans_model(df, k, seed=142):
    """Fit a K-Means model with ``k`` clusters on ``df``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input data. No features column is set explicitly, so KMeans uses its
        default one (``"features"`` per the Spark ML docs), which matches the
        scaler's output column in this notebook.
    k : int
        Number of clusters.
    seed : int, default 142
        Random seed for KMeans. It is fixed so that repeated runs, including
        the k-search loop, give reproducible clusterings.

    Returns
    -------
    pyspark.ml.clustering.KMeansModel
        The fitted model.
    """
    kmeans = KMeans().setSeed(seed).setK(k)
    return kmeans.fit(df)

### Evaluator

In [47]:
from pyspark.ml.evaluation import ClusteringEvaluator

In [48]:
# Evaluator with default settings; per the Spark ML docs this computes the silhouette score
# from the "features" and "prediction" columns — confirm for the Spark version in use.
evaluator = ClusteringEvaluator()

In [49]:
# Fit one model per candidate k (2..10) and print the evaluator's score for each.
for k in range(2, 11):
    kmeans_model = compute_kmeans_model(pipeline_df, k)
    transformed_df = kmeans_model.transform(pipeline_df)
    score = evaluator.evaluate(transformed_df)
    print(f"k: {k}, score: {score}")

k: 2, score: 0.9995403313147956
k: 3, score: 0.5306747271448232
k: 4, score: 0.6705260499487317
k: 5, score: 0.6839227157177052
k: 6, score: 0.6561488711873166
k: 7, score: 0.5487474862203171
k: 8, score: 0.5154402600591739
k: 9, score: 0.5690643226674137
k: 10, score: 0.55055801154429


## Prediction

In [50]:
# k = 5 has the highest score among k = 3..10 in the search above (~0.684).
# k = 2 scores higher (~0.9995) but was not chosen — TODO document why
# (presumably too coarse a split for customer segmentation).
kmeans_model = compute_kmeans_model(pipeline_df, k=5)

In [51]:
# Assign every customer to a cluster; the cluster id is stored in the "prediction" column.
transformed_df = kmeans_model.transform(pipeline_df)

In [52]:
# Show the cluster assignment of the first 100 customers.
transformed_df.limit(100).toPandas()

Unnamed: 0,master_id,Frequency,Monetary,Recency,unscaled_features,features,prediction
0,b3ace094-a17f-11e9-a2fc-000d3a38a36f,1.0,212.98,829,"[1.0, 212.98, 829.0]","[0.32770120750468895, 0.4598810576445158, 3.24...",0
1,c57d7c4c-a950-11e9-a2fc-000d3a38a36f,1.0,199.98,548,"[1.0, 199.98, 548.0]","[0.32770120750468895, 0.43181056393910355, 2.1...",0
2,602897a6-cdac-11ea-b31f-000d3a38a36f,1.0,140.49,312,"[1.0, 140.49, 312.0]","[0.32770120750468895, 0.30335566620564386, 1.2...",4
3,388e4c4e-af86-11e9-a2fc-000d3a38a36f,1.0,174.99,883,"[1.0, 174.99, 883.0]","[0.32770120750468895, 0.37785043796231493, 3.4...",0
4,80664354-adf0-11eb-8f64-000d3a299ebf,2.0,283.95,27,"[2.0, 283.95, 27.0]","[0.6554024150093779, 0.6131243605886011, 0.105...",4
5,47511f36-aeb4-11e9-a2fc-000d3a38a36f,1.0,139.98,933,"[1.0, 139.98, 933.0]","[0.32770120750468895, 0.30225443914489303, 3.6...",0
6,77f7c318-3407-11eb-9a15-000d3a38a36f,1.0,269.9,182,"[1.0, 269.9, 182.0]","[0.32770120750468895, 0.5827866346992902, 0.71...",4
7,399d6dd2-ecf1-11ea-9369-000d3a38a36f,1.0,95.73,272,"[1.0, 95.73, 272.0]","[0.32770120750468895, 0.20670679710916284, 1.0...",4
8,b3d4a6f2-a368-11e9-a2fc-000d3a38a36f,3.0,207.94,250,"[3.0, 207.94, 250.0]","[0.9831036225140668, 0.44899834316180215, 0.97...",4
9,42fdccd4-f1d1-11ea-bde9-000d3a38a36f,1.0,134.95,226,"[1.0, 134.95, 226.0]","[0.32770120750468895, 0.2913933173496451, 0.88...",4


## Descriptive Statistics

In [53]:
from pyspark.sql.functions import sum, col, desc, mean, count

In [54]:
# Size and average RFM values per cluster, highest average revenue first.
# Uses F.* so it does not depend on the bare pyspark imports that shadow built-ins (e.g. sum).
(
    transformed_df
    .groupBy("prediction")
    .agg(
        F.count("Monetary").alias("count"),
        F.avg("Monetary").alias("avg_monetary"),
        F.avg("Recency").alias("avg_recency"),
        F.avg("Frequency").alias("avg_frequency"),
    )
    .orderBy(F.col("avg_monetary").desc())
    .show()
)

+----------+-----+------------------+------------------+------------------+
|prediction|count|      avg_monetary|       avg_recency|     avg_frequency|
+----------+-----+------------------+------------------+------------------+
|         3|    6| 31003.60666666667|             343.5|226.83333333333334|
|         2|  607|3211.2418945634286|185.09060955518945| 20.12685337726524|
|         1| 7973| 994.4762122162238|192.21020945691708| 6.118901291860028|
|         4|38165|255.77974950872283| 178.1563474387528| 1.742879601729333|
|         0|53249|176.21715243480713|  618.113729835302|1.4225243666547729|
+----------+-----+------------------+------------------+------------------+

