In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("DataCamp Online Retail").getOrCreate()
spark

In [15]:
df = spark.read.csv("OnlineRetail.csv", header=True, inferSchema=True, escape="\"")
df.show(5, 0)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
+---------+---------+-----------------------------------

 ### Exploratory Data Analysis

In [17]:
### How many rows we have
df.count()

541909

In [18]:
# How many unique customers do we have?
df.select("CustomerID").distinct().count()

4373

In [19]:
### Find which country purchased more?
from pyspark.sql.functions import *
from pyspark.sql.types import *
count_distinct = df.groupBy("Country").agg(countDistinct("CustomerID").alias("country_count"))
count_distinct.show()

+------------------+-------------+
|           Country|country_count|
+------------------+-------------+
|            Sweden|            8|
|         Singapore|            1|
|           Germany|           95|
|               RSA|            1|
|            France|           87|
|            Greece|            4|
|European Community|            1|
|           Belgium|           25|
|           Finland|           12|
|             Malta|            2|
|       Unspecified|            4|
|             Italy|           15|
|              EIRE|            3|
|         Lithuania|            1|
|            Norway|           10|
|             Spain|           31|
|           Denmark|            9|
|         Hong Kong|            0|
|           Iceland|            1|
|            Israel|            4|
+------------------+-------------+
only showing top 20 rows



In [20]:
# Order the country that has a lot of purchases Descendent order
count_distinct.sort("country_count", ascending=[False]).show()

+---------------+-------------+
|        Country|country_count|
+---------------+-------------+
| United Kingdom|         3950|
|        Germany|           95|
|         France|           87|
|          Spain|           31|
|        Belgium|           25|
|    Switzerland|           21|
|       Portugal|           19|
|          Italy|           15|
|        Finland|           12|
|        Austria|           11|
|         Norway|           10|
|        Denmark|            9|
|Channel Islands|            9|
|      Australia|            9|
|    Netherlands|            9|
|         Sweden|            8|
|         Cyprus|            8|
|          Japan|            8|
|         Poland|            6|
|         Greece|            4|
+---------------+-------------+
only showing top 20 rows



In [21]:
df.groupBy("Country").agg(countDistinct("CustomerID").alias("country_count")).orderBy(desc('country_count')).show()

+---------------+-------------+
|        Country|country_count|
+---------------+-------------+
| United Kingdom|         3950|
|        Germany|           95|
|         France|           87|
|          Spain|           31|
|        Belgium|           25|
|    Switzerland|           21|
|       Portugal|           19|
|          Italy|           15|
|        Finland|           12|
|        Austria|           11|
|         Norway|           10|
|        Denmark|            9|
|Channel Islands|            9|
|      Australia|            9|
|    Netherlands|            9|
|         Sweden|            8|
|         Cyprus|            8|
|          Japan|            8|
|         Poland|            6|
|         Greece|            4|
+---------------+-------------+
only showing top 20 rows



In [22]:
## Find when the most recent purchased was made on the platform?
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY") # 12/1/2010 8:26
df = df.withColumn('date',to_timestamp("InvoiceDate", 'MM/dd/yyyy HH:mm'))
df.select(max("date")).show()

+-------------------+
|          max(date)|
+-------------------+
|2011-12-09 12:50:00|
+-------------------+



In [23]:
# Find the earliest purchased done by the customer?
df.select(min("date")).show()

+-------------------+
|          min(date)|
+-------------------+
|2010-12-01 08:26:00|
+-------------------+



## Data Pre-Processing

In [24]:
df.show(5,0)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+-------------------+-------------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |from_date          |date               |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+-------------------+-------------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850     |United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BO

In [25]:
# We have two factors needs to be take in consideration:
# 1- Recency: The most recent purchases done.
# ---> Assign a recency score to each customer
# 2- 

In [30]:
df = df.withColumn("from_date", lit("12/1/2010 08:26"))
df = df.withColumn("from_date", to_timestamp("from_date", "MM/dd/yyyy HH:mm"))
df2 = df.withColumn("from_date", to_timestamp(col("from_date"))).withColumn("recency", col("date").cast("long") - col("from_date").cast("long"))

df2.show(5, 0)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+-------------------+-------------------+-------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |from_date          |date               |recency|
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+-------------------+-------------------+-------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|0      |
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|0      |
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850     |United Kingdom|2010-12-01 08:26:00|2010-12-01 08:26:00|0      |
|536

### 1- Select the most recent purchase

In [31]:
df2 = df2.join(df2.groupBy("CustomerID").agg(max("recency").alias("recency")), on="recency", how="leftsemi")
df2.show(5, 3)

+-------+---------+---------+-----------+--------+-----------+---------+----------+-------+---------+----+
|recency|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|from_date|date|
+-------+---------+---------+-----------+--------+-----------+---------+----------+-------+---------+----+
|    522|      536|      824|        WOO|       3|        12/|      6.4|       180|    Uni|      201| 201|
|    522|      536|      847|        COL|      48|        12/|      0.6|       180|    Uni|      201| 201|
|    522|      536|      224|        HAN|      12|        12/|      1.6|       180|    Uni|      201| 201|
|    522|      536|      213|        HAN|       6|        12/|      2.9|       180|    Uni|      201| 201|
|    522|      536|      224|        NAT|      12|        12/|      2.9|       180|    Uni|      201| 201|
+-------+---------+---------+-----------+--------+-----------+---------+----------+-------+---------+----+
only showing top 5 rows



In [32]:
df2.printSchema()

root
 |-- recency: long (nullable = true)
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- from_date: timestamp (nullable = true)
 |-- date: timestamp (nullable = true)



### 2- Calculate the frequency

In [37]:
def_freq = df2.groupBy("CustomerID").agg(count("InvoiceDate").alias("frequency"))
def_freq.show(5,0)

+----------+---------+
|CustomerID|frequency|
+----------+---------+
|15447     |9        |
|15100     |1        |
|13898     |12       |
|17950     |33       |
|16565     |3        |
+----------+---------+
only showing top 5 rows



In [40]:
df3 = df2.join(def_freq, on="CustomerID", how="inner")
df3.show(5,0)

+----------+-------+---------+---------+-----------------------------------+--------+--------------+---------+--------------+-------------------+-------------------+---------+
|CustomerID|recency|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|Country       |from_date          |date               |frequency|
+----------+-------+---------+---------+-----------------------------------+--------+--------------+---------+--------------+-------------------+-------------------+---------+
|18074     |5220   |536384   |82484    |WOOD BLACK BOARD ANT WHITE FINISH  |3       |12/1/2010 9:53|6.45     |United Kingdom|2010-12-01 08:26:00|2010-12-01 09:53:00|13       |
|18074     |5220   |536384   |84755    |COLOUR GLASS T-LIGHT HOLDER HANGING|48      |12/1/2010 9:53|0.65     |United Kingdom|2010-12-01 08:26:00|2010-12-01 09:53:00|13       |
|18074     |5220   |536384   |22464    |HANGING METAL HEART LANTERN        |12      |12/1/2010 9:53|1.65     |United Kin

In [41]:
df3.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- recency: long (nullable = true)
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- Country: string (nullable = true)
 |-- from_date: timestamp (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- frequency: long (nullable = false)



### 3- Monetary Value

In [45]:
## Calculate the total amount spent per customer
## Two Steps:
# 1- found the total amount spent in each purchase


In [44]:
m_val = df3.withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))
m_val.show(5, 0)

+----------+-------+---------+---------+-----------------------------------+--------+--------------+---------+--------------+-------------------+-------------------+---------+------------------+
|CustomerID|recency|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|Country       |from_date          |date               |frequency|TotalAmount       |
+----------+-------+---------+---------+-----------------------------------+--------+--------------+---------+--------------+-------------------+-------------------+---------+------------------+
|18074     |5220   |536384   |82484    |WOOD BLACK BOARD ANT WHITE FINISH  |3       |12/1/2010 9:53|6.45     |United Kingdom|2010-12-01 08:26:00|2010-12-01 09:53:00|13       |19.35             |
|18074     |5220   |536384   |84755    |COLOUR GLASS T-LIGHT HOLDER HANGING|48      |12/1/2010 9:53|0.65     |United Kingdom|2010-12-01 08:26:00|2010-12-01 09:53:00|13       |31.200000000000003|
|18074     |5220   |53638

In [46]:
# 2- find the total amount spent by each customer
m_val = m_val.groupBy("CustomerID").agg(sum("TotalAmount").alias("monetry_value"))
m_val.show(5,0)

+----------+-------------------+
|CustomerID|monetry_value      |
+----------+-------------------+
|15447     |155.17             |
|15100     |-32.849999999999994|
|13898     |155.93             |
|17950     |208.95             |
|16565     |173.7              |
+----------+-------------------+
only showing top 5 rows



In [None]:
# Merge the tow dataframes
final_df = m_val.join(df3, on="CustomerID", how="inner")
final_df.show(5,0)

In [54]:
# Select the necessery columns to build the column

In [None]:
final_df = final_df.select(['recency','frequency','monetry_value','CustomerID']).distinct