### Import libraries

In [1]:
from pyspark import SparkContext

In [2]:
from pyspark.sql import SparkSession

In [3]:
from pyspark.sql.functions import *

In [4]:
from pyspark.sql.types import IntegerType, FloatType, DateType

In [5]:
from pyspark.sql.window import Window

### Initialized SparkSession

In [6]:
# run spark using all of cores
sc = SparkContext(master="local[*]")

In [7]:
# start SparkSession
spark = SparkSession(sparkContext=sc).builder.appName("Market Basket").getOrCreate()

### Load csv data

Market Basket Data from Kaggle: https://www.kaggle.com/datasets/aslanahmedov/market-basket-analysis

In [8]:
# source file is separated by semicolon
df = spark.read.option("sep", ";").csv("data.csv", header=True)

In [9]:
df.show(10)

+------+--------------------+--------+----------------+-----+----------+--------------+
|BillNo|            Itemname|Quantity|            Date|Price|CustomerID|       Country|
+------+--------------------+--------+----------------+-----+----------+--------------+
|536365|WHITE HANGING HEA...|       6|01.12.2010 08:26| 2,55|     17850|United Kingdom|
|536365| WHITE METAL LANTERN|       6|01.12.2010 08:26| 3,39|     17850|United Kingdom|
|536365|CREAM CUPID HEART...|       8|01.12.2010 08:26| 2,75|     17850|United Kingdom|
|536365|KNITTED UNION FLA...|       6|01.12.2010 08:26| 3,39|     17850|United Kingdom|
|536365|RED WOOLLY HOTTIE...|       6|01.12.2010 08:26| 3,39|     17850|United Kingdom|
|536365|SET 7 BABUSHKA NE...|       2|01.12.2010 08:26| 7,65|     17850|United Kingdom|
|536365|GLASS STAR FROSTE...|       6|01.12.2010 08:26| 4,25|     17850|United Kingdom|
|536366|HAND WARMER UNION...|       6|01.12.2010 08:28| 1,85|     17850|United Kingdom|
|536366|HAND WARMER RED P...|   

### Columns and count rows

In [10]:
df.columns

['BillNo', 'Itemname', 'Quantity', 'Date', 'Price', 'CustomerID', 'Country']

In [11]:
# number of rows
df.count()

522064

### Dataframe schema

In [12]:
df.dtypes

[('BillNo', 'string'),
 ('Itemname', 'string'),
 ('Quantity', 'string'),
 ('Date', 'string'),
 ('Price', 'string'),
 ('CustomerID', 'string'),
 ('Country', 'string')]

In [13]:
df.printSchema()

root
 |-- BillNo: string (nullable = true)
 |-- Itemname: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



### Inspect Duplicates values

In [14]:
# inspect duplicate values and count them
df.groupby(["BillNo", "Itemname", "Quantity", "Date", "Price", "CustomerID", "Country"]).count() \
.filter("count > 1").sort("count", ascending=False).show(5)

+------+--------------------+--------+----------------+-----+----------+--------------+-----+
|BillNo|            Itemname|Quantity|            Date|Price|CustomerID|       Country|count|
+------+--------------------+--------+----------------+-----+----------+--------------+-----+
|555524|PINK REGENCY TEAC...|       1|05.06.2011 11:37| 2,95|     16923|United Kingdom|   20|
|555524|GREEN REGENCY TEA...|       1|05.06.2011 11:37| 2,95|     16923|United Kingdom|   12|
|572861|PURPLE DRAWERKNOB...|      12|26.10.2011 12:46| 1,25|     14102|United Kingdom|    8|
|538514|BATH BUILDING BLO...|       1|12.12.2010 14:27| 5,95|     15044|United Kingdom|    6|
|541266|HOME BUILDING BLO...|       1|16.01.2011 16:25| 5,95|     15673|United Kingdom|    6|
+------+--------------------+--------+----------------+-----+----------+--------------+-----+
only showing top 5 rows



In [15]:
# example of duplicate values
df.filter((df.BillNo == 555524) & (df.Itemname == "PINK REGENCY TEACUP AND SAUCER")).sort("Itemname").show()

+------+--------------------+--------+----------------+-----+----------+--------------+
|BillNo|            Itemname|Quantity|            Date|Price|CustomerID|       Country|
+------+--------------------+--------+----------------+-----+----------+--------------+
|555524|PINK REGENCY TEAC...|       1|05.06.2011 11:37| 2,95|     16923|United Kingdom|
|555524|PINK REGENCY TEAC...|       1|05.06.2011 11:37| 2,95|     16923|United Kingdom|
|555524|PINK REGENCY TEAC...|       1|05.06.2011 11:37| 2,95|     16923|United Kingdom|
|555524|PINK REGENCY TEAC...|       1|05.06.2011 11:37| 2,95|     16923|United Kingdom|
|555524|PINK REGENCY TEAC...|       1|05.06.2011 11:37| 2,95|     16923|United Kingdom|
|555524|PINK REGENCY TEAC...|       1|05.06.2011 11:37| 2,95|     16923|United Kingdom|
|555524|PINK REGENCY TEAC...|       1|05.06.2011 11:37| 2,95|     16923|United Kingdom|
|555524|PINK REGENCY TEAC...|       1|05.06.2011 11:37| 2,95|     16923|United Kingdom|
|555524|PINK REGENCY TEAC...|   

In [16]:
# drop duplicate values
df = df.dropDuplicates()

In [17]:
# show duplicate values again
df.groupby(["BillNo", "Itemname", "Quantity", "Date", "Price", "CustomerID", "Country"]).count() \
.filter("count > 1").sort("count", ascending=False).show(5)

+------+--------+--------+----+-----+----------+-------+-----+
|BillNo|Itemname|Quantity|Date|Price|CustomerID|Country|count|
+------+--------+--------+----+-----+----------+-------+-----+
+------+--------+--------+----+-----+----------+-------+-----+



### Inspect Null values

In [18]:
df.summary("count").show()

+-------+------+--------+--------+------+------+----------+-------+
|summary|BillNo|Itemname|Quantity|  Date| Price|CustomerID|Country|
+-------+------+--------+--------+------+------+----------+-------+
|  count|516778|  515323|  516778|516778|516778|    382811| 516778|
+-------+------+--------+--------+------+------+----------+-------+



Itemname and CustomerID columns contain null values

In [19]:
# inspect this 2 columns
df.filter(df.Itemname.isNull()).show(5)
df.filter(df.CustomerID.isNull()).show(5)

+------+--------+--------+----------------+-----+----------+--------------+
|BillNo|Itemname|Quantity|            Date|Price|CustomerID|       Country|
+------+--------+--------+----------------+-----+----------+--------------+
|543238|    null|       1|04.02.2011 13:42|    0|      null|United Kingdom|
|537878|    null|       1|08.12.2010 18:09|    0|      null|United Kingdom|
|543258|    null|    1287|04.02.2011 16:06|    0|      null|United Kingdom|
|542504|    null|    5568|28.01.2011 12:03|    0|      null|United Kingdom|
|543268|    null|       3|04.02.2011 17:24|    0|      null|United Kingdom|
+------+--------+--------+----------------+-----+----------+--------------+
only showing top 5 rows

+------+--------------------+--------+----------------+-----+----------+--------------+
|BillNo|            Itemname|Quantity|            Date|Price|CustomerID|       Country|
+------+--------------------+--------+----------------+-----+----------+--------------+
|536544|RAIN PONCHO RETRO..

In this project, I need to make RFM customer segmentation. Thus, I will drop null values.

In [20]:
# drop null values
df = df.dropna()

In [21]:
# show it again
df.filter(df.Itemname.isNull()).show(1)
df.filter(df.CustomerID.isNull()).show(1)

+------+--------+--------+----+-----+----------+-------+
|BillNo|Itemname|Quantity|Date|Price|CustomerID|Country|
+------+--------+--------+----+-----+----------+-------+
+------+--------+--------+----+-----+----------+-------+

+------+--------+--------+----+-----+----------+-------+
|BillNo|Itemname|Quantity|Date|Price|CustomerID|Country|
+------+--------+--------+----+-----+----------+-------+
+------+--------+--------+----+-----+----------+-------+



In [22]:
# count rows
df.summary("count").show()

+-------+------+--------+--------+------+------+----------+-------+
|summary|BillNo|Itemname|Quantity|  Date| Price|CustomerID|Country|
+-------+------+--------+--------+------+------+----------+-------+
|  count|382811|  382811|  382811|382811|382811|    382811| 382811|
+-------+------+--------+--------+------+------+----------+-------+



### Change column data types

In [23]:
df.show(5)

+------+--------------------+--------+----------------+-----+----------+--------------+
|BillNo|            Itemname|Quantity|            Date|Price|CustomerID|       Country|
+------+--------------------+--------+----------------+-----+----------+--------------+
|536404|PARTY INVITES SPA...|      12|01.12.2010 11:29| 0,85|     16218|United Kingdom|
|536409|VANILLA SCENT CAN...|       1|01.12.2010 11:45| 4,25|     17908|United Kingdom|
|536522|PINK & WHITE BREA...|       1|01.12.2010 12:49| 5,95|     15012|United Kingdom|
|536525|REGENCY CAKESTAND...|       2|01.12.2010 12:54|12,75|     14078|United Kingdom|
|536532|PACK OF 12 WOODLA...|      48|01.12.2010 13:24| 0,29|     12433|        Norway|
+------+--------------------+--------+----------------+-----+----------+--------------+
only showing top 5 rows



In [24]:
# replace , by . and convert to float
df = df.withColumn("Price", regexp_replace("Price", ",", ".")).withColumn("Price", col("Price").cast(FloatType()))

In [25]:
# convert quantity column to int and date column to timestamp
df = df.withColumn("Quantity", col("Quantity").cast(IntegerType())).withColumn("Date", to_timestamp(col("Date"), "dd.MM.yyy HH:mm"))

In [26]:
df.printSchema()

root
 |-- BillNo: string (nullable = true)
 |-- Itemname: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Price: float (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [27]:
df.show(5)

+------+--------------------+--------+-------------------+-----+----------+--------------+
|BillNo|            Itemname|Quantity|               Date|Price|CustomerID|       Country|
+------+--------------------+--------+-------------------+-----+----------+--------------+
|536404|PARTY INVITES SPA...|      12|2010-12-01 11:29:00| 0.85|     16218|United Kingdom|
|536409|VANILLA SCENT CAN...|       1|2010-12-01 11:45:00| 4.25|     17908|United Kingdom|
|536522|PINK & WHITE BREA...|       1|2010-12-01 12:49:00| 5.95|     15012|United Kingdom|
|536525|REGENCY CAKESTAND...|       2|2010-12-01 12:54:00|12.75|     14078|United Kingdom|
|536532|PACK OF 12 WOODLA...|      48|2010-12-01 13:24:00| 0.29|     12433|        Norway|
+------+--------------------+--------+-------------------+-----+----------+--------------+
only showing top 5 rows



### Exploratory Data Analysis (EDA)

show summary statistics

In [28]:
df.describe().show()

+-------+------------------+--------------------+------------------+------------------+------------------+-----------+
|summary|            BillNo|            Itemname|          Quantity|             Price|        CustomerID|    Country|
+-------+------------------+--------------------+------------------+------------------+------------------+-----------+
|  count|            382811|              382811|            382811|            382811|            382811|     382811|
|   mean| 560585.1165222524|                null|13.026059857214134|3.0886358188871323|15310.347701607321|       null|
| stddev|13108.369584467999|                null|183.82358537616253|22.132048668029725|1722.4835156192983|       null|
|    min|            536365|"ASSORTED FLOWER ...|                 1|               0.0|             12346|  Australia|
|    max|            581587|ZINC WIRE SWEETHE...|             80995|           8142.75|             18287|Unspecified|
+-------+------------------+--------------------

In [29]:
df.summary().show()

+-------+------------------+--------------------+------------------+------------------+------------------+-----------+
|summary|            BillNo|            Itemname|          Quantity|             Price|        CustomerID|    Country|
+-------+------------------+--------------------+------------------+------------------+------------------+-----------+
|  count|            382811|              382811|            382811|            382811|            382811|     382811|
|   mean| 560585.1165222524|                null|13.026059857214134|3.0886358188871323|15310.347701607321|       null|
| stddev|13108.369584467999|                null|183.82358537616253|22.132048668029725|1722.4835156192983|       null|
|    min|            536365|"ASSORTED FLOWER ...|                 1|               0.0|             12346|  Australia|
|    25%|          549222.0|                null|                 2|              1.25|           13928.0|       null|
|    50%|          561873.0|                null

Quantity and Price columns may contain outliers. I will inspect later.

In [30]:
df.select(min("Date"), max("Date")).show()

+-------------------+-------------------+
|          min(Date)|          max(Date)|
+-------------------+-------------------+
|2010-12-01 08:26:00|2011-12-09 12:50:00|
+-------------------+-------------------+



Calculate TotalPrice by Price * Quantity

In [31]:
df = df.withColumn("TotalPrice", round(col("Price")*col("Quantity"), 2))

Inspect Quantity and Price columns

In [32]:
df.sort("Quantity", ascending=False).show(10)

+------+--------------------+--------+-------------------+-----+----------+--------------+----------+
|BillNo|            Itemname|Quantity|               Date|Price|CustomerID|       Country|TotalPrice|
+------+--------------------+--------+-------------------+-----+----------+--------------+----------+
|581483|PAPER CRAFT , LIT...|   80995|2011-12-09 09:15:00| 2.08|     16446|United Kingdom|  168469.6|
|541431|MEDIUM CERAMIC TO...|   74215|2011-01-18 10:01:00| 1.04|     12346|United Kingdom|  77183.59|
|578841|ASSTD DESIGN 3D P...|   12540|2011-11-25 15:57:00|  0.0|     13256|United Kingdom|       0.0|
|573008|WORLD WAR 2 GLIDE...|    4800|2011-10-27 12:26:00| 0.21|     12901|United Kingdom|    1008.0|
|554868|SMALL POPCORN HOLDER|    4300|2011-05-27 10:52:00| 0.72|     13135|United Kingdom|    3096.0|
|544612|EMPIRE DESIGN ROS...|    3906|2011-02-22 10:43:00| 0.82|     18087|United Kingdom|   3202.92|
|560599|ESSENTIAL BALM 3....|    3186|2011-07-19 17:04:00| 0.06|     14609|United 

In [33]:
df.sort("Price", ascending=False).show()

+------+--------------+--------+-------------------+-------+----------+--------------+----------+
|BillNo|      Itemname|Quantity|               Date|  Price|CustomerID|       Country|TotalPrice|
+------+--------------+--------+-------------------+-------+----------+--------------+----------+
|551697|       POSTAGE|       1|2011-05-03 13:46:00|8142.75|     16029|United Kingdom|   8142.75|
|573077|        Manual|       1|2011-10-27 14:13:00|4161.06|     12536|        France|   4161.06|
|573080|        Manual|       1|2011-10-27 14:20:00|4161.06|     12536|        France|   4161.06|
|571751|        Manual|       1|2011-10-19 11:18:00|3949.32|     12744|     Singapore|   3949.32|
|569382|        Manual|       1|2011-10-03 16:44:00|3155.95|     15502|United Kingdom|   3155.95|
|562946|        Manual|       1|2011-08-11 09:38:00| 2500.0|     15581|United Kingdom|    2500.0|
|548813|        Manual|       1|2011-04-04 13:03:00|2382.92|     12744|     Singapore|   2382.92|
|571751|        Manu

In my opinion, It is probably outlier but in this project, I will leave it

### Prepare data for dashboard

Country dataframe with total bills and total sales

In [34]:
country_df = df.select("BillNo", "TotalPrice", "Country").groupBy("Country") \
.agg(count("BillNo").alias("TotalBill"), round(sum("TotalPrice"), 2).alias("TotalSpend"))

Itemnames dataframe with total bills, total quantity, and total sales

In [35]:
item_df = df.select("BillNo", "Quantity", "TotalPrice", "Itemname").groupBy("Itemname") \
.agg(count("BillNo").alias("TotalBill"), sum("Quantity").alias("TotalQuantity"), round(sum("TotalPrice"), 2).alias("TotalSpend"))

### Prepare data for RFM

Temporary dataframe to create RFM segmentation

In [36]:
agg_df = df.select("BillNo", "CustomerID", "Date", "TotalPrice").groupBy("BillNo", "CustomerID", "Date") \
.agg(round(sum("TotalPrice"), 2).alias("TotalPrice"))

In [37]:
agg_df.show(5)

+------+----------+-------------------+----------+
|BillNo|CustomerID|               Date|TotalPrice|
+------+----------+-------------------+----------+
|538509|     14413|2010-12-12 13:37:00|    121.05|
|544450|     17811|2011-02-20 12:23:00|   1242.95|
|539118|     15563|2010-12-16 11:27:00|    270.19|
|541987|     14130|2011-01-24 16:05:00|    480.91|
|539095|     16037|2010-12-16 10:18:00|    300.46|
+------+----------+-------------------+----------+
only showing top 5 rows



### Create RFM segmentation dataframe

In [38]:
# define 1 jan 2012 as latest date
latest_date = to_date(lit("2012-01-01"), "yyyy-MM-dd")

In [39]:
# create rfm dataframe
# recency = latest date - date
# frequency = count number of  bills
# monetary = total spend with us
rfm_df = agg_df.groupBy("CustomerID").agg(datediff(latest_date, max(col("Date"))).alias("Recency"), \
                                          count("BillNo").alias("Frequency"), \
                                          round(sum("TotalPrice"), 2).alias("Monetary"))

In [40]:
rfm_df.describe().show()

+-------+------------------+------------------+-----------------+------------------+
|summary|        CustomerID|           Recency|        Frequency|          Monetary|
+-------+------------------+------------------+-----------------+------------------+
|  count|              4297|              4297|             4297|              4297|
|   mean|15315.691179892949|115.13730509657901|4.233651384686991|1987.5408377938106|
| stddev| 1716.241593690511|100.14658133817039|7.108585407895454| 8584.801281937924|
|    min|             12346|                23|                1|               0.0|
|    max|             18287|               396|              211|         280206.02|
+-------+------------------+------------------+-----------------+------------------+



In [41]:
# calculate percentile of RFM
rfm_df = rfm_df.withColumn("R_pct", round(percent_rank().over(Window.partitionBy().orderBy("Recency")), 2)) \
.withColumn("F_pct", round(percent_rank().over(Window.partitionBy().orderBy("Frequency")), 2)) \
.withColumn("M_pct", round(percent_rank().over(Window.partitionBy().orderBy("Monetary")), 2))

In [42]:
# score 1 to 5 by percentile
# recency: low to high
# frequency and monetary: high to low
rfm_df = rfm_df.withColumn("R_score", \
                           when(col("R_pct") <= 0.2, 5) \
                           .when((col("R_pct") > 0.2)&(col("R_pct") <= 0.4), 4) \
                           .when((col("R_pct") > 0.4)&(col("R_pct") <= 0.6), 3) \
                           .when((col("R_pct") > 0.6)&(col("R_pct") <= 0.8), 2) \
                           .when(col("R_pct") > 0.8, 1) \
                           .otherwise(0)) \
                .withColumn("F_score", \
                           when(col("F_pct") <= 0.2, 1) \
                           .when((col("F_pct") > 0.2)&(col("F_pct") <= 0.4), 2) \
                           .when((col("F_pct") > 0.4)&(col("F_pct") <= 0.6), 3) \
                           .when((col("F_pct") > 0.6)&(col("F_pct") <= 0.8), 4) \
                           .when(col("F_pct") > 0.8, 5) \
                           .otherwise(0)) \
                .withColumn("M_score", \
                           when(col("M_pct") <= 0.2, 1) \
                           .when((col("M_pct") > 0.2)&(col("M_pct") <= 0.4), 2) \
                           .when((col("M_pct") > 0.4)&(col("M_pct") <= 0.6), 3) \
                           .when((col("M_pct") > 0.6)&(col("M_pct") <= 0.8), 4) \
                           .when(col("M_pct") > 0.8, 5) \
                           .otherwise(0)) \
                .drop("R_pct", "F_pct", "M_pct")

In [43]:
rfm_df.describe().show()

+-------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|        CustomerID|           Recency|        Frequency|          Monetary|          R_score|           F_score|           M_score|
+-------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              4297|              4297|             4297|              4297|             4297|              4297|              4297|
|   mean|15315.691179892949|115.13730509657901|4.233651384686991|1987.5408377938106|3.035838957412148|2.6283453572259714|2.9799860367698394|
| stddev|1716.2415936905106|100.14658133817024|7.108585407895449| 8584.801281937933|1.419018269541311|1.5042976558838501|1.4142365068771616|
|    min|             12346|                23|                1|               0.0|                1|                 1|                 1|
|    max|    

### Export as parquet file

In [44]:
rfm_df.write.save("rfm.parquet")

In [45]:
country_df.write.save("country.parquet")

In [46]:
item_df.write.save("item.parquet")

### Stop Spark Session

In [47]:
spark.stop()