# Import necessary modules and create a spark session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,sum,count,to_timestamp
from pyspark.sql import functions as F
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import datetime as dt
from sklearn.preprocessing import MinMaxScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
# Reuse the active SparkSession if one already exists; otherwise start one
# with the default builder settings.
spark = SparkSession.builder.getOrCreate()

# Reading the Dataset from csv

In [2]:
# Load the retail transactions. The first CSV line supplies the column names
# and Spark infers each column's type from the data.
file_path = 'OnlineRetail.csv'
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(file_path)
)
df.show(n=5)
df.printSchema()

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01-12-2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
only showing top 5 rows

root
 |-- InvoiceNo: string (n


# Data Cleaning


### number of rows and columns

In [3]:
# A Spark DataFrame has no `.shape`; build the pair from a row count (a Spark
# action) and the number of column names.
num_rows, num_cols = df.count(), len(df.columns)
print(f"Shape: ({num_rows}, {num_cols})")

Shape: (541909, 8)


### Check for null values in each column and sum them

In [4]:
# Count the nulls of every column in a single aggregation instead of
# launching one Spark job per column. `when(...)` yields 1 for a null cell and
# null otherwise, and `count` tallies only the non-null results.
null_counts = (
    df.select([
        F.count(F.when(F.col(c).isNull(), 1)).alias(c)
        for c in df.columns
    ])
    .first()
    .asDict()
)
print("Null counts per column:")
# The loop names deliberately avoid `col` and `count`: those names are the
# pyspark.sql.functions imported at the top of the notebook, and rebinding
# them here would break any later (or re-run) cell that calls them.
for column_name, n_null in null_counts.items():
    print(f"{column_name}: {n_null}")

Null counts per column:
InvoiceNo: 0
StockCode: 0
Description: 1454
Quantity: 0
InvoiceDate: 0
UnitPrice: 0
CustomerID: 135080
Country: 0


### Dropping rows with any null values and negative values

In [5]:
# Discard every row containing a null in any column, then keep only the rows
# whose Quantity and UnitPrice are both non-negative (zeros are retained).
non_negative = (F.col('Quantity') >= 0) & (F.col('UnitPrice') >= 0)
df = df.dropna().where(non_negative)

### Show the updated DataFrame schema and shape after dropping null values

In [6]:
# Show the column types of the cleaned data, followed by its
# (rows, columns) shape.
df.printSchema()
num_rows_no_null, num_cols_no_null = df.count(), len(df.columns)
print(f"Shape after dropping null values: ({num_rows_no_null}, {num_cols_no_null})")

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

Shape after dropping null values: (397924, 8)


# Data preprocessing 

### Total amount of purchase done by each individual customer

In [7]:
from pyspark.sql.types import StringType
# Store CustomerID as a string and attach each row's monetary value
# (Quantity x UnitPrice) as `Amount`.
df = (
    df
    .withColumn('CustomerID', F.col('CustomerID').cast(StringType()))
    .withColumn('Amount', F.col('Quantity') * F.col('UnitPrice'))
)

# Monetary metric: total Amount per customer, sorted by CustomerID.
rfm_ds_m = (
    df.groupBy('CustomerID')
      .agg(F.sum('Amount').alias('Amount'))
      .orderBy('CustomerID')
)
rfm_ds_m.show(10)

+----------+------------------+
|CustomerID|            Amount|
+----------+------------------+
|     12346|           77183.6|
|     12347|            4310.0|
|     12348|           1797.24|
|     12349|           1757.55|
|     12350|334.40000000000003|
|     12352|2506.0400000000004|
|     12353|              89.0|
|     12354|            1079.4|
|     12355|             459.4|
|     12356|2811.4300000000003|
+----------+------------------+
only showing top 10 rows



### Calculate Frequency (number of invoice line items) per CustomerID

In [8]:
# Frequency metric: number of purchase rows (invoice line items) recorded for
# each customer. InvoiceNo is left as a string: casting it to int would turn
# any non-numeric invoice number into null, and F.count() skips nulls, which
# would silently lower the tally.
rfm_ds_f = (
    df.groupBy('CustomerID')
      .agg(F.count('InvoiceNo').alias('Frequency'))
      .orderBy('CustomerID')
)
rfm_ds_f.show(10)

+----------+---------+
|CustomerID|Frequency|
+----------+---------+
|     12346|        1|
|     12347|      182|
|     12348|       31|
|     12349|       73|
|     12350|       17|
|     12352|       85|
|     12353|        4|
|     12354|       58|
|     12355|       13|
|     12356|       59|
+----------+---------+
only showing top 10 rows



## Finding the Recency of the customer

In [9]:
from pyspark.sql.functions import col, to_timestamp, max as spark_max, datediff, lit,unix_timestamp,expr

# Parse the day-first text timestamps (e.g. "01-12-2010 08:26") into real
# timestamps so that date arithmetic can be applied to them.
df = df.withColumn("InvoiceDate", F.to_timestamp(F.col("InvoiceDate"), "dd-MM-yyyy HH:mm"))

# Latest purchase in the whole dataset; recency is measured back from it.
max_date = df.agg(F.max("InvoiceDate").alias("max_date")).first()["max_date"]
if max_date is None:
    raise ValueError("No parsable InvoiceDate values; cannot compute recency.")

# Days between the reference date and every purchase, reduced to the smallest
# gap per customer (days since that customer's most recent purchase). The
# reference date is passed as a typed date literal rather than being formatted
# into a SQL string.
rfm_ds_p = (
    df.withColumn("Diff", F.datediff(F.lit(max_date.date()), F.col("InvoiceDate")))
      .groupBy("CustomerID")
      .agg(F.min("Diff").cast("int").alias("Diff"))
      .orderBy("CustomerID")
)

rfm_ds_p.show()

+----------+----+
|CustomerID|Diff|
+----------+----+
|     12346| 325|
|     12347|   2|
|     12348|  75|
|     12349|  18|
|     12350| 310|
|     12352|  36|
|     12353| 204|
|     12354| 232|
|     12355| 214|
|     12356|  22|
|     12357|  33|
|     12358|   1|
|     12359|  57|
|     12360|  52|
|     12361| 287|
|     12362|   3|
|     12363| 109|
|     12364|   7|
|     12365| 291|
|     12367|   4|
+----------+----+
only showing top 20 rows



## Grouping the metrics

In [10]:
# Combine the monetary, frequency and recency tables on CustomerID. The
# recency table's `Diff` column is exposed under the name `Recency`.
recency_by_customer = rfm_ds_p.withColumnRenamed('Diff', 'Recency')
rfm_ds_final = (
    rfm_ds_m
    .join(rfm_ds_f, on='CustomerID', how='inner')
    .join(recency_by_customer, on='CustomerID', how='inner')
    .select('CustomerID', 'Amount', 'Frequency', 'Recency')
    .orderBy('CustomerID')
)

rfm_ds_final.show()
rfm_ds_final.count()

+----------+------------------+---------+-------+
|CustomerID|            Amount|Frequency|Recency|
+----------+------------------+---------+-------+
|     12346|           77183.6|        1|    325|
|     12347|            4310.0|      182|      2|
|     12348|           1797.24|       31|     75|
|     12349|           1757.55|       73|     18|
|     12350|334.40000000000003|       17|    310|
|     12352|2506.0400000000004|       85|     36|
|     12353|              89.0|        4|    204|
|     12354|            1079.4|       58|    232|
|     12355|             459.4|       13|    214|
|     12356|2811.4300000000003|       59|     22|
|     12357| 6207.669999999996|      131|     33|
|     12358|           1168.06|       19|      1|
|     12359|           6372.58|      248|     57|
|     12360|           2662.06|      129|     52|
|     12361|189.89999999999998|       10|    287|
|     12362|5226.2300000000005|      266|      3|
|     12363|             552.0|       23|    109|


4339

In [11]:
# Collect the joined Spark DataFrame (`rfm_ds_final`) to the driver as a
# pandas DataFrame. Note that the name is rebound from a Spark DataFrame to a
# pandas one, so this cell cannot be re-run on its own.
rfm_ds_final = rfm_ds_final.toPandas()


## Eliminating the Outliers

In [12]:
def drop_percentile_outliers(frame, column, lower_q=0.05, upper_q=0.95, whisker=1.5):
    """Return the rows of ``frame`` whose ``column`` value lies inside the fences.

    The fences sit ``whisker`` times the inter-percentile spread beyond the
    ``lower_q`` and ``upper_q`` quantiles of ``column`` and are inclusive on
    both sides. The rows keep their original index labels.

    Parameters
    ----------
    frame : pandas.DataFrame
        Table to filter.
    column : str
        Name of the numeric column used to decide which rows to keep.
    lower_q, upper_q : float
        Quantiles (0-1) that anchor the lower and upper fences.
    whisker : float
        Multiplier applied to ``upper - lower`` to extend each fence.
    """
    low = frame[column].quantile(lower_q)
    high = frame[column].quantile(upper_q)
    spread = high - low
    inside = frame[column].between(low - whisker * spread, high + whisker * spread)
    return frame[inside]


# The filters run one after another, so each metric's percentiles are computed
# on the rows that survived the previous metric's filter.
for metric in ('Amount', 'Recency', 'Frequency'):
    rfm_ds_final = drop_percentile_outliers(rfm_ds_final, metric)

In [27]:
# Report the (rows, columns) shape, then preview the first 20 customers.
frame_shape = rfm_ds_final.shape
print(frame_shape)
rfm_ds_final.head(n=20)

(4257, 5)


Unnamed: 0,CustomerID,Amount,Frequency,Recency,Cluster_Id
1,12347,4310.0,182,2,0
2,12348,1797.24,31,75,1
3,12349,1757.55,73,18,1
4,12350,334.4,17,310,2
5,12352,2506.04,85,36,1
6,12353,89.0,4,204,2
7,12354,1079.4,58,232,2
8,12355,459.4,13,214,2
9,12356,2811.43,59,22,1
10,12357,6207.67,131,33,0


## Scaling the metrics

In [23]:
# Min-max scale the three RFM metrics with MinMaxScaler's default settings.
# The result is a plain NumPy array: column labels and the row index are lost.
X = rfm_ds_final.loc[:, ['Amount', 'Frequency', 'Recency']]
scaler = MinMaxScaler()
rfm_ds_scaled = scaler.fit(X).transform(X)


In [25]:
# Wrap the scaled values in a DataFrame and label the three metric columns.
rfm_ds_scaled = pd.DataFrame(rfm_ds_scaled).set_axis(
    ['Amount', 'Frequency', 'Recency'], axis='columns'
)
# Flip Recency (1 - value) so that a larger number means a more recent
# purchase, matching the "higher is better" direction of the other metrics.
rfm_ds_scaled['Recency'] = rfm_ds_scaled['Recency'].rsub(1)
rfm_ds_scaled.head()

Unnamed: 0,Amount,Frequency,Recency
0,0.299516,0.255289,0.994638
1,0.124896,0.042313,0.798928
2,0.122138,0.101551,0.951743
3,0.023239,0.022567,0.168901
4,0.174153,0.118477,0.903485


## Using the K-Means clustering

In [28]:
# Cluster customers on the three scaled RFM features only. Selecting the
# columns explicitly keeps label columns (e.g. a `Cluster_Id` added by a later
# cell) out of the model if the notebook is executed out of order.
rfm_features = rfm_ds_scaled[['Amount', 'Frequency', 'Recency']]

# `random_state` pins the centroid initialisation so cluster ids are the same
# on every run. `n_init=10` states the number of restarts explicitly, which
# avoids the library warning about the changing default.
kmeans = KMeans(n_clusters=3, max_iter=50, n_init=10, random_state=42)
kmeans.fit(rfm_features)
lbs = kmeans.labels_
print(lbs)

[2 0 0 ... 1 0 0]


  super()._check_params_vs_input(X, default_n_init=10)


In [29]:
# Attach the cluster label of each customer to both tables. `lbs` is a NumPy
# array, so labels are matched to rows by position, not by index label.
# `assign` builds a new frame instead of writing into `rfm_ds_final`, which is
# a filtered slice and would otherwise raise SettingWithCopyWarning.
rfm_ds_final = rfm_ds_final.assign(Cluster_Id=lbs)
rfm_ds_scaled['Cluster_Id'] = lbs
rfm_ds_final.head()

Unnamed: 0,CustomerID,Amount,Frequency,Recency,Cluster_Id
1,12347,4310.0,182,2,2
2,12348,1797.24,31,75,0
3,12349,1757.55,73,18,0
4,12350,334.4,17,310,1
5,12352,2506.04,85,36,0


## Writing Data to csv files

In [30]:
# Export the unscaled RFM table to CSV, omitting the DataFrame index.
rfm_ds_final.to_csv(path_or_buf='RetailAnalysis.csv', index=False)

In [31]:
rfm_ds_scaled['Cluster_Id'] = lbs
# Copy the customer ids by position. `rfm_ds_scaled` has a fresh 0..n-1 index,
# whereas `rfm_ds_final` keeps the index labels left over from outlier
# filtering; assigning the Series directly would align on those labels and
# attach ids to the wrong rows (or leave them missing).
rfm_ds_scaled['CustomerID'] = rfm_ds_final['CustomerID'].to_numpy()
rfm_ds_scaled.to_csv('RetailAnalysisScaled.csv', index=False)