# Market Basket Analysis using PySpark's Implementation of FPGrowth

FPGrowth is a frequent pattern mining algorithm used for market basket analysis, similar to the Apriori algorithm. I first used it when I ran into resource issues with Apriori, and I was impressed with its speed, so I am giving it a try on this dataset using PySpark. The [documentation for FPGrowth](https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html) is pretty straightforward and describes the hyperparameters and the results.

## Import the relevant libraries

`SparkContext` and `SparkSession` are general PySpark classes needed for PySpark applications. The class used specifically for market basket analysis is [FPGrowth](https://spark.apache.org/docs/latest/ml-frequent-pattern-mining.html).

In [1]:
# Used for a histogram
!pip install pyspark_dist_explore


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
from pyspark import SparkContext
# Rather than generally using the functions, I should explicitly import the ones I want.
from pyspark.sql import functions as f, SparkSession, Column
from pyspark_dist_explore import hist
import matplotlib.pyplot as plt
from pyspark.ml.fpm import FPGrowth

In [3]:
# Create a spark session. All sorts of settings can be specified here. 
spark = SparkSession.builder.appName("arlUsingPyspark").getOrCreate()



In [4]:
# Read the dataset
df = spark.read.csv("basket.csv", header=True).withColumn("id", f.monotonically_increasing_id())
#df_all = spark.read.csv("/Users/admin/Jupyter Examples/Groceries data.csv", header=True).withColumn("id", f.monotonically_increasing_id())

In [5]:
# Show the dataframes
df.show(5)
#df_all.show(5)

+-----------+------------------+-------------------+------+----+----+----+----+----+----+----+---+
|          0|                 1|                  2|     3|   4|   5|   6|   7|   8|   9|  10| id|
+-----------+------------------+-------------------+------+----+----+----+----+----+----+----+---+
| whole milk|            pastry|        salty snack|  NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|  0|
|    sausage|        whole milk|semi-finished bread|yogurt|NULL|NULL|NULL|NULL|NULL|NULL|NULL|  1|
|       soda|pickled vegetables|               NULL|  NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|  2|
|canned beer|   misc. beverages|               NULL|  NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|  3|
|    sausage|  hygiene articles|               NULL|  NULL|NULL|NULL|NULL|NULL|NULL|NULL|NULL|  4|
+-----------+------------------+-------------------+------+----+----+----+----+----+----+----+---+
only showing top 5 rows



In [6]:
#num_baskets = df_all.groupBy("Member_number").count()
#num_baskets.show(5)

In [7]:
#fig, ax = plt.subplots()
#hist(ax, num_baskets.select('count'), bins = 30, color=['blue'])

# Run PySpark's implementation of FPGrowth

The first step is to collect each basket into an array. FPGrowth requires each basket to be an array of items that looks like:

* ['item1', 'item2', 'item3']

The basket dataframe uses wide rather than long format: one column per item position (columns `0` through `10`), with NULL wherever the basket contains fewer than 11 items.

In [8]:
df_basket = df.select("id", f.array([df[c] for c in df.columns[:11]]).alias("basket"))
# False tells show() to not truncate the columns when printing.
df_basket.show(3, False) 

+---+--------------------------------------------------------------------------------------------+
|id |basket                                                                                      |
+---+--------------------------------------------------------------------------------------------+
|0  |[whole milk, pastry, salty snack, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL]           |
|1  |[sausage, whole milk, semi-finished bread, yogurt, NULL, NULL, NULL, NULL, NULL, NULL, NULL]|
|2  |[soda, pickled vegetables, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL]            |
+---+--------------------------------------------------------------------------------------------+
only showing top 3 rows



### There should not be any nulls in the array. Remove using array_except()

This will be the final dataframe used for FPGrowth. 
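
As a plain-Python illustration of what these two steps do to a single row (only a sketch of the logic, not how Spark executes it; `row_to_basket` is a hypothetical helper name): the wide row becomes a list, nulls are dropped, and duplicates are removed. `array_except()` returns its result without duplicates, which is convenient because FPGrowth expects each item to appear at most once per basket.

```python
def row_to_basket(row):
    """Drop missing values and duplicates from one wide-format row, keeping first-seen order."""
    # dict.fromkeys preserves insertion order and discards repeated keys
    return list(dict.fromkeys(item for item in row if item is not None))


# One row in the wide layout, padded with None like the NULLs shown above
wide_row = ["sausage", "whole milk", "semi-finished bread", "yogurt", None, None, None]
basket = row_to_basket(wide_row)
print(basket)
```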

In [9]:
df_aggregated = df_basket.select("id", f.array_except("basket", f.array(f.lit(None))).alias("basket"))
df_aggregated.show(3, False)

+---+--------------------------------------------------+
|id |basket                                            |
+---+--------------------------------------------------+
|0  |[whole milk, pastry, salty snack]                 |
|1  |[sausage, whole milk, semi-finished bread, yogurt]|
|2  |[soda, pickled vegetables]                        |
+---+--------------------------------------------------+
only showing top 3 rows



## Hyperparameters

The hyperparameters used in FPGrowth are minimum support, minimum confidence, and number of partitions.

* minSupport - The minimum support for an itemset to be identified as frequent, expressed as a fraction of all baskets (default 0.3).
* minConfidence - The minimum confidence for generating an association rule from a frequent itemset (default 0.8). It does not affect which itemsets are mined.
* numPartitions - The number of partitions used to distribute the work. This is Spark-specific.

If numPartitions is not set, the number of partitions of the input dataset is used.
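
To make the two thresholds concrete, here is a small plain-Python sketch that computes support and confidence by hand. The four baskets are toy data made up for illustration (not taken from the dataset), and `support` and `confidence` are hypothetical helper names, not part of the PySpark API.

```python
# Toy baskets for illustration only
baskets = [
    {"whole milk", "pastry"},
    {"whole milk", "yogurt"},
    {"soda", "pastry"},
    {"whole milk", "pastry", "yogurt"},
]


def support(itemset, baskets):
    """Fraction of baskets that contain every item in the itemset."""
    itemset = set(itemset)
    return sum(itemset <= basket for basket in baskets) / len(baskets)


def confidence(antecedent, consequent, baskets):
    """Of the baskets containing the antecedent, the fraction that also contain the consequent."""
    both = set(antecedent) | set(consequent)
    return support(both, baskets) / support(antecedent, baskets)


pair_support = support({"whole milk", "pastry"}, baskets)           # 2 of 4 baskets
rule_confidence = confidence({"pastry"}, {"whole milk"}, baskets)   # 2 of the 3 pastry baskets

print(pair_support, rule_confidence)
```

With a minSupport of 0.5, the pair {whole milk, pastry} would just qualify as frequent; with a minConfidence of 0.8, the rule pastry -> whole milk would be dropped because its confidence is only about 0.67.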

In [10]:
# Run FPGrowth and fit the model.
fp = FPGrowth(minSupport=0.001, minConfidence=0.001, itemsCol='basket', predictionCol='prediction')
model = fp.fit(df_aggregated)

In [11]:
# View a subset of the frequent itemset. 
model.freqItemsets.show(10, False)

+------------------------+----+
|items                   |freq|
+------------------------+----+
|[cocoa drinks]          |16  |
|[canned fruit]          |21  |
|[specialty cheese]      |72  |
|[chocolate marshmallow] |60  |
|[pet care]              |85  |
|[house keeping products]|45  |
|[jam]                   |34  |
|[light bulbs]           |29  |
|[beef]                  |508 |
|[beef, frankfurter]     |15  |
+------------------------+----+
only showing top 10 rows



In [12]:
# Use filter to view only the association rules with confidence above 0.15.
model.associationRules.filter(model.associationRules.confidence>0.15).show(20, False)

+---------------------+------------+-------------------+------------------+---------------------+
|antecedent           |consequent  |confidence         |lift              |support              |
+---------------------+------------+-------------------+------------------+---------------------+
|[bottled beer]       |[whole milk]|0.15781710914454278|0.9993302598941151|0.007150972398583172 |
|[detergent]          |[whole milk]|0.16279069767441862|1.030824041177455 |0.001403461872619127 |
|[semi-finished bread]|[whole milk]|0.176056338028169  |1.1148247930239072|0.001670787943594199 |
|[sausage, rolls/buns]|[whole milk]|0.2125             |1.345593525179856 |0.0011361358016440553|
|[sausage, soda]      |[whole milk]|0.1797752808988764 |1.1383739010113787|0.0010693042839002875|
|[ham]                |[whole milk]|0.16015625         |1.0141421789039358|0.0027400922274944863|
|[frozen fish]        |[whole milk]|0.1568627450980392 |0.9932870312746344|0.0010693042839002875|
|[sausage, whole mil
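
The `lift` column helps with reading these rules. Lift is the rule's confidence divided by the support of the consequent, so dividing confidence by lift recovers the consequent's own support. The sketch below does that arithmetic for three rows copied from the table above; it is only a sanity check on the numbers, not part of the model.

```python
# (antecedent, confidence, lift) copied from the rule table above;
# the consequent is [whole milk] in every row
rules = [
    ("bottled beer", 0.15781710914454278, 0.9993302598941151),
    ("detergent", 0.16279069767441862, 1.030824041177455),
    ("sausage, rolls/buns", 0.2125, 1.345593525179856),
]

# lift = confidence / support(consequent), so support(consequent) = confidence / lift
implied = [confidence / lift for _, confidence, lift in rules]

for (antecedent, _, lift), milk_support in zip(rules, implied):
    print(f"{antecedent}: lift={lift:.3f}, implied support(whole milk)={milk_support:.4f}")
```

Every row implies that whole milk appears in roughly 15.8% of baskets, so a confidence just above 0.15 mostly reflects how common whole milk is. A lift close to 1 (bottled beer) suggests the antecedent barely changes the chance of buying whole milk, while a lift of about 1.35 (sausage, rolls/buns) suggests a noticeable increase.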

## Let's create a prediction based on the generated association rules

This is pretty similar to creating a prediction with other Spark ML models: call `transform()` on a dataframe. The items column in the new data needs to have the same name as the `itemsCol` specified when the model was fit (`basket` here).

In [13]:
# Create a PySpark dataframe
columns = ['basket']
new_data = [(['ham', 'yogurt', 'light bulbs'],), (['jam', 'cocoa drinks', 'pet care'],)]
new_df = spark.createDataFrame(new_data, columns)
new_df.show(2,False)

+-----------------------------+
|basket                       |
+-----------------------------+
|[ham, yogurt, light bulbs]   |
|[jam, cocoa drinks, pet care]|
+-----------------------------+



# Predict!

Now that we have a new PySpark dataframe with data, predict. The first basket generates numerous predictions based on the association rules; the second basket does not generate any.
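
As a rough sketch of what `transform()` does, following the description in the Spark documentation: a rule applies when the basket contains every item in its antecedent, and the prediction collects the consequents of all applicable rules, leaving out items already in the basket. The plain-Python version below uses hypothetical rules for illustration (only the first mirrors the [ham] -> [whole milk] rule shown earlier), and `predict` is a made-up helper name.

```python
def predict(basket, rules):
    """Collect consequents of every rule whose antecedent is fully contained in the basket."""
    items = set(basket)
    prediction = []
    for antecedent, consequent in rules:
        if set(antecedent) <= items:  # the rule applies only if all antecedent items are present
            for item in consequent:
                # skip items already in the basket or already predicted
                if item not in items and item not in prediction:
                    prediction.append(item)
    return prediction


# Hypothetical rules as (antecedent, consequent) pairs
toy_rules = [
    (["ham"], ["whole milk"]),
    (["ham", "yogurt"], ["rolls/buns"]),
    (["sausage", "soda"], ["whole milk"]),
]

first = predict(["ham", "yogurt", "light bulbs"], toy_rules)
second = predict(["jam", "cocoa drinks", "pet care"], toy_rules)
print(first)
print(second)
```

A basket gets an empty prediction when no rule's antecedent is contained in it, which is presumably what happens with the second basket: its items are all infrequent, so few or no rules involving them pass the support threshold.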

In [14]:
model.transform(new_df).show(5, False)

+-----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|basket                       |prediction                                                                                                                                                                                                                                                                                                             

# 