<a href="https://colab.research.google.com/github/DiogoMondin/rfv-analisys/blob/main/T%26D_RFV.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

import matplotlib.pyplot as plt

# Table

In [None]:
# Location of the customer CSV export read below.
# NOTE(review): "/content" is presumably the Colab working directory — adjust when running elsewhere.
customer_dataset = "/content/customer_dataset_csv.txt"

In [None]:
# Name the application on the builder, then obtain the session used by the rest of the notebook.
session_builder = SparkSession.builder.appName("RFV Analysis")
spark = session_builder.getOrCreate()

In [None]:
# Read the customer CSV: the first line holds the column names and Spark
# infers each column's type from the data.
customer_df = spark.read.csv(customer_dataset, header=True, inferSchema=True)

# Data Exploration

In [None]:
# Inspect the column names and the types Spark inferred while reading the CSV.
customer_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- favorite_category: string (nullable = true)
 |-- acquisition_channel: string (nullable = true)
 |-- first_purchase_date: string (nullable = true)
 |-- total_orders: integer (nullable = true)
 |-- total_spent: integer (nullable = true)
 |-- last_purchase_date: string (nullable = true)
 |-- returns_count: integer (nullable = true)
 |-- avg_order_value: integer (nullable = true)
 |-- days_since_last_purchase: integer (nullable = true)
 |-- customer_lifetime_months: integer (nullable = true)



In [None]:
# Summary statistics (count, mean, ...) for the loaded columns.
customer_df.describe().show()

+-------+-----------+-----------------+-------------------+-------------------+-----------------+------------------+------------------+------------------+-----------------+------------------------+------------------------+
|summary|customer_id|favorite_category|acquisition_channel|first_purchase_date|     total_orders|       total_spent|last_purchase_date|     returns_count|  avg_order_value|days_since_last_purchase|customer_lifetime_months|
+-------+-----------+-----------------+-------------------+-------------------+-----------------+------------------+------------------+------------------+-----------------+------------------------+------------------------+
|  count|         35|               35|                 35|                 35|               35|                35|                35|                35|               35|                      35|                      35|
|   mean|       NULL|             NULL|               NULL|               NULL|10.82857142857143|7965.514285

In [None]:
# Preview the raw customer rows as loaded.
customer_df.show()

+-----------+-----------------+-------------------+-------------------+------------+-----------+------------------+-------------+---------------+------------------------+------------------------+
|customer_id|favorite_category|acquisition_channel|first_purchase_date|total_orders|total_spent|last_purchase_date|returns_count|avg_order_value|days_since_last_purchase|customer_lifetime_months|
+-----------+-----------------+-------------------+-------------------+------------+-----------+------------------+-------------+---------------+------------------------+------------------------+
|   CUST0001|          Tablets|             Google|         16/11/2022|          10|      17302|        25/03/2024|            0|           1730|                     124|                      20|
|   CUST0002|            Games|             Google|         18/04/2023|          21|      18160|        21/07/2024|            0|            865|                       6|                      15|
|   CUST0003|       

In [None]:
def add_rfv_cluster(df, col_name, new_col_name, reverse=False):
    """Append a 1-5 quintile score column computed from a numeric column.

    The 20/40/60/80 % quantiles of ``col_name`` are used as cut-offs and each
    row is scored by the first cut-off its value does not exceed.

    Args:
        df: Spark DataFrame containing ``col_name``.
        col_name: Column to score. It is overwritten with its ``int`` cast,
            so fractional values are truncated in the returned frame.
        new_col_name: Name of the score column to add (or replace).
        reverse: Orientation of the score.
            ``False`` (default): the smallest values receive 5 and the largest
            receive 1 (used for metrics where lower is better).
            ``True``: the smallest values receive 1 and the largest receive 5.

    Returns:
        A new DataFrame with ``col_name`` cast to int and ``new_col_name``
        holding an integer score between 1 and 5. Rows whose value is NULL
        (including values that could not be cast to int) always receive the
        lowest score, 1, regardless of ``reverse``.

    Raises:
        ValueError: If no quantiles can be computed, e.g. because the frame
            is empty or the column holds only NULLs.
    """
    df = df.withColumn(col_name, F.col(col_name).cast("int"))

    # A relative error of 0 asks Spark for exact quantiles.
    cutoffs = df.approxQuantile(col_name, [0.2, 0.4, 0.6, 0.8], 0)
    if len(cutoffs) != 4:
        # Fail with an explicit message instead of an opaque unpacking error.
        raise ValueError(
            f"Cannot compute quintile cut-offs for column {col_name!r}: "
            "it has no non-null values."
        )

    # labels[i] is the score for values up to cutoffs[i]; the last label is
    # the score for everything above the highest cut-off.
    labels = (1, 2, 3, 4, 5) if reverse else (5, 4, 3, 2, 1)

    value = F.col(col_name)
    # Handle NULL first: every `<=` comparison on NULL is NULL, so without
    # this branch a missing value would fall through to the final label,
    # which is the *best* score (5) when reverse=True.
    score = F.when(value.isNull(), 1)
    for cutoff, label in zip(cutoffs, labels):
        score = score.when(value <= cutoff, label)
    score = score.otherwise(labels[-1])

    return df.withColumn(new_col_name, score)

In [None]:
# (source metric, score column, reverse flag) for each RFV dimension.
# Recency keeps reverse=False so that fewer days since the last purchase
# scores higher; frequency and value use reverse=True so that larger
# totals score higher.
rfv_dimensions = [
    ("days_since_last_purchase", "recency_cluster", False),
    ("total_orders", "frequency_cluster", True),
    ("total_spent", "value_cluster", True),
]

for metric_col, cluster_col, reverse_flag in rfv_dimensions:
    customer_df = add_rfv_cluster(customer_df, metric_col, cluster_col, reverse=reverse_flag)

# Combined code: the three scores concatenated in R, F, V order.
customer_df = customer_df.withColumn(
    "rfv_cluster",
    F.concat("recency_cluster", "frequency_cluster", "value_cluster"),
)

In [None]:
# Preview the rows with the cluster columns appended.
customer_df.show()

+-----------+-----------------+-------------------+-------------------+------------+-----------+------------------+-------------+---------------+------------------------+------------------------+---------------+-----------------+-------------+-----------+
|customer_id|favorite_category|acquisition_channel|first_purchase_date|total_orders|total_spent|last_purchase_date|returns_count|avg_order_value|days_since_last_purchase|customer_lifetime_months|recency_cluster|frequency_cluster|value_cluster|rfv_cluster|
+-----------+-----------------+-------------------+-------------------+------------+-----------+------------------+-------------+---------------+------------------------+------------------------+---------------+-----------------+-------------+-----------+
|   CUST0001|          Tablets|             Google|         16/11/2022|          10|      17302|        25/03/2024|            0|           1730|                     124|                      20|              2|                3|   

In [None]:
# Check the recency scoring: average days since last purchase per cluster.
recency_profile = customer_df.groupBy("recency_cluster").agg(
    F.round(F.avg("days_since_last_purchase"), 2).alias("avg_days_since_last_purchase")
)
recency_profile.orderBy("recency_cluster").show()

+---------------+----------------------------+
|recency_cluster|avg_days_since_last_purchase|
+---------------+----------------------------+
|              1|                      256.29|
|              2|                       130.0|
|              3|                       55.43|
|              4|                       15.71|
|              5|                        3.57|
+---------------+----------------------------+



In [None]:
# Check the frequency scoring: average number of orders per cluster.
frequency_profile = customer_df.groupBy("frequency_cluster").agg(
    F.round(F.avg("total_orders"), 2).alias("avg_total_orders")
)
frequency_profile.orderBy("frequency_cluster").show()

+-----------------+----------------+
|frequency_cluster|avg_total_orders|
+-----------------+----------------+
|                1|             2.5|
|                2|             5.5|
|                3|             8.0|
|                4|           14.29|
|                5|           24.29|
+-----------------+----------------+



In [None]:
# Check the value scoring: average total spend per cluster.
value_profile = customer_df.groupBy("value_cluster").agg(
    F.round(F.avg("total_spent"), 2).alias("avg_total_spent")
)
value_profile.orderBy("value_cluster").show()

+-------------+---------------+
|value_cluster|avg_total_spent|
+-------------+---------------+
|            1|        1311.14|
|            2|        3516.57|
|            3|        6424.71|
|            4|       10903.14|
|            5|        17672.0|
+-------------+---------------+



In [None]:
# Raw metrics the segmentation rules are written against.
recency_days = F.col("days_since_last_purchase")
order_count = F.col("total_orders")
amount_spent = F.col("total_spent")

# Segment predicates. They are evaluated in the order used below, so a row
# takes the first tag whose rule it satisfies.
is_champion = (recency_days <= 30) & (order_count >= 8) & (amount_spent >= 5000)
is_at_risk = (recency_days > 90) & (order_count >= 3) & (amount_spent >= 1000)
is_new_client = (recency_days <= 60) & (order_count <= 2)

segment_tag = (
    F.when(is_champion, "champions")
    .when(is_at_risk, "at risk")
    .when(is_new_client, "new clients")
    .otherwise("loyal")  # every row matching none of the rules above
)

# Weighted combination of the three cluster scores (0.3 / 0.3 / 0.4).
weighted_rfv_score = (
    (F.col("recency_cluster") * 0.3)
    + (F.col("frequency_cluster") * 0.3)
    + (F.col("value_cluster") * 0.4)
)

customer_clusters_df = customer_df.withColumn("tag", segment_tag)
customer_clusters_df = customer_clusters_df.withColumn("rfv_score", weighted_rfv_score)

In [None]:
# Show the raw metrics next to their cluster scores, the weighted score and the tag.
report_columns = [
    "days_since_last_purchase",
    "total_orders",
    "total_spent",
    "recency_cluster",
    "frequency_cluster",
    "value_cluster",
    "rfv_score",
    "tag",
]
customer_clusters_df.select(*report_columns).show()

+------------------------+------------+-----------+---------------+-----------------+-------------+------------------+-----------+
|days_since_last_purchase|total_orders|total_spent|recency_cluster|frequency_cluster|value_cluster|         rfv_score|        tag|
+------------------------+------------+-----------+---------------+-----------------+-------------+------------------+-----------+
|                     124|          10|      17302|              2|                3|            5|               3.5|    at risk|
|                       6|          21|      18160|              5|                5|            5|               5.0|  champions|
|                      88|          16|       7309|              3|                4|            3|               3.3|      loyal|
|                     101|           7|       7844|              2|                3|            4|               3.1|    at risk|
|                     323|           5|       1075|              1|                

In [None]:
# Average weighted RFV score per segment tag, lowest first.
tag_score_summary = customer_clusters_df.groupBy("tag").agg(
    F.round(F.avg("rfv_score"), 2).alias("avg_rfv_score")
)
tag_score_summary.orderBy("avg_rfv_score").show()

+-----------+-------------+
|        tag|avg_rfv_score|
+-----------+-------------+
|new clients|         1.75|
|    at risk|         2.17|
|      loyal|         2.74|
|  champions|         4.57|
+-----------+-------------+

