In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Retail_RFM_Analysis") \
    .getOrCreate()

spark

## LOAD THE DATASET

In [3]:
retail = spark.read.csv(
    "retail_cleaned.csv",
    header=True,
    inferSchema=True
)

retail.show(5)
retail.printSchema()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|TotalPrice|       InvoiceMonth|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|      15.3|2010-12-01 00:00:00|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|     20.34|2010-12-01 00:00:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|      22.0|2010-12-01 00:00:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|     20.34|2010-12-01 00:00:00|
|   536365|   84029E|RED WO

## DATA MANAGEMENT
### (remove invalid records)

In [4]:
retail = retail.filter(
    (retail.Quantity > 0) &
    (retail.UnitPrice > 0) &
    (retail.CustomerID.isNotNull())
)

## CREATE SQL VIEW

In [5]:
retail.createOrReplaceTempView("retail")

## Feature Engineering
### calculate Total Amount per Transaction

In [6]:
spark.sql("""
SELECT
    *,
    Quantity * UnitPrice AS Total_Amount
FROM retail
""").createOrReplaceTempView("retail_sales")

In [7]:
rfm = spark.sql("""
SELECT
    CustomerID,
    DATEDIFF(
        (SELECT MAX(InvoiceDate) FROM retail_sales),
        MAX(InvoiceDate)
    ) AS Recency,
    COUNT(DISTINCT InvoiceNo) AS Frequency,
    ROUND(SUM(Total_Amount),2) AS Monetary
FROM retail_sales
GROUP BY CustomerID
""")

rfm.show(5)

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|     15574|    177|        4|  702.25|
|     15555|     12|       16| 4805.17|
|     16250|    261|        2|  389.44|
|     15271|      7|       15| 2507.07|
|     17686|      7|        7| 5739.46|
+----------+-------+---------+--------+
only showing top 5 rows



## RFM SCORING (1-5 SCALE)

In [10]:
rfm.createOrReplaceTempView("rfm")


In [20]:
rfm_scores = spark.sql("""
SELECT
     CustomerID,
     Recency,
     Frequency,
     Monetary,
     NTILE(5) OVER (ORDER BY Recency DESC) AS R_Score,
     NTILE(5) OVER (ORDER BY Frequency) AS F_Score,
     NTILE(5) OVER (ORDER BY Monetary) AS M_Score
FROM rfm
""")

rfm_scores.show(5)

+----------+-------+---------+--------+-------+-------+-------+
|CustomerID|Recency|Frequency|Monetary|R_Score|F_Score|M_Score|
+----------+-------+---------+--------+-------+-------+-------+
|     16738|    297|        1|    3.75|      1|      1|      1|
|     14792|     63|        1|     6.2|      3|      2|      1|
|     16454|     44|        2|     6.9|      3|      3|      1|
|     17956|    249|        1|   12.75|      1|      1|      1|
|     16878|     84|        1|    13.3|      2|      2|      1|
+----------+-------+---------+--------+-------+-------+-------+
only showing top 5 rows



## CUSTOMER SEGMENTATION

In [21]:
rfm_scores.createOrReplaceTempView("rfm_scores")


In [22]:
rfm_segmented = spark.sql("""
SELECT *,
CASE
    WHEN R_Score >= 4 AND F_Score >= 4 AND M_Score >= 4 THEN 'Champions'
    WHEN R_Score >= 3 AND F_Score >= 3 THEN 'Loyal Customers'
    WHEN R_Score <= 2 AND F_Score >= 3 THEN 'At Risk'
    ELSE 'Hibernating'
END AS Customer_Segment
FROM rfm_scores
""")

rfm_segmented.show(10)


+----------+-------+---------+--------+-------+-------+-------+----------------+
|CustomerID|Recency|Frequency|Monetary|R_Score|F_Score|M_Score|Customer_Segment|
+----------+-------+---------+--------+-------+-------+-------+----------------+
|     16738|    297|        1|    3.75|      1|      1|      1|     Hibernating|
|     14792|     63|        1|     6.2|      3|      2|      1|     Hibernating|
|     16454|     44|        2|     6.9|      3|      3|      1| Loyal Customers|
|     17956|    249|        1|   12.75|      1|      1|      1|     Hibernating|
|     16878|     84|        1|    13.3|      2|      2|      1|     Hibernating|
|     15823|    372|        1|    15.0|      1|      1|      1|     Hibernating|
|     17763|    263|        1|    15.0|      1|      1|      1|     Hibernating|
|     13307|    120|        1|    15.0|      2|      1|      1|     Hibernating|
|     16093|    106|        1|    17.0|      2|      1|      1|     Hibernating|
|     17986|     56|        

## SAVE OUTPUT FOR POWER BI

In [23]:
rfm_segmented.toPandas().to_csv(
    "rfm_customer_segments.csv",
    index=False
)