In [92]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data Analysis using Pyspark")\
.config("spark.memory.offHeap.enabled","true").config("spark.memory.offHeap.size","20g").getOrCreate()

In [93]:
df =spark.read.csv("Online_Retail.csv",header=True, escape="\"")

In [94]:
df.show(5,0)

+---------+---------+-----------------------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+----------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |01-12-2010 08:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |01-12-2010 08:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |01-12-2010 08:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |01-12-2010 08:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |01-12-2010 08:26|3.39     |17850     |United Kingdom|
+---------+---------+-------------------

In [95]:
df.count()

541909

In [96]:
df.select('CustomerID').distinct().count()

4373

In [97]:
df.head()

Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity='6', InvoiceDate='01-12-2010 08:26', UnitPrice='2.55', CustomerID='17850', Country='United Kingdom')

In [98]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

df.groupBy('Country').agg(countDistinct('CustomerID').alias('country_count')).show()

+------------------+-------------+
|           Country|country_count|
+------------------+-------------+
|            Sweden|            8|
|         Singapore|            1|
|           Germany|           95|
|               RSA|            1|
|            France|           87|
|            Greece|            4|
|European Community|            1|
|           Belgium|           25|
|           Finland|           12|
|             Malta|            2|
|       Unspecified|            4|
|             Italy|           15|
|              EIRE|            3|
|         Lithuania|            1|
|            Norway|           10|
|             Spain|           31|
|           Denmark|            9|
|         Hong Kong|            0|
|            Israel|            4|
|           Iceland|            1|
+------------------+-------------+
only showing top 20 rows



In [99]:
 df.groupBy('Country').agg(countDistinct('CustomerID').alias('country_count')).orderBy(desc('country_count')).show()

+---------------+-------------+
|        Country|country_count|
+---------------+-------------+
| United Kingdom|         3950|
|        Germany|           95|
|         France|           87|
|          Spain|           31|
|        Belgium|           25|
|    Switzerland|           21|
|       Portugal|           19|
|          Italy|           15|
|        Finland|           12|
|        Austria|           11|
|         Norway|           10|
|        Denmark|            9|
|Channel Islands|            9|
|      Australia|            9|
|    Netherlands|            9|
|         Sweden|            8|
|         Cyprus|            8|
|          Japan|            8|
|         Poland|            6|
|         Greece|            4|
+---------------+-------------+
only showing top 20 rows



In [100]:
#when was the most recent purchase made by a customer on the E-commerce platform

#set the legacy time parser polidy
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
#convert invoice date to correct format
df = df.withColumn('date',to_timestamp("InvoiceDate",'yy/MM/dd HH:mm'))
df.select(max("date")).show()

+---------+
|max(date)|
+---------+
|     NULL|
+---------+



In [101]:
df.select(max("InvoiceDate")).show()

+----------------+
|max(InvoiceDate)|
+----------------+
|31-10-2011 17:19|
+----------------+



In [102]:
#Calculate recency
df = df.withColumn("from_date",lit("31-10-2011 17:19"))
df.show(5)

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+----+----------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|date|       from_date|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+----+----------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01-12-2010 08:26|     2.55|     17850|United Kingdom|NULL|31-10-2011 17:19|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|NULL|31-10-2011 17:19|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 08:26|     2.75|     17850|United Kingdom|NULL|31-10-2011 17:19|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|NULL|31-10-2011 17:19|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|NULL|31

In [103]:
df = df.withColumn('from_date',to_timestamp("from_date",'yy/MM/dd HH:mm'))
df = df.withColumn('from_date',to_timestamp(col('from_date'))).withColumn('recency',col("from_date").cast("long")-col('from_date').cast("long"))
df.show()

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+----+---------+-------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|date|from_date|recency|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+----+---------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01-12-2010 08:26|     2.55|     17850|United Kingdom|NULL|     NULL|   NULL|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|NULL|     NULL|   NULL|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 08:26|     2.75|     17850|United Kingdom|NULL|     NULL|   NULL|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|NULL|     NULL|   NULL|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|

In [104]:
df = df.drop("date")
df = df.drop("from_date")
df = df.drop("recency")

df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [105]:
df.show()

+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|01-12-2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|01-12-2010 08:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|01-1

In [106]:
df =df.join(df.groupBy('CustomerID').agg(max('Quantity').alias('Quantity')),on='Quantity',how='leftsemi')
df.show()


+--------+---------+---------+--------------------+----------------+---------+----------+--------------+
|Quantity|InvoiceNo|StockCode|         Description|     InvoiceDate|UnitPrice|CustomerID|       Country|
+--------+---------+---------+--------------------+----------------+---------+----------+--------------+
|       6|   536365|   85123A|WHITE HANGING HEA...|01-12-2010 08:26|     2.55|     17850|United Kingdom|
|       6|   536365|    71053| WHITE METAL LANTERN|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|       8|   536365|   84406B|CREAM CUPID HEART...|01-12-2010 08:26|     2.75|     17850|United Kingdom|
|       6|   536365|   84029G|KNITTED UNION FLA...|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|       6|   536365|   84029E|RED WOOLLY HOTTIE...|01-12-2010 08:26|     3.39|     17850|United Kingdom|
|       2|   536365|    22752|SET 7 BABUSHKA NE...|01-12-2010 08:26|     7.65|     17850|United Kingdom|
|       6|   536365|    21730|GLASS STAR FROSTE...|01-1

In [107]:
df = df.join(df.groupBy('UnitPrice').agg(max('UnitPrice').alias('UnitPrice')), on='UnitPrice', how='leftsemi')
df.show()

+---------+--------+---------+---------+--------------------+----------------+----------+--------------+
|UnitPrice|Quantity|InvoiceNo|StockCode|         Description|     InvoiceDate|CustomerID|       Country|
+---------+--------+---------+---------+--------------------+----------------+----------+--------------+
|     2.55|       6|   536365|   85123A|WHITE HANGING HEA...|01-12-2010 08:26|     17850|United Kingdom|
|     3.39|       6|   536365|    71053| WHITE METAL LANTERN|01-12-2010 08:26|     17850|United Kingdom|
|     2.75|       8|   536365|   84406B|CREAM CUPID HEART...|01-12-2010 08:26|     17850|United Kingdom|
|     3.39|       6|   536365|   84029G|KNITTED UNION FLA...|01-12-2010 08:26|     17850|United Kingdom|
|     3.39|       6|   536365|   84029E|RED WOOLLY HOTTIE...|01-12-2010 08:26|     17850|United Kingdom|
|     7.65|       2|   536365|    22752|SET 7 BABUSHKA NE...|01-12-2010 08:26|     17850|United Kingdom|
|     4.25|       6|   536365|    21730|GLASS STAR FROS

In [108]:
df_freq = df.groupBy("CustomerID").agg(count("InvoiceDate").alias("Frequency"))
df.show()

+---------+--------+---------+---------+--------------------+----------------+----------+--------------+
|UnitPrice|Quantity|InvoiceNo|StockCode|         Description|     InvoiceDate|CustomerID|       Country|
+---------+--------+---------+---------+--------------------+----------------+----------+--------------+
|     2.55|       6|   536365|   85123A|WHITE HANGING HEA...|01-12-2010 08:26|     17850|United Kingdom|
|     3.39|       6|   536365|    71053| WHITE METAL LANTERN|01-12-2010 08:26|     17850|United Kingdom|
|     2.75|       8|   536365|   84406B|CREAM CUPID HEART...|01-12-2010 08:26|     17850|United Kingdom|
|     3.39|       6|   536365|   84029G|KNITTED UNION FLA...|01-12-2010 08:26|     17850|United Kingdom|
|     3.39|       6|   536365|   84029E|RED WOOLLY HOTTIE...|01-12-2010 08:26|     17850|United Kingdom|
|     7.65|       2|   536365|    22752|SET 7 BABUSHKA NE...|01-12-2010 08:26|     17850|United Kingdom|
|     4.25|       6|   536365|    21730|GLASS STAR FROS

In [109]:
df3 = df.join(df_freq,on="CustomerID",how="inner")
df3.show()

+----------+---------+--------+---------+---------+--------------------+----------------+--------------+---------+
|CustomerID|UnitPrice|Quantity|InvoiceNo|StockCode|         Description|     InvoiceDate|       Country|Frequency|
+----------+---------+--------+---------+---------+--------------------+----------------+--------------+---------+
|     17850|     2.55|       6|   536365|   85123A|WHITE HANGING HEA...|01-12-2010 08:26|United Kingdom|      312|
|     17850|     3.39|       6|   536365|    71053| WHITE METAL LANTERN|01-12-2010 08:26|United Kingdom|      312|
|     17850|     2.75|       8|   536365|   84406B|CREAM CUPID HEART...|01-12-2010 08:26|United Kingdom|      312|
|     17850|     3.39|       6|   536365|   84029G|KNITTED UNION FLA...|01-12-2010 08:26|United Kingdom|      312|
|     17850|     3.39|       6|   536365|   84029E|RED WOOLLY HOTTIE...|01-12-2010 08:26|United Kingdom|      312|
|     17850|     7.65|       2|   536365|    22752|SET 7 BABUSHKA NE...|01-12-20

In [110]:
m_val = df3.withColumn("total amount",col("Quantity")*col("UnitPrice"))
m_val.show()

+----------+---------+--------+---------+---------+--------------------+----------------+--------------+---------+------------------+
|CustomerID|UnitPrice|Quantity|InvoiceNo|StockCode|         Description|     InvoiceDate|       Country|Frequency|      total amount|
+----------+---------+--------+---------+---------+--------------------+----------------+--------------+---------+------------------+
|     17850|     2.55|       6|   536365|   85123A|WHITE HANGING HEA...|01-12-2010 08:26|United Kingdom|      312|15.299999999999999|
|     17850|     3.39|       6|   536365|    71053| WHITE METAL LANTERN|01-12-2010 08:26|United Kingdom|      312|             20.34|
|     17850|     2.75|       8|   536365|   84406B|CREAM CUPID HEART...|01-12-2010 08:26|United Kingdom|      312|              22.0|
|     17850|     3.39|       6|   536365|   84029G|KNITTED UNION FLA...|01-12-2010 08:26|United Kingdom|      312|             20.34|
|     17850|     3.39|       6|   536365|   84029E|RED WOOLLY 

In [111]:
m_val = m_val.groupBy("CustomerID").agg(sum("Total Amount").alias("Monetory_value"))
m_val.show()

+----------+------------------+
|CustomerID|    Monetory_value|
+----------+------------------+
|     16250|389.44000000000005|
|     15574| 702.2500000000002|
|     15555| 4710.320000000003|
|     15271|           2442.47|
|     17714|             153.0|
|     17686|           5739.46|
|     13865|501.56000000000006|
|     14157| 432.8800000000001|
|     13610|1115.4299999999996|
|     17757| 5293.540000000004|
|     17551|            306.84|
|     13187|236.01999999999995|
|     16549| 4125.270000000001|
|     12637| 5953.250000000001|
|     15052|            215.78|
|     15448|494.64000000000004|
|     13985| 6867.029999999999|
|     12888|            313.77|
|     14525|3961.2700000000004|
|     18283|2078.6299999999997|
+----------+------------------+
only showing top 20 rows



In [112]:
finaldf = m_val.join(df3,on="CustomerID",how="inner")
finaldf.show()

+----------+------------------+---------+--------+---------+---------+--------------------+----------------+--------------+---------+
|CustomerID|    Monetory_value|UnitPrice|Quantity|InvoiceNo|StockCode|         Description|     InvoiceDate|       Country|Frequency|
+----------+------------------+---------+--------+---------+---------+--------------------+----------------+--------------+---------+
|     16250|389.44000000000005|     5.95|       3|   536388|    21754|HOME BUILDING BLO...|01-12-2010 09:59|United Kingdom|       24|
|     16250|389.44000000000005|     5.95|       3|   536388|    21755|LOVE BUILDING BLO...|01-12-2010 09:59|United Kingdom|       24|
|     16250|389.44000000000005|     7.95|       2|   536388|    21523|DOORMAT FANCY FON...|01-12-2010 09:59|United Kingdom|       24|
|     16250|389.44000000000005|     4.95|       3|   536388|    21363|HOME SMALL WOOD L...|01-12-2010 09:59|United Kingdom|       24|
|     16250|389.44000000000005|     4.25|       3|   536388|  

In [115]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

finaldf = finaldf.withColumn('Quantity', col('Quantity').cast('double'))

assemble = VectorAssembler(inputCols=['Quantity', 'Monetory_value','Frequency'], outputCol='Features')
assembled_data = assemble.transform(finaldf)

scale = StandardScaler(inputCol='Features', outputCol='Standardized')
data_scale = scale.fit(assembled_data)
data_scale_output = data_scale.transform(assembled_data)
data_scale_output.show()



+----------+------------------+---------+--------+---------+---------+--------------------+----------------+--------------+---------+--------------------+--------------------+
|CustomerID|    Monetory_value|UnitPrice|Quantity|InvoiceNo|StockCode|         Description|     InvoiceDate|       Country|Frequency|            Features|        Standardized|
+----------+------------------+---------+--------+---------+---------+--------------------+----------------+--------------+---------+--------------------+--------------------+
|     16250|389.44000000000005|     5.95|     3.0|   536388|    21754|HOME BUILDING BLO...|01-12-2010 09:59|United Kingdom|       24|[3.0,389.44000000...|[0.01695604745230...|
|     16250|389.44000000000005|     5.95|     3.0|   536388|    21755|LOVE BUILDING BLO...|01-12-2010 09:59|United Kingdom|       24|[3.0,389.44000000...|[0.01695604745230...|
|     16250|389.44000000000005|     7.95|     2.0|   536388|    21523|DOORMAT FANCY FON...|01-12-2010 09:59|United Kingd