<img src="https://miro.medium.com/max/1218/1*4-c4LZRDJVFXBzWiRpaK4A.png" width = 400 height = 250 />


In [1]:
## https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html

In [1]:
# Silence all Python warnings so they do not clutter the cell outputs.
import warnings 
warnings.filterwarnings("ignore") 
import pandas as pd 

import os
import numpy as np
# NOTE(review): presumably silences PyArrow's timezone warning/behaviour when
# converting between Spark and pandas — confirm it is still required.
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

# NOTE(review): findspark.init() presumably locates the local Spark installation
# so that the pyspark imports below resolve — keep it ahead of those imports.
import findspark
findspark.init()

from pyspark import SparkContext
# Wildcard import: DecimalType, used later in the notebook, comes from here.
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.fpm import FPGrowth

In [4]:
# Build (or reuse) a local Spark session that runs on every available core.
# getOrCreate() hands back an already-running session if one exists; in that
# case stop it first (spark.stop()) and re-run this cell so the memory
# settings below are applied to a fresh session.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('fpgrowth')
    .config("spark.executor.memory", '8G')
    .config("spark.driver.memory", '8G')
    .getOrCreate()
)
# Enable Arrow-based Spark <-> pandas conversion. The previous key,
# "spark.sql.execution.arrow.enabled", is deprecated since Spark 3.0 in
# favour of the pyspark-specific key used here.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark

In [6]:
## Loading data to spark dataframe from csv
# Join the path components individually: the previous 'data\\transactional_data.csv'
# literal only resolved on Windows.
data_path = os.path.join(os.path.abspath(''), 'data', 'transactional_data.csv')
df = spark.read.format("csv").options(header = "true", delimiter = ';').load(data_path)

df.printSchema()
print('Row count:', f'{df.count():,}')

# Compute both distinct counts in a single aggregation instead of two separate jobs.
distinct_counts = df.agg(
    F.countDistinct("transaction").alias("transactions"),
    F.countDistinct("item").alias("items"),
).first()
print('Distinct transactions:', f'{distinct_counts["transactions"]:,}')
print('Distinct items:', f'{distinct_counts["items"]:,}')

df.show(n = 10)

root
 |-- transaction: string (nullable = true)
 |-- item: string (nullable = true)

Row count: 1,048,575
Distinct transactions: 43,559
Distinct items: 2,867
+-----------+----------+
|transaction|      item|
+-----------+----------+
|10007638857|3708020202|
|10007638857|2306030101|
|10007638857|2207020101|
|10007638857|2506040101|
|21018312942|1801040202|
|21018312942|2205040102|
|21018312942|3602060101|
|21018312942|1801050301|
|21018312968|2702010101|
|21018312968|1501070201|
+-----------+----------+
only showing top 10 rows



In [7]:
## Dropping 'bad' items
# Item codes that are excluded from the analysis.
excluded_items = ['1505030101', '5801010101']
df = df.filter(~F.col('item').isin(excluded_items))

In [8]:
## Dropping duplicates and grouping dataframe
# One row per transaction holding the list of its distinct items.
# The aggregate keeps Spark's default column name, "collect_list(item)".
basketdata = (
    df.dropDuplicates(['transaction', 'item'])
      .groupBy("transaction")
      .agg(F.collect_list("item"))
      .sort('transaction')
)
basketdata.printSchema()

root
 |-- transaction: string (nullable = true)
 |-- collect_list(item): array (nullable = false)
 |    |-- element: string (containsNull = false)



In [9]:
## Frequent Pattern Growth (FP-Growth) mines the itemsets whose support reaches minSupport;
## association rules are then generated from them and kept when their confidence reaches minConfidence.
fp_growth = FPGrowth(
    itemsCol = "collect_list(item)",
    minSupport = 0.01,
    minConfidence = 0.01,
)
model = fp_growth.fit(basketdata)
# Display frequent itemsets.
model.freqItemsets.show(n = 10)

+--------------------+----+
|               items|freq|
+--------------------+----+
|        [3407010101]|3554|
|        [3407010201]|3080|
|        [1801050301]|2691|
|        [3408020101]|2554|
|[3408020101, 3407...| 492|
|        [4302030101]|2464|
|        [3407010401]|2132|
|        [1501070201]|2032|
|        [1801050103]|1994|
|[1801050103, 1801...| 486|
+--------------------+----+
only showing top 10 rows



Model metrics:
<br><br>

|Metric|Formula|Range|Description|
|------|------|------|------|
|Support|support(A+C)|[0, 1]|Support is used to measure frequency of an itemset in the database|
|Confidence|support(A+C) / support(A)|[0, 1]|Confidence is the probability of seeing the consequent in a transaction given that it also contains the antecedent|
|Lift|confidence(A->C) / support(C)|[0, inf]|Lift shows how much more often the antecedent and consequent of a rule A->C occur together than we would expect if they were statistically independent|
|Leverage|support(A->C) - support(A) * support(C)|[-1, 1]|Leverage computes the difference between the observed frequency of A and C appearing together and the frequency that would be expected if A and C were independent|
|Conviction|[1 - support(C)] / [1 - confidence(A->C)]|[0, inf]|A high conviction value means that the consequent is highly dependent on the antecedent|

In [19]:
# Display generated association rules.
# Keep only one-to-one rules (single antecedent, single consequent) whose lift exceeds 1.
single_antecedent = F.size(F.col('antecedent')) == 1
single_consequent = F.size(F.col('consequent')) == 1
lift_above_one = F.col('lift') > 1
ar = model.associationRules.where(single_antecedent & single_consequent & lift_above_one)
print('Rule count:', f'{ar.count():,}')
ar.sort('lift', ascending = False).show(n = 10)

Rule count: 102
+------------+------------+-------------------+------------------+--------------------+
|  antecedent|  consequent|         confidence|              lift|             support|
+------------+------------+-------------------+------------------+--------------------+
|[5001010502]|[5001010103]|  0.649402390438247| 35.67127203669559| 0.01496820404508827|
|[5001010103]|[5001010502]| 0.8221941992433796| 35.67127203669558| 0.01496820404508827|
|[3902020201]|[3902020101]|0.49718045112781956|33.733151511957466|0.012144447760508735|
|[3902020101]|[3902020201]|   0.82398753894081|33.733151511957466|0.012144447760508735|
|[5001010501]|[5001010102]|  0.736346516007533|  32.1710309837233|   0.017952661906839|
|[5001010102]|[5001010501]| 0.7843530591775326|  32.1710309837233|   0.017952661906839|
|[2103010501]|[2103010201]| 0.5335820895522388|27.969076099646173| 0.01641451823962901|
|[2103010201]|[2103010501]| 0.8604091456077015|27.969076099646173| 0.01641451823962901|
|[5001070101]|[5

In [20]:
## Analyze single item
# Rules whose antecedent contains an item code starting with the given prefix,
# ordered from the most to the least confident.
item_prefix = '5001060101'
rules_for_item = ar.filter(
    F.exists(F.col("antecedent"), lambda item: item.startswith(item_prefix))
)
rules_for_item.sort('confidence', ascending = False).show(n = 10)

+------------+------------+-------------------+------------------+--------------------+
|  antecedent|  consequent|         confidence|              lift|             support|
+------------+------------+-------------------+------------------+--------------------+
|[5001060101]|[5005010101]| 0.6526655896607432|15.620582648369403|0.018549553479189147|
|[5001060101]|[5006060101]| 0.4555735056542811|19.804716898996837|  0.0129479556463647|
|[5001060101]|[5004050101]|0.43618739903069464|16.858817137868705|0.012396978810349181|
|[5001060101]|[5004050201]| 0.4305331179321486|18.697499585250707| 0.01223627723317799|
|[5001060101]|[5004020101]| 0.4135702746365105|11.120189872155409| 0.01175417250166441|
|[5001060101]|[5001010501]| 0.4111470113085622|16.863608912984613|0.011685300397162469|
|[5001060101]|[5001010102]| 0.3764135702746365|16.445535313533494|0.010698133565967997|
|[5001060101]|[5001070101]| 0.3756058158319871|27.497502070295003|0.010675176197800684|
|[5001060101]|[5004050401]| 0.35

In [21]:
# transform examines the input items against all the association rules and summarize the consequents as prediction
# The string transaction id is cast to a whole-number decimal for display.
# The column is resolved by name on the frame being modified; the previous
# `df.transaction` pointed at a column of a different DataFrame and only
# worked through lineage-based resolution, which is fragile.
transformed = (
    model.transform(basketdata)
         .withColumn('transaction', F.col('transaction').cast(DecimalType(18, 0)))
)
transformed.show(n = 5)

+------------+--------------------+----------+
| transaction|  collect_list(item)|prediction|
+------------+--------------------+----------+
|100026000000|[5001010101, 4402...|        []|
|110004000000|[5005010101, 5001...|        []|
|110005000000|[5001010501, 5006...|        []|
|120005000000|[4902080101, 5103...|        []|
|130002000000|[3201020101, 1302...|        []|
+------------+--------------------+----------+
only showing top 5 rows



In [22]:
## Transforming spark dataframe to pandas
# Collect the filtered rules to the driver as a pandas DataFrame (all columns kept).
rules_df = ar.toPandas()
rules_df.head(5)

Unnamed: 0,antecedent,consequent,confidence,lift,support
0,[1801050202],[1801050205],0.490134,13.385412,0.017677
1,[1801050202],[1801050301],0.347549,5.625753,0.012535
2,[5001060101],[5005010101],0.652666,15.620583,0.01855
3,[5001060101],[5004020101],0.41357,11.12019,0.011754
4,[5001060101],[5004050101],0.436187,16.858817,0.012397
