In [0]:
from pyspark.sql import SparkSession 

# Build the Spark session used by every cell below (getOrCreate hands back the
# session for this application name, creating it if needed).
spark = (
    SparkSession.builder
    .appName("RFM Customer Segmentation with PySpark")
    .getOrCreate()
)

In [0]:
# Load the raw online-retail transactions from the Delta table in the Hive warehouse.
# NOTE(review): `header` / `inferschema` look like CSV-reader settings; they are
# passed through unchanged here — confirm whether the Delta reader needs them.
df_raw = (
    spark.read
    .format('delta')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load("/user/hive/warehouse/online_retail2", header = True)
)

In [0]:
# Preview the first five rows and print the column types of the raw table.
df_raw.show(5)
df_raw.printSchema()

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2,55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2,75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows

root
 |-- InvoiceNo: string (nullable =

In [0]:
from pyspark.sql.functions import count

def my_count(df_in):
    """Display the result of Spark's ``count`` for every column of ``df_in``.

    The counts are shown as a single-row table whose column names match those
    of ``df_in``. Nothing is returned.
    """
    per_column = []
    for name in df_in.columns:
        per_column.append(count(name).alias(name))
    df_in.agg(*per_column).show()

# Per-column counts of the raw data, as a baseline before cleaning.
my_count(df_raw)

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   541909|   541909|     541909|  541909|     541909|   541909|    541909| 541909|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [0]:
# Drop every row that has a null in any column, then re-check the per-column counts.
# NOTE(review): the recorded counts are identical before and after this step
# (541909), so no rows were removed, and a CustomerID of 0 shows up in the RFM
# output further down — missing customer ids may be stored as 0 rather than
# null. Confirm, and filter them explicitly if so.
df = df_raw.dropna(how = "any")
my_count(df)

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   541909|   541909|     541909|  541909|     541909|   541909|    541909| 541909|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [0]:
from pyspark.sql.functions import to_utc_timestamp, unix_timestamp, lit, datediff, col, to_timestamp

# Parse the text InvoiceDate (day.month.year hour:minute, e.g. "1.12.2010 08:26")
# into a timestamp column named NewInvoiceDate; the original column is kept.
df = df.withColumn('NewInvoiceDate', to_timestamp("InvoiceDate","d.M.yyyy HH:mm"))

df.show(5)


+---------+---------+--------------------+--------+---------------+---------+----------+--------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|     NewInvoiceDate|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2,55|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2,75|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1.12.2010 08:26|     3,39|     17850|United Kingdom|2010-12-01 08:26:00|
+-------

In [0]:
from pyspark.sql.functions import round

from pyspark.sql import functions as F

# UnitPrice arrives as text with a decimal comma ("2,55"): swap the comma for a
# dot and cast to double, then derive the line total (quantity x unit price,
# rounded to 2 decimals).
# NOTE(review): the recorded output of this cell already contains a Duration
# column, which is only added by a later cell — the cells were apparently run
# out of order.
df = (
    df
    .withColumn("UnitPrice", F.regexp_replace("UnitPrice", ",", ".").cast("double"))
    .withColumn('TotalPrice', round(F.col("Quantity") * F.col("UnitPrice"), 2))
)

df.show()

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+-------------------+----------+--------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|     NewInvoiceDate|TotalPrice|Duration|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+-------------------+----------+--------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1.12.2010 08:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|      15.3|     373|
|   536365|    71053| WHITE METAL LANTERN|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|     20.34|     373|
|   536365|   84406B|CREAM CUPID HEART...|       8|1.12.2010 08:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|      22.0|     373|
|   536365|   84029G|KNITTED UNION FLA...|       6|1.12.2010 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|     20.34|  

In [0]:
# NOTE(review): the reference-date computation below does not need the legacy
# parser. The setting is kept because it is session-wide and may influence the
# lazily evaluated InvoiceDate parsing — confirm before removing it.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

from pyspark.sql.functions import mean, min, max, sum, datediff, to_date

# Reference date for recency: the most recent invoice timestamp in the data.
# It is fetched as a plain Python value and used directly as a literal, rather
# than being round-tripped through pandas (positional `iloc[0][0]` lookup) and
# re-parsed from its string form with a 'yy-MM-dd HH:mm' pattern that does not
# match that string.
date_max = df.select(max('NewInvoiceDate')).first()[0]
current = lit(date_max)

# Whole days between each invoice and the reference date.
df = df.withColumn('Duration', datediff(current, 'NewInvoiceDate'))

#Recency, Frequency, Monetary

# Recency: days since the customer's most recent invoice.
recency = df.groupBy('CustomerID').agg(min('Duration').alias('Recency'))

# Frequency: number of invoices per customer — collapse to one row per
# (customer, invoice) pair first, then count those rows per customer.
frequency = df.groupBy('CustomerID', 'InvoiceNo').count()\
    .groupBy('CustomerID')\
    .agg(count('*').alias("Frequency"))

# Monetary: total of TotalPrice per customer, rounded to 2 decimals.
monetary = df.groupBy('CustomerID').agg(round(sum('TotalPrice'), 2).alias('Monetary'))

# One row per customer carrying all three measures.
rfm = recency.join(frequency, 'CustomerID', how = 'inner')\
    .join(monetary, 'CustomerID', how = 'inner')

rfm.show(100)
df.show()

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|     15194|      3|       22| 7521.17|
|     17703|     35|        3|  798.74|
|     13452|    259|        2|   590.0|
|     13098|      1|       41|28658.88|
|     17048|    115|        6|  864.32|
|     13638|     15|        1|  122.64|
|     15322|     64|        2|  602.97|
|     13723|    217|        1|  199.85|
|     16597|      4|        1|   90.04|
|     15237|      1|        4| 1412.32|
|     13248|    124|        2|  465.68|
|     16742|     46|        2|     0.0|
|     14719|      1|        6| 1592.18|
|     17043|     32|        4| 1735.18|
|     14117|    143|        1|    90.0|
|     15057|    275|        2|  1489.5|
|     17979|     35|        5|  737.81|
|     13460|     29|        2|  183.44|
|     13518|     85|        1|  659.44|
|     15432|     23|        1|  171.19|
|     18196|     95|        2|  689.13|
|     15437|    262|        1|  200.16|


In [0]:
import numpy as np
import pandas as pd

def describe_pd(df_input, columns, deciles = False, percentiles = None):
    """Summarise columns of a Spark DataFrame as a pandas table.

    Combines the rows produced by ``df_input.describe()`` with percentile rows
    computed by NumPy over the collected values of each column.

    Parameters
    ----------
    df_input : Spark DataFrame
        Any object offering ``select(name).collect()`` and
        ``describe().toPandas()``.
    columns : list of str
        Names of the columns to summarise.
    deciles : bool, optional
        Accepted for backward compatibility; it does not change the result.
        Previously quartiles were produced when it was truthy, and the call
        failed with a NameError when it was falsy (the default) because the
        percentile list was never assigned.
    percentiles : iterable of numbers, optional
        Percentiles (0-100) to report. Defaults to the quartiles
        ``[25, 50, 75]``.

    Returns
    -------
    pandas.DataFrame
        A ``summary`` column followed by one column per entry of ``columns``.
    """
    if percentiles is None:
        percentiles = [25, 50, 75]
    else:
        percentiles = list(percentiles)

    # One row per requested percentile, one column per input column.
    pcs = np.transpose([np.percentile(df_input.select(x).collect(), percentiles) for x in columns])
    pcs = pd.DataFrame(pcs, columns = columns)
    pcs['summary'] = [str(p) + "%" for p in percentiles]
    mydescribe = df_input.describe().toPandas()
    # Stack the describe() rows on top of the percentile rows.
    new_df = pd.concat([mydescribe, pcs], ignore_index = True)
    new_df = new_df.round(2)
    return new_df[['summary'] + columns]

In [0]:
# Summary statistics and quartiles of the three RFM measures. The 25%/50%/75%
# rows in the recorded output match the cut-offs hard-coded in the scoring
# functions below.
cols = ['Recency', 'Frequency', 'Monetary']
describe_pd(rfm, cols, 1)

Unnamed: 0,summary,Recency,Frequency,Monetary
0,count,4373.0,4373.0,4373.0
1,mean,91.56025611708208,5.922707523439287,2229.0756757374743
2,stddev,100.7701307562583,56.79881324857276,23356.82678007453
3,min,0.0,1.0,-4287.63
4,max,373.0,3710.0,1447682.12
5,25%,16.0,1.0,293.45
6,50%,50.0,3.0,648.41
7,75%,143.0,5.0,1612.13


In [0]:
def RScore(x, bounds = (16, 50, 143)):
    """Map a recency value to a segment score; lower recency gives a lower score.

    Parameters
    ----------
    x : number
        Recency value to score.
    bounds : sequence of numbers, optional
        Ascending inclusive upper limits of the score bands. The defaults are
        the cut-offs this notebook has always used; pass other values when the
        quartiles are recomputed on different data.

    Returns
    -------
    int
        1 if ``x <= bounds[0]``, 2 if ``x <= bounds[1]``, and so on;
        ``len(bounds) + 1`` when ``x`` exceeds every limit.
    """
    for score, upper in enumerate(bounds, start = 1):
        if x <= upper:
            return score
    return len(bounds) + 1

def FScore(x, bounds = (1, 3, 5)):
    """Map a frequency value to a segment score; higher frequency gives a lower score.

    Parameters
    ----------
    x : number
        Frequency value to score.
    bounds : sequence of numbers, optional
        Ascending inclusive upper limits of the score bands. The defaults are
        the cut-offs this notebook has always used; pass other values when the
        quartiles are recomputed on different data.

    Returns
    -------
    int
        ``len(bounds) + 1`` if ``x <= bounds[0]``, one less for each further
        band, and 1 when ``x`` exceeds every limit.
    """
    top = len(bounds) + 1
    for rank, upper in enumerate(bounds):
        if x <= upper:
            return top - rank
    return 1
    
def MScore(x, bounds = (293, 648, 1612)):
    """Map a monetary value to a segment score; higher spend gives a lower score.

    Parameters
    ----------
    x : number
        Monetary value to score.
    bounds : sequence of numbers, optional
        Ascending inclusive upper limits of the score bands. The defaults are
        the cut-offs this notebook has always used; pass other values when the
        quartiles are recomputed on different data.

    Returns
    -------
    int
        ``len(bounds) + 1`` if ``x <= bounds[0]``, one less for each further
        band, and 1 when ``x`` exceeds every limit.
    """
    top = len(bounds) + 1
    for rank, upper in enumerate(bounds):
        if x <= upper:
            return top - rank
    return 1
    
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

# Spark UDF wrappers around the scoring functions. They are declared to return
# strings (the three segments are concatenated into the RFMScore label), so the
# integer score is converted explicitly instead of relying on Spark to coerce
# it. A null input yields a null segment rather than an error in the Python worker.
R_udf = udf(lambda x : None if x is None else str(RScore(x)), StringType())
F_udf = udf(lambda x : None if x is None else str(FScore(x)), StringType())
M_udf = udf(lambda x : None if x is None else str(MScore(x)), StringType())

In [0]:
# Score each customer on the three measures and join the three segment digits
# into a single RFMScore label (e.g. "111").
rfm_seg = (
    rfm
    .withColumn("r_seg", R_udf("Recency"))
    .withColumn("f_seg", F_udf("Frequency"))
    .withColumn("m_seg", M_udf("Monetary"))
    .withColumn("RFMScore", F.concat("r_seg", "f_seg", "m_seg"))
)

rfm_seg.orderBy('RFMScore').show(20)

+----------+-------+---------+----------+-----+-----+-----+--------+
|CustomerID|Recency|Frequency|  Monetary|r_seg|f_seg|m_seg|RFMScore|
+----------+-------+---------+----------+-----+-----+-----+--------+
|         0|      0|     3710|1447682.12|    1|    1|    1|     111|
|     13004|     11|       22|   5613.43|    1|    1|    1|     111|
|     17602|      2|        8|   5050.77|    1|    1|    1|     111|
|     13098|      1|       41|  28658.88|    1|    1|    1|     111|
|     13924|      1|       11|   1682.08|    1|    1|    1|     111|
|     13658|      9|        7|   2421.47|    1|    1|    1|     111|
|     15061|      3|       55|  54228.74|    1|    1|    1|     111|
|     15838|     11|       21|  33350.76|    1|    1|    1|     111|
|     15194|      3|       22|   7521.17|    1|    1|    1|     111|
|     14415|      1|       18|   5811.56|    1|    1|    1|     111|
|     13798|      1|       63|  36351.42|    1|    1|    1|     111|
|     15993|      8|       10|   2

In [0]:
# Average recency, frequency and monetary value per RFMScore label, ordered by
# label; only the first five labels are displayed.
(
    rfm_seg
    .groupBy('RFMScore')
    .agg({'Recency': 'mean', 'Frequency' : 'mean', 'Monetary' : 'mean'})
    .sort(F.col('RFMScore'))
    .show(5)
)

+--------+-----------------+------------------+------------------+
|RFMScore|     avg(Recency)|     avg(Monetary)|    avg(Frequency)|
+--------+-----------------+------------------+------------------+
|     111|6.022680412371134|11795.596288659783|26.492783505154637|
|     112|7.237113402061856|1223.3604123711343| 7.752577319587629|
|     113|              8.0|505.97749999999996|               7.5|
|     114|             11.0|            191.17|               8.0|
|     121|6.472727272727273|2569.0619999999994| 4.636363636363637|
+--------+-----------------+------------------+------------------+
only showing top 5 rows



In [0]:
#Detailed summary
# NOTE(review): neither `quantile_agg` nor `output_dir` is defined in the visible
# part of this notebook; the recorded run of this cell stopped with
# "NameError: name 'output_dir' is not defined". Both need to be defined (or
# imported) in an earlier cell before this cell can run.

grp = 'RFMScore'
num_cols = ['Recency', 'Frequency', 'Monetary']
df_myinput = rfm_seg

# Presumably a per-RFMScore quantile summary of the numeric columns (verify
# against the definition of quantile_agg); the result is written out as CSV.
quantile_grouped = quantile_agg(df_myinput, grp, num_cols)
quantile_grouped.toPandas().to_csv(output_dir + 'quantile_grouped.csv')

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-1781779312752536>:5[0m
[1;32m      3[0m grp [38;5;241m=[39m [38;5;124m'[39m[38;5;124mRFMScore[39m[38;5;124m'[39m
[1;32m      4[0m num_cols [38;5;241m=[39m [[38;5;124m'[39m[38;5;124mRecency[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124mFrequency[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124mMonetary[39m[38;5;124m'[39m]
[0;32m----> 5[0m rfm_seg[38;5;241m.[39mtoPandas()[38;5;241m.[39mto_csv(output_dir [38;5;241m+[39m [38;5;124m'[39m[38;5;124mquantile_grouped.csv[39m[38;5;124m'[39m)

[0;31mNameError[0m: name 'output_dir' is not defined