# Dataset

E-Commerce Data from https://www.kaggle.com/carrie1/ecommerce-data

Actual transactions from a UK retailer.

# 1. Spark Initialization

In [5]:
import findspark
findspark.init()

In [6]:
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession \
    .builder \
    .appName("Tugas Spark Frequent Itemsets") \
    .getOrCreate()

print(spark)

<pyspark.sql.session.SparkSession object at 0x000001E0E92B9630>


In [7]:
df = spark.read.csv("C://Users//asus//Documents//Datasets//data.csv", header="true")

In [8]:
df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [9]:
df.schema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

In [6]:
# Select the columns used for training: InvoiceNo (as ID) and StockCode (as Items)
df = df.selectExpr(['InvoiceNo as ID', 'StockCode as Items'])

In [10]:
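# Keep Description next to ID and Items so item codes can be looked up later.
# Note: this selects the original column names, so it must run on the DataFrame
# as loaded from the CSV, before df is reduced to ID and Items.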
df_check = df.selectExpr(['InvoiceNo as ID', 'StockCode as Items', 'Description'])

In [7]:
df.show()

+------+------+
|    ID| Items|
+------+------+
|536365|85123A|
|536365| 71053|
|536365|84406B|
|536365|84029G|
|536365|84029E|
|536365| 22752|
|536365| 21730|
|536366| 22633|
|536366| 22632|
|536367| 84879|
|536367| 22745|
|536367| 22748|
|536367| 22749|
|536367| 22310|
|536367| 84969|
|536367| 22623|
|536367| 22622|
|536367| 21754|
|536367| 21755|
|536367| 21777|
+------+------+
only showing top 20 rows



# 2. Data Grouping & Remove Duplicates

In [8]:
# Group the items by invoice number so that each row is one transaction
from pyspark.sql.functions import collect_list
df_group = df.groupby("ID").agg(collect_list('Items').alias('Items'))

In [9]:
df_group.show()

+------+--------------------+
|    ID|               Items|
+------+--------------------+
|536596|[21624, 22900, 22...|
|536938|[22386, 85099C, 2...|
|537252|             [22197]|
|537691|[22791, 22171, 82...|
|538041|             [22145]|
|538184|[22585, 21481, 22...|
|538517|[22491, 21232, 21...|
|538879|[84819, 22150, 21...|
|539275|[22909, 22423, 22...|
|539630|[21484, 85099B, 2...|
|540499|[21868, 22697, 22...|
|540540|[21877, 21868, 21...|
|540976|[22394, 21890, 22...|
|541432|[21485, 22457, 84...|
|541518|[21880, 21881, 21...|
|541783|[22423, 22854, 22...|
|542026|[21754, 82600, 22...|
|542375|[21731, 22367, 22...|
|543641|[85123A, 21833, 2...|
|544303|[22660, 48138, 48...|
+------+--------------------+
only showing top 20 rows



In [10]:
# Remove duplicate items within each transaction using a UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

distinct_items = udf(lambda row: list(set(row)), ArrayType(StringType()))
df_group = df_group.withColumn("distinct_items", distinct_items("Items"))
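
The deduplication step matters because Spark's FP-Growth rejects transactions that contain repeated items. The following is a minimal plain-Python sketch (no Spark required) of what the grouping and the `list(set(row))` UDF do to each invoice. The sample rows and the helper name `group_and_dedupe` are illustrative assumptions, not part of the notebook. In Spark itself, using `collect_set` in place of `collect_list` should give distinct items without a UDF.

```python
from collections import defaultdict

def group_and_dedupe(rows):
    """Group (invoice_id, item) pairs by invoice and drop repeated items."""
    grouped = defaultdict(list)
    for invoice_id, item in rows:
        grouped[invoice_id].append(item)
    # dict.fromkeys keeps the first occurrence of each item in order,
    # unlike set(), whose ordering is arbitrary.
    return {inv: list(dict.fromkeys(items)) for inv, items in grouped.items()}

# Hypothetical rows: invoice 536365 lists item 85123A twice.
rows = [
    ("536365", "85123A"),
    ("536365", "71053"),
    ("536365", "85123A"),
    ("536366", "22633"),
]
baskets = group_and_dedupe(rows)
print(baskets)
```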

In [11]:
df_group.show()

+------+--------------------+--------------------+
|    ID|               Items|      distinct_items|
+------+--------------------+--------------------+
|536596|[21624, 22900, 22...|[84926A, 21624, 2...|
|536938|[22386, 85099C, 2...|[21479, 84997B, 2...|
|537252|             [22197]|             [22197]|
|537691|[22791, 22171, 82...|[22505, 22791, 82...|
|538041|             [22145]|             [22145]|
|538184|[22585, 21481, 22...|[22492, 22561, 48...|
|538517|[22491, 21232, 21...|[22197, 22844, 22...|
|538879|[84819, 22150, 21...|[22130, 22555, 84...|
|539275|[22909, 22423, 22...|[22423, 21914, 22...|
|539630|[21484, 85099B, 2...|[22988, 84347, 22...|
|540499|[21868, 22697, 22...|[21755, 84978, 22...|
|540540|[21877, 21868, 21...|[22555, 22551, 22...|
|540976|[22394, 21890, 22...|[22207, 21110, 84...|
|541432|[21485, 22457, 84...|[22113, 22457, 21...|
|541518|[21880, 21881, 21...|[20724, 21982, 20...|
|541783|[22423, 22854, 22...|[22197, 84978, 22...|
|542026|[21754, 82600, 22...|[2

# 3. FP Growth Algorithm Training

In [12]:
from pyspark.ml.fpm import FPGrowth

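To estimate a reasonable minimum support, I follow the approach from the Quora answer listed under Reference and ask how often an item must be picked to count as frequent. I assume an item is picked 3 times a day, and the data holds 541910 records covering 2 years (730 days).
(3 * 730) / 541910 ≈ 0.004. The runs below use 0.04, ten times this estimate, as the starting support.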
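
As a quick check, the same arithmetic can be evaluated directly. This is a sketch using only the figures quoted above, and the variable names are illustrative. Keep in mind that `minSupport` in Spark is measured against the number of transactions (grouped invoices), not raw rows, so a figure derived from the row count is only a rough starting point.

```python
picks_per_day = 3        # assumed picks per day, from the text
days = 730               # two years of data, from the text
total_records = 541910   # row count quoted in the text

estimated_support = (picks_per_day * days) / total_records
print(round(estimated_support, 4))  # 0.004
```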

First, I try larger thresholds: support = 0.4 and confidence = 0.8.

In [13]:
fpGrowth = FPGrowth(itemsCol="distinct_items", minSupport=0.4, minConfidence=0.8)
model = fpGrowth.fit(df_group)

In [14]:
model.freqItemsets.show()

+-----+----+
|items|freq|
+-----+----+
+-----+----+



In [15]:
model.associationRules.show()

+----------+----------+----------+
|antecedent|consequent|confidence|
+----------+----------+----------+
+----------+----------+----------+



Both results are empty, so I switch to the estimated support of 0.04, with a confidence of 0.04.

In [16]:
fpGrowth2 = FPGrowth(itemsCol="distinct_items", minSupport=0.04, minConfidence=0.04)
model2 = fpGrowth2.fit(df_group)

In [17]:
model2.freqItemsets.show()

+--------+----+
|   items|freq|
+--------+----+
|[85123A]|2246|
| [22423]|2172|
|[85099B]|2135|
| [47566]|1706|
| [20725]|1608|
| [84879]|1468|
| [22720]|1462|
| [22197]|1442|
| [21212]|1334|
| [22383]|1306|
| [20727]|1295|
| [22457]|1266|
|  [POST]|1254|
| [23203]|1249|
| [22386]|1231|
| [22960]|1220|
| [22469]|1214|
| [21931]|1201|
| [22411]|1187|
| [22961]|1174|
+--------+----+
only showing top 20 rows



In [18]:
model2.associationRules.show()

+----------+----------+----------+
|antecedent|consequent|confidence|
+----------+----------+----------+
+----------+----------+----------+



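Frequent itemsets now appear, but there are still no association rules: the itemsets shown each contain a single item, and a rule needs an itemset of at least two items. I therefore halve both thresholds to support = 0.02 and confidence = 0.02.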
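
An empty rule table generally means that no itemset with two or more items reached the minimum support, since a rule needs both an antecedent and a consequent. The brute-force sketch below illustrates this on five made-up baskets. The data and the function `frequent_itemsets` are hypothetical, and this enumeration is not how FP-Growth works internally; it only shows the effect of the threshold.

```python
from itertools import combinations

def frequent_itemsets(baskets, min_support):
    """Return {itemset: count} for every itemset meeting min_support (brute force)."""
    n = len(baskets)
    items = sorted({item for basket in baskets for item in basket})
    result = {}
    for size in range(1, len(items) + 1):
        for candidate in combinations(items, size):
            count = sum(1 for basket in baskets if set(candidate) <= basket)
            if count / n >= min_support:
                result[candidate] = count
    return result

# Hypothetical baskets, not taken from the dataset.
baskets = [
    {"A", "B"},
    {"A", "C"},
    {"A"},
    {"B", "C"},
    {"A", "B"},
]

high = frequent_itemsets(baskets, min_support=0.6)  # only single items survive
low = frequent_itemsets(baskets, min_support=0.4)   # the pair (A, B) now qualifies

print(high)
print(low)
```

With the higher threshold there is nothing to build a rule from; lowering it lets a pair through, which is the same effect seen when moving from 0.04 to 0.02.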

In [19]:
fpGrowth3 = FPGrowth(itemsCol="distinct_items", minSupport=0.02, minConfidence=0.02)
model3 = fpGrowth3.fit(df_group)

In [20]:
model3.freqItemsets.show()

+---------------+----+
|          items|freq|
+---------------+----+
|       [85123A]|2246|
|        [22423]|2172|
|       [85099B]|2135|
|        [47566]|1706|
|        [20725]|1608|
|[20725, 85099B]| 588|
|        [84879]|1468|
|        [22720]|1462|
|        [22197]|1442|
|        [21212]|1334|
|        [22383]|1306|
| [22383, 20725]| 663|
|        [20727]|1295|
| [20727, 20725]| 648|
| [20727, 22383]| 587|
|        [22457]|1266|
|         [POST]|1254|
|        [23203]|1249|
|[23203, 85099B]| 582|
|        [22386]|1231|
+---------------+----+
only showing top 20 rows



In [21]:
model3.associationRules.show()

+----------+----------+-------------------+
|antecedent|consequent|         confidence|
+----------+----------+-------------------+
|   [22699]|   [22423]|0.47946428571428573|
|   [22699]|   [22697]|                0.7|
|   [22699]|   [22698]| 0.5482142857142858|
|   [22386]|  [85099B]| 0.6766856214459789|
|   [22386]|   [21931]| 0.4207961007311129|
|   [20727]|   [20725]| 0.5003861003861004|
|   [20727]|   [22383]| 0.4532818532818533|
|   [20727]|   [20728]| 0.4061776061776062|
|   [20727]|   [22384]| 0.4223938223938224|
|   [22382]|   [20725]| 0.4811965811965812|
|   [22382]|   [22383]|0.45897435897435895|
|   [20725]|  [85099B]| 0.3656716417910448|
|   [20725]|   [22383]| 0.4123134328358209|
|   [20725]|   [20727]|0.40298507462686567|
|   [20725]|   [20728]|0.34950248756218905|
|   [20725]|   [22382]|0.35012437810945274|
|   [20725]|   [22384]| 0.3812189054726368|
|   [20725]|   [20726]| 0.3308457711442786|
|   [22384]|   [20725]| 0.5522522522522523|
|   [22384]|   [20727]| 0.492792
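
The confidence column can be reproduced from the `freqItemsets` counts shown earlier: the confidence of a rule X → Y is the count of X ∪ Y divided by the count of X. The sketch below checks this for three rules, using only counts that appear in the output above. The dictionary `freq` and the helper `confidence` are illustrative names.

```python
# Counts copied from the model3.freqItemsets output above.
freq = {
    frozenset(["20727"]): 1295,
    frozenset(["20725"]): 1608,
    frozenset(["20727", "20725"]): 648,
    frozenset(["20727", "22383"]): 587,
    frozenset(["20725", "85099B"]): 588,
}

def confidence(antecedent, consequent):
    """confidence(X -> Y) = freq(X union Y) / freq(X)."""
    x = frozenset(antecedent)
    return freq[x | frozenset(consequent)] / freq[x]

print(confidence(["20727"], ["20725"]))
print(confidence(["20727"], ["22383"]))
print(confidence(["20725"], ["85099B"]))
```

These values should agree with the roughly 0.5004, 0.4533, and 0.3657 reported for the same rules in the table.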

In [22]:
# Apply the association rules to each transaction to generate predictions
model3.transform(df_group).show()

+------+--------------------+--------------------+--------------------+
|    ID|               Items|      distinct_items|          prediction|
+------+--------------------+--------------------+--------------------+
|536596|[21624, 22900, 22...|[84926A, 21624, 2...|                  []|
|536938|[22386, 85099C, 2...|[21479, 84997B, 2...|     [85099B, 22411]|
|537252|             [22197]|             [22197]|                  []|
|537691|[22791, 22171, 82...|[22505, 22791, 82...|                  []|
|538041|             [22145]|             [22145]|                  []|
|538184|[22585, 21481, 22...|[22492, 22561, 48...|                  []|
|538517|[22491, 21232, 21...|[22197, 22844, 22...|                  []|
|538879|[84819, 22150, 21...|[22130, 22555, 84...|                  []|
|539275|[22909, 22423, 22...|[22423, 21914, 22...|      [22699, 22697]|
|539630|[21484, 85099B, 2...|[22988, 84347, 22...|[20725, 23203, 22...|
|540499|[21868, 22697, 22...|[21755, 84978, 22...|      [22698, 

In [43]:
# Build a one-row DataFrame containing a single example item
df_compare=spark.createDataFrame([
    ('0',['22423'])
],['id','distinct_items'])

In [44]:
df_compare.show()

+---+--------------+
| id|distinct_items|
+---+--------------+
|  0|       [22423]|
+---+--------------+



In [45]:
# Predict the items associated with item 22423; the result is 22699 and 22697
model3.transform(df_compare).show()

+---+--------------+--------------+
| id|distinct_items|    prediction|
+---+--------------+--------------+
|  0|       [22423]|[22699, 22697]|
+---+--------------+--------------+
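
`transform` builds each prediction by collecting the consequents of every rule whose antecedent is fully contained in the basket, leaving out items the basket already holds. A plain-Python sketch of that logic follows, using four of the rules listed in the `associationRules` output above. The function name `predict` is an assumption for illustration, and the rule list is deliberately partial, so it does not reproduce the prediction for 22423.

```python
# (antecedent, consequent) pairs copied from the model3.associationRules output.
rules = [
    (["20727"], ["20725"]),
    (["20727"], ["22383"]),
    (["20727"], ["20728"]),
    (["22699"], ["22697"]),
]

def predict(basket, rules):
    """Collect consequents of all rules whose antecedent is a subset of the basket."""
    basket = set(basket)
    predicted = []
    for antecedent, consequent in rules:
        if set(antecedent) <= basket:
            for item in consequent:
                # Skip items already bought or already predicted.
                if item not in basket and item not in predicted:
                    predicted.append(item)
    return predicted

print(predict(["20727"], rules))           # all three 20727 rules apply
print(predict(["20727", "20725"], rules))  # 20725 is already in the basket
print(predict(["22145"], rules))           # no rule applies
```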



In [11]:
df_check.show()

+------+------+--------------------+
|    ID| Items|         Description|
+------+------+--------------------+
|536365|85123A|WHITE HANGING HEA...|
|536365| 71053| WHITE METAL LANTERN|
|536365|84406B|CREAM CUPID HEART...|
|536365|84029G|KNITTED UNION FLA...|
|536365|84029E|RED WOOLLY HOTTIE...|
|536365| 22752|SET 7 BABUSHKA NE...|
|536365| 21730|GLASS STAR FROSTE...|
|536366| 22633|HAND WARMER UNION...|
|536366| 22632|HAND WARMER RED P...|
|536367| 84879|ASSORTED COLOUR B...|
|536367| 22745|POPPY'S PLAYHOUSE...|
|536367| 22748|POPPY'S PLAYHOUSE...|
|536367| 22749|FELTCRAFT PRINCES...|
|536367| 22310|IVORY KNITTED MUG...|
|536367| 84969|BOX OF 6 ASSORTED...|
|536367| 22623|BOX OF VINTAGE JI...|
|536367| 22622|BOX OF VINTAGE AL...|
|536367| 21754|HOME BUILDING BLO...|
|536367| 21755|LOVE BUILDING BLO...|
|536367| 21777|RECIPE BOX WITH M...|
+------+------+--------------------+
only showing top 20 rows



# 4. Final Analysis

To check what the predicted items are, I look up their descriptions with SQL queries.

In [12]:
df_check.createOrReplaceTempView("ecommerce")

In [21]:
# Find the description(s) recorded for StockCode '22423'
query1=spark.sql("SELECT DISTINCT Description FROM ecommerce WHERE Items = '22423'")
query1.show()

+--------------------+
|         Description|
+--------------------+
|REGENCY CAKESTAND...|
|             damages|
|              faulty|
+--------------------+



In [17]:
# Find the description(s) recorded for StockCode '22699'
query2=spark.sql("SELECT DISTINCT Description FROM ecommerce WHERE Items = '22699'")
query2.show()

+--------------------+
|         Description|
+--------------------+
|ROSES REGENCY TEA...|
+--------------------+



In [18]:
# Find the description(s) recorded for StockCode '22697'
query3=spark.sql("SELECT DISTINCT Description FROM ecommerce WHERE Items = '22697'")
query3.show()

+--------------------+
|         Description|
+--------------------+
|GREEN REGENCY TEA...|
+--------------------+



# Final Result

The result: people who buy the Regency Cakestand (22423) are also likely to buy the Roses Regency Tea (22699) and/or Green Regency Tea (22697) items.

# Reference

1. Removing duplicate items: https://stackoverflow.com/questions/54185710/remove-duplicates-from-pyspark-array-column
2. Choosing an appropriate support value: https://www.quora.com/How-do-I-pick-appropriate-support-confidence-value-when-doing-basket-analysis-with-Apriori-algorithm