In [23]:
import numpy as np
import pandas as pd
import matplotlib
# %%local
# %matplotlib inline
import matplotlib.pyplot as plt
# %matplot plt
# plt.show()
# from IPython import get_ipython
# get_ipython().run_line_magic('matplotlib', 'inline')
# get_ipython().magic(u'matplotlib inline')
import seaborn as sns

import pyspark
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise create one named "PySpark".
spark = (
    SparkSession.builder
    .appName("PySpark")
    .getOrCreate()
)

# spark

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import ClusteringEvaluator

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
def printdf(df, l=5):
    """Return the first `l` rows of a Spark DataFrame as a pandas DataFrame.

    Nothing is printed despite the name; the notebook displays the returned
    value when the call is the last expression of a cell.

    Args:
        df: object exposing `limit(n)` whose result exposes `toPandas()`.
        l: maximum number of rows to fetch (default 5).

    Returns:
        Whatever `df.limit(l).toPandas()` returns.
    """
    preview = df.limit(l)
    return preview.toPandas()

def nullcount(df):
    """Count null values per column.

    Runs one `filter(...).count()` per column of `df`.

    Args:
        df: DataFrame-like object with `columns`, `__getitem__` and `filter`.

    Returns:
        dict mapping each column name to its number of null rows.
    """
    counts = {}
    for name in df.columns:
        is_missing = df[name].isNull()
        counts[name] = df.filter(is_missing).count()
    return counts

def shape(df):
    """Print `(row_count, column_count)` for `df`; returns None.

    The row count comes from `df.count()`, the column count from
    `len(df.columns)`.
    """
    n_rows = df.count()
    n_cols = len(df.columns)
    print((n_rows, n_cols))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Load data
**Dataset: the output of the RFM analysis done with PySpark**

In [5]:
# Load the per-user RFM numbers from S3; the first row is the header and
# column types are inferred from the data.
# (Plain string literal: the original f-string had no placeholders.)
rfm_numbers = spark.read.csv(
    "s3://athenapyspark-rfm-analysis-result/analysis-results/evo_rfm_numbers.csv",
    inferSchema=True,
    header=True,
)
printdf(rfm_numbers)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   user_id  Recency  Frequency  Monetary
0        5      181         17   11695.0
1       12       72         14    3524.0
2       24      170          8   46300.0
3       31      192          6    2264.0
4       34      234         16    2480.0

<!-- # Prepare data -->

# Inspect

In [6]:
# Collect the whole RFM table to the driver as a pandas DataFrame (82 rows in the run shown).
df_rfm_numbers = rfm_numbers.toPandas()
# Summary statistics for every numeric column. user_id is included too, but it
# is an identifier, so its mean/std are not meaningful.
df_rfm_numbers.describe()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

          user_id     Recency  Frequency      Monetary
count   82.000000   82.000000  82.000000     82.000000
mean   316.890244  202.609756   5.134146   5455.427744
std    171.362730  106.623111   5.958213  11059.407664
min      5.000000   60.000000   1.000000      0.000000
25%    178.250000  100.000000   1.000000    530.250000
50%    320.500000  201.500000   3.000000   1465.000000
75%    453.000000  279.000000   5.000000   4988.212500
max    597.000000  459.000000  33.000000  75000.000000

In [7]:
import boto3

# One histogram (with a KDE overlay) per RFM metric, side by side.
fig, ax = plt.subplots(1, 3, figsize=(16, 8))

# Recency is plotted in full. Frequency and Monetary are filtered first, so rows
# at or above the cut-off are left out of their panel.
panels = [
    (df_rfm_numbers['Recency'], ax[0]),
    (df_rfm_numbers.query('Frequency < 1000')['Frequency'], ax[1]),
    (df_rfm_numbers.query('Monetary < 10000')['Monetary'], ax[2]),
]
for series, axis in panels:
    sns.histplot(series, kde=True, ax=axis)

# Save the figure locally, then copy it to the results bucket.
s3client = boto3.client('s3')
fig.savefig('kmeans_distribution.png')
s3client.upload_file(
    'kmeans_distribution.png',
    'evo-kmeans-analysis',
    'analysis-results/kmeans_distribution.png',
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# K-Means Clustering

In [8]:
# Replace zero and negative Monetary values with 1; all other values (including
# nulls) pass through unchanged.
# The original note gives a log transformation as the reason.
# NOTE(review): no log transform is applied in any visible cell - confirm
# whether one is still intended.
monetary = F.col("Monetary")
rfm_data = rfm_numbers.withColumn(
    "Monetary",
    F.when(monetary <= 0, 1).otherwise(monetary),
)

# rfm_data = rfm_numbers
# printdf(rfm_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Elbow Analysis

In [9]:
# Every column except the identifier is a clustering feature.
# Selecting by name, rather than slicing off the first column, keeps user_id
# out of the feature vector even if the column order of the input changes.
features = [name for name in rfm_data.columns if name != 'user_id']

printdf(rfm_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   user_id  Recency  Frequency  Monetary
0        5      181         17   11695.0
1       12       72         14    3524.0
2       24      170          8   46300.0
3       31      192          6    2264.0
4       34      234         16    2480.0

In [10]:
# Pack the feature columns into a single vector column named "rfm_features".
assembler = VectorAssembler(inputCols=features, outputCol="rfm_features")

# Keep only the user id and the assembled vector.
assembled_data = (
    assembler
    .transform(rfm_data)
    .select('user_id', 'rfm_features')
)

printdf(assembled_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   user_id            rfm_features
0        5  [181.0, 17.0, 11695.0]
1       12    [72.0, 14.0, 3524.0]
2       24   [170.0, 8.0, 46300.0]
3       31    [192.0, 6.0, 2264.0]
4       34   [234.0, 16.0, 2480.0]

In [11]:
# Standardise the feature vector so that no single metric dominates the
# distance computation. Only inputCol/outputCol are set; every other scaler
# option keeps its library default.
scaler = StandardScaler(
    inputCol='rfm_features',
    outputCol='rfm_standardized',
)

# Fit on the full data set, then add the "rfm_standardized" column.
data_scale = scaler.fit(assembled_data)
scaled_data = data_scale.transform(assembled_data)

printdf(scaled_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   user_id  ...                                   rfm_standardized
0        5  ...  [1.697568175024843, 2.8532045176685723, 1.0574...
1       12  ...  [0.6752757381314293, 2.349697838080001, 0.3186...
2       24  ...  [1.5944010483658746, 1.3426844789028576, 4.186...
3       31  ...  [1.8007353016838112, 1.0070133591771433, 0.204...
4       34  ...  [2.194646148927145, 2.685368957805715, 0.22424...

[5 rows x 3 columns]

In [12]:
# working with `scaled_data`
# Elbow sweep: fit K-Means for k = 2..9 and record each model's
# summary.trainingCost, keyed by k.
costs = {}

for k in range(2, 10):
    # Fixed seed: K-Means initialisation is random, so without it the costs
    # (and the elbow) can change from one run to the next.
    k_means = KMeans(featuresCol='rfm_standardized', k=k, seed=42)
    model = k_means.fit(scaled_data)
    costs[k] = model.summary.trainingCost

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
import boto3

# Elbow plot: recorded cost against the number of clusters k.
fig, ax = plt.subplots(1, 1, figsize=(16, 8))

k_values = list(costs.keys())
cost_values = list(costs.values())
ax.plot(k_values, cost_values, '-ob')
ax.set(xlabel='k', ylabel='cost')

# ax.plot(3, costs[3], 'bo')

# Save the figure locally, then copy it to the results bucket.
s3client = boto3.client('s3')
fig.savefig('kmeans_cluster_number.png')
s3client.upload_file(
    'kmeans_cluster_number.png',
    'evo-kmeans-analysis',
    'analysis-results/kmeans_cluster_number.png',
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# K = 3

In [14]:
# Final model: K-Means with k=3 on the standardised features.
# Fixed seed: initialisation is random, so without it the cluster ids and
# memberships can differ between runs.
k_means = KMeans(featuresCol='rfm_standardized', k=3, seed=42)
model = k_means.fit(scaled_data)
# transform() adds the model's cluster assignment to every row.
predictions = model.transform(scaled_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Keep only the user id and the assigned cluster ("prediction") for the join below.
result = predictions.select('user_id', 'prediction')
printdf(result)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   user_id  prediction
0        5           2
1       12           2
2       24           1
3       31           0
4       34           2

In [16]:
# Load the RFM score table that gets joined onto the prediction result-set,
# keeping only the columns needed downstream.
# (Plain string literal: the original f-string had no placeholders.)
rfm_score = (
    spark.read.csv(
        's3://athenapyspark-rfm-analysis-result/analysis-results/evo_loyalty_rfm.csv',
        inferSchema=True,
        header=True,
    )
    .select("user_id", "Recency", "Frequency", "Monetary", "RFM_Score", "RFM_ScoreGroup", "Loyalty")
)
# printdf(rfm_score)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Attach the RFM scores and loyalty tier to each user's cluster assignment.
# Inner join on user_id: users missing from either table are dropped.
combined_result = result.join(rfm_score, on='user_id', how='inner')
printdf(combined_result)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   user_id  prediction  Recency  ...  RFM_Score  RFM_ScoreGroup   Loyalty
0      242           2       74  ...          3             111  Platinum
1      280           2      116  ...          3             111  Platinum
2      324           0      131  ...          3             111  Platinum
3      351           2       63  ...          3             111  Platinum
4      378           2       67  ...          3             111  Platinum

[5 rows x 8 columns]

In [18]:
import boto3

# Collect the joined result to the driver; later cells reuse this pandas frame.
combined_result_df = combined_result.toPandas()

s3 = boto3.resource('s3')
bucket = s3.Bucket('evo-kmeans-analysis')

# to_csv() with no target returns the CSV text directly, so the earlier
# write-to-temp-file / seek / read-back round trip is not needed.
csv_text = combined_result_df.to_csv(index=False)

# NOTE(review): the key says "k4" but the model in this notebook is fit with
# k=3. The key is left unchanged because other consumers may read it - confirm
# before renaming.
bucket.put_object(
    Key='analysis-results/evo-combined_result_k4.csv',
    Body=csv_text.encode(),
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0
s3.Object(bucket_name='evo-kmeans-analysis', key='analysis-results/evo-combined_result_k4.csv')

In [19]:
import boto3

# Inspect the cluster sizes: number of users assigned to each K-Means cluster.
fig, ax = plt.subplots(1, 1, figsize=(6, 8))

sns.countplot(x="prediction", data=combined_result_df, ax=ax)

# Label every bar with its count.
# The label is anchored at the top centre of the bar and nudged up by 3 points.
# The previous fixed "+20" offset was in data units: labels floated far above
# short bars and, for the tallest bar, landed outside the axes where matplotlib
# does not draw the annotation.
for p in ax.patches:
    ax.annotate(
        '{:.0f}'.format(p.get_height()),
        xy=(p.get_x() + p.get_width() / 2, p.get_height()),
        xytext=(0, 3),
        textcoords='offset points',
        ha='center',
        va='bottom',
    )

# Save the figure locally, then copy it to the results bucket.
s3client = boto3.client('s3')
fig.savefig('kmeans_prediction.png')
s3client.upload_file(
    'kmeans_prediction.png',
    'evo-kmeans-analysis',
    'analysis-results/kmeans_prediction.png',
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Observation

In [20]:
import boto3

# Collect the joined result to the driver for plotting.
analysis_df = combined_result.toPandas()

# One box plot per RFM metric, grouped by cluster.
fig, ax = plt.subplots(1, 3, figsize=(20, 12))
for axis, metric in zip(ax, ('Recency', 'Frequency', 'Monetary')):
    sns.boxplot(x='prediction', y=metric, data=analysis_df, ax=axis)

# Save the figure locally, then copy it to the results bucket.
s3client = boto3.client('s3')
fig.savefig('kmeans_observation.png')
s3client.upload_file(
    'kmeans_observation.png',
    'evo-kmeans-analysis',
    'analysis-results/kmeans_observation.png',
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Visualization

In [21]:
import boto3

selected_result_df = combined_result_df  # .query('Monetary < 100000').query('Frequency < 3000')
s3client = boto3.client('s3')

# Pairwise scatter plots of the RFM metrics, coloured by cluster.
# Each entry: (x column, y column, local file name, S3 key).
scatter_specs = [
    # Monetary vs Recency (combined)
    ('Recency', 'Monetary',
     'kmeans_Monetary_vs_Recency.png',
     'analysis-results/kmeans_Monetary_vs_Recency.png'),
    # Recency vs Frequency (combined)
    ('Recency', 'Frequency',
     'kmeans_Recency_vs_Frequency.png',
     'analysis-results/kmeans_Recency_vs_Frequency.png'),
    # Monetary vs Frequency (combined)
    ('Monetary', 'Frequency',
     'kmeans_Monetary_vs_Frequency.png',
     'analysis-results/kmeans_Monetary_vs_Frequency.png'),
]

for x_col, y_col, png_name, s3_key in scatter_specs:
    # A fresh figure per pair; each is saved locally and then uploaded.
    fig, ax = plt.subplots(1, 1, figsize=(8, 8))
    sns.scatterplot(x=x_col, y=y_col, data=selected_result_df, hue='prediction', palette="deep", ax=ax)
    fig.savefig(png_name)
    s3client.upload_file(png_name, 'evo-kmeans-analysis', s3_key)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Per-cluster profile: the mean of each RFM metric within every cluster.
analysis = (
    combined_result
    .groupBy('prediction')
    .agg(
        F.avg('Recency').alias('Avg Recency'),
        F.avg('Frequency').alias('Avg Frequency'),
        F.avg('Monetary').alias('Avg Monetary'),
    )
)
printdf(analysis)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   prediction  Avg Recency  Avg Frequency  Avg Monetary
0           1   247.000000       4.666667  53233.333333
1           2    92.687500      15.187500  11686.463437
2           0   228.412698       2.603175   1597.804127