In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark RFM example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [2]:
df_raw = spark.read.format('com.databricks.spark.csv').\
                       options(header='true', \
                       inferschema='true').\
            load("data.csv",header=True);

In [3]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

DataFrame[key: string, value: string]

In [4]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


In [5]:
df_raw.show(5)
df_raw.printSchema()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows

root
 |-- InvoiceNo: string (nullable = true)
 |

In [6]:
from pyspark.sql.functions import count

def my_count(df_in):
    df_in.agg( *[ count(c).alias(c) for c in df_in.columns ] ).show()

In [7]:
my_count(df_raw)

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   541909|   541909|     540455|  541909|     541909|   541909|    406829| 541909|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [8]:
df = df_raw.dropna(how='any')
my_count(df)

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|   406829|   406829|     406829|  406829|     406829|   406829|    406829| 406829|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [9]:
from pyspark.sql.functions import to_utc_timestamp, unix_timestamp, lit, datediff, col

timeFmt = "MM/dd/yy HH:mm"

df = df.withColumn('NewInvoiceDate'
                 , to_utc_timestamp(unix_timestamp(col('InvoiceDate'),timeFmt).cast('timestamp')
                 , 'UTC'))


In [10]:
from pyspark.sql.functions import round

df = df.withColumn('TotalPrice', round( df.Quantity * df.UnitPrice, 2 ) )

In [11]:
from pyspark.sql.functions import mean, min, max, sum, datediff, to_date

date_max = df.select(max('NewInvoiceDate')).toPandas()
current = to_utc_timestamp( unix_timestamp(lit(str(date_max.iloc[0][0])), \
                              'yy-MM-dd HH:mm').cast('timestamp'), 'UTC' )

df = df.withColumn('Duration', datediff(lit(current), 'NewInvoiceDate'))

  PyArrow >= 1.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


In [12]:
recency = df.groupBy('CustomerID').agg(min('Duration').alias('Recency'))
frequency = df.groupBy('CustomerID', 'InvoiceNo').count()\
                        .groupBy('CustomerID')\
                        .agg(count("*").alias("Frequency"))
monetary = df.groupBy('CustomerID').agg(round(sum('TotalPrice'), 2).alias('Monetary'))
rfm = recency.join(frequency,'CustomerID', how = 'inner')\
             .join(monetary,'CustomerID', how = 'inner')


In [13]:
rfm.show(5)

+----------+-------+---------+--------+
|CustomerID|Recency|Frequency|Monetary|
+----------+-------+---------+--------+
|     15619|     10|        1|   336.4|
|     17389|      0|       43|31300.08|
|     12940|     46|        4|  876.29|
|     13623|     30|        7|  672.44|
|     14450|    180|        3|  483.25|
+----------+-------+---------+--------+
only showing top 5 rows



In [14]:
import numpy as np
import pandas as pd

In [15]:
def describe_pd(df_in, columns, deciles=False):

    if deciles:
        percentiles = np.array(range(0, 110, 10))
    else:
        percentiles = [25, 50, 75]

    percs = np.transpose([np.percentile(df_in.select(x).collect(), percentiles) for x in columns])
    percs = pd.DataFrame(percs, columns=columns)
    percs['summary'] = [str(p) + '%' for p in percentiles]

    spark_describe = df_in.describe().toPandas()
    new_df = pd.concat([spark_describe, percs],ignore_index=True)
    new_df = new_df.round(2)
    return new_df[['summary'] + columns]

In [16]:
cols = ['Recency','Frequency','Monetary']
describe_pd(rfm,cols,1)

Unnamed: 0,summary,Recency,Frequency,Monetary
0,count,4372.0,4372.0,4372.0
1,mean,91.58119853613908,5.07548032936871,1898.4597003659655
2,stddev,100.77213931384831,9.338754163574729,8219.345141139722
3,min,0.0,1.0,-4287.63
4,max,373.0,248.0,279489.02
5,0%,0.0,1.0,-4287.63
6,10%,4.0,1.0,146.022
7,20%,11.0,1.0,234.392
8,30%,21.0,1.0,337.37
9,40%,31.0,2.0,465.412


In [17]:
def RScore(x):
    if  x <= 16:
        return 1
    elif x<= 50:
        return 2
    elif x<= 143:
        return 3
    else:
        return 4

def FScore(x):
    if  x <= 1:
        return 4
    elif x <= 3:
        return 3
    elif x <= 5:
        return 2
    else:
        return 1

def MScore(x):
    if  x <= 293:
        return 4
    elif x <= 648:
        return 3
    elif x <= 1611:
        return 2
    else:
        return 1

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

R_udf = udf(lambda x: RScore(x), StringType())
F_udf = udf(lambda x: FScore(x), StringType())
M_udf = udf(lambda x: MScore(x), StringType())

In [18]:
rfm_seg = rfm.withColumn("r_seg", R_udf("Recency"))
rfm_seg = rfm_seg.withColumn("f_seg", F_udf("Frequency"))
rfm_seg = rfm_seg.withColumn("m_seg", M_udf("Monetary"))
rfm_seg.show(5)

+----------+-------+---------+--------+-----+-----+-----+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|
+----------+-------+---------+--------+-----+-----+-----+
|     15619|     10|        1|   336.4|    1|    4|    3|
|     17389|      0|       43|31300.08|    1|    1|    1|
|     12940|     46|        4|  876.29|    2|    2|    2|
|     13623|     30|        7|  672.44|    2|    1|    2|
|     14450|    180|        3|  483.25|    4|    3|    3|
+----------+-------+---------+--------+-----+-----+-----+
only showing top 5 rows



In [19]:
from pyspark.sql import functions as F

In [20]:
rfm_seg = rfm_seg.withColumn('RFMScore',
                             F.concat(F.col('r_seg'),F.col('f_seg'), F.col('m_seg')))
rfm_seg.sort(F.col('RFMScore')).show(5)

+----------+-------+---------+--------+-----+-----+-----+--------+
|CustomerID|Recency|Frequency|Monetary|r_seg|f_seg|m_seg|RFMScore|
+----------+-------+---------+--------+-----+-----+-----+--------+
|     12471|      2|       49|18740.92|    1|    1|    1|     111|
|     18161|     10|        6| 1612.79|    1|    1|    1|     111|
|     17809|     16|       15| 4627.62|    1|    1|    1|     111|
|     17754|      0|        6| 1739.92|    1|    1|    1|     111|
|     15382|     14|        8| 5927.86|    1|    1|    1|     111|
+----------+-------+---------+--------+-----+-----+-----+--------+
only showing top 5 rows



In [21]:
rfm_seg.groupBy('RFMScore')\
       .agg({'Recency':'mean',
             'Frequency': 'mean',
             'Monetary': 'mean'} )\
        .sort(F.col('RFMScore')).show(5)

+--------+-----------------+------------------+------------------+
|RFMScore|     avg(Recency)|     avg(Monetary)|    avg(Frequency)|
+--------+-----------------+------------------+------------------+
|     111|6.035123966942149| 8828.888595041324|18.882231404958677|
|     112|7.237113402061856|1223.3604123711339| 7.752577319587629|
|     113|              8.0|          505.9775|               7.5|
|     114|             11.0|            191.17|               8.0|
|     121|6.472727272727273|2569.0619999999994| 4.636363636363637|
+--------+-----------------+------------------+------------------+
only showing top 5 rows



In [22]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors


def transData(data):
    return data.rdd.map(lambda r: [r[0],Vectors.dense(r[1:])]).toDF(['CustomerID','rfm'])

In [23]:
transformed= transData(rfm)
transformed.show(5)

+----------+-------------------+
|CustomerID|                rfm|
+----------+-------------------+
|     15619|   [10.0,1.0,336.4]|
|     17389|[0.0,43.0,31300.08]|
|     12940|  [46.0,4.0,876.29]|
|     13623|  [30.0,7.0,672.44]|
|     14450| [180.0,3.0,483.25]|
+----------+-------------------+
only showing top 5 rows



In [24]:
from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="rfm",\
         outputCol="features")
scalerModel =  scaler.fit(transformed)
scaledData = scalerModel.transform(transformed)
scaledData.show(5,False)

+----------+-------------------+---------------------------------------------------------------+
|CustomerID|rfm                |features                                                       |
+----------+-------------------+---------------------------------------------------------------+
|15619     |[10.0,1.0,336.4]   |[0.02680965147453083,0.0,0.016294610567853272]                 |
|17389     |[0.0,43.0,31300.08]|[0.0,0.1700404858299595,0.12540746393334334]                   |
|12940     |[46.0,4.0,876.29]  |[0.12332439678284182,0.012145748987854251,0.01819712791732512] |
|13623     |[30.0,7.0,672.44]  |[0.08042895442359249,0.024291497975708502,0.017478781288030567]|
|14450     |[180.0,3.0,483.25] |[0.48257372654155495,0.008097165991902834,0.016812095004997765]|
+----------+-------------------+---------------------------------------------------------------+
only showing top 5 rows



In [25]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col, percent_rank, lit
from pyspark.sql.window import Window
from pyspark.sql import DataFrame, Row
from pyspark.sql.types import StructType
from functools import reduce  

from pyspark.ml.clustering import KMeans

from pyspark.ml.evaluation import ClusteringEvaluator
silhouette_score=[]
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='features', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')
for i in range(2,20):
    
    KMeans_algo=KMeans(featuresCol='features', k=i)
    
    KMeans_fit=KMeans_algo.fit(scaledData)
    
    output=KMeans_fit.transform(scaledData)
    
    
    
    score=evaluator.evaluate(output)
    
    silhouette_score.append(score)
    
    print("Silhouette Score:",score)

Silhouette Score: 0.8646368936683079
Silhouette Score: 0.8045154385557955
Silhouette Score: 0.7008699963021162
Silhouette Score: 0.6716569820272631
Silhouette Score: 0.6179340289777181
Silhouette Score: 0.44971691376104345
Silhouette Score: 0.6299608212562088
Silhouette Score: 0.6590769573879091
Silhouette Score: 0.6591924486884344
Silhouette Score: 0.6457686374293314
Silhouette Score: 0.6658159824942368
Silhouette Score: 0.6410023270519241
Silhouette Score: 0.6501056841336909
Silhouette Score: 0.6315474100955457
Silhouette Score: 0.6208600367405319
Silhouette Score: 0.6412770695206362
Silhouette Score: 0.6383045017805672
Silhouette Score: 0.6197672019086612


In [25]:
k = 5
kmeans = KMeans().setK(k).setSeed(1)
model = kmeans.fit(scaledData)
# Make predictions
predictions = model.transform(scaledData)
predictions.show(5,False)

NameError: name 'KMeans' is not defined

In [77]:
results = rfm.join(predictions.select('CustomerID','prediction'),'CustomerID',how='left')
results.show(5)

+----------+-------+---------+--------+----------+
|CustomerID|Recency|Frequency|Monetary|prediction|
+----------+-------+---------+--------+----------+
|     13098|      1|       41|28658.88|         1|
|     13248|    124|        2|  465.68|         4|
|     13452|    259|        2|   590.0|         0|
|     13460|     29|        2|  183.44|         1|
|     13518|     85|        1|  659.44|         3|
+----------+-------+---------+--------+----------+
only showing top 5 rows



In [78]:
results.groupBy('prediction')\
       .agg({'Recency':'mean',
             'Frequency': 'mean',
             'Monetary': 'mean'} )\
        .sort(F.col('prediction')).show(5)

+----------+------------------+------------------+------------------+
|prediction|      avg(Recency)|     avg(Monetary)|    avg(Frequency)|
+----------+------------------+------------------+------------------+
|         0|243.48979591836735|494.17213151927456|1.7052154195011338|
|         1|16.183703703703703|3231.2944000000007| 8.018765432098766|
|         2| 333.9367088607595| 324.9553797468355|1.5411392405063291|
|         3|  68.6654028436019|1012.9810805687206| 3.177251184834123|
|         4|153.78691588785045| 686.6986915887854|2.5439252336448597|
+----------+------------------+------------------+------------------+

