# Implementing the Apriori Algorithm

Import Libraries

In [3]:
##Importing Libraries
import re
import warnings
import pandas as pd
import json
import numpy as np
#import pyfpgrowth
from efficient_apriori import apriori
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import udf
import pyspark.sql.functions as func
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.fpm import FPGrowth
import datetime
import time

Create Spark Session

In [4]:
# Reuse the active SparkContext if there is one; otherwise create it.
sc = SparkContext.getOrCreate()
# Wrap the context in a SparkSession; its reader is what loads the dataset.
spark1 = SparkSession(sc)

Load Dataset

In [5]:
# Reader options for the comma-separated basket export: the first row is the
# header, column types are inferred, and a double quote is the escape
# character inside quoted fields.
csv_options = {
    "sep": ",",
    "header": "true",
    "inferSchema": "true",
    "escape": '"',
}
df = spark1.read.csv("BigTempRb.txt", **csv_options)
# Inspect df.dtypes to see the inferred schema.

Select columns and check row count

In [6]:
# Project the DataFrame down to the request/response basket fields.
basket_columns = [
    "RequestTimestamp",
    "ResponseRgBasketId",
    "RequestSiteId",
    "RequestBasketValue",
    "RequestBasketId",
    "RequestNumberBasketItems",
    "RequestBasketJsonString",
]
df = df.select(*basket_columns)

# Number of rows loaded (shown as the cell result).
df.count()

10020

Create ItemList column

In [7]:
### UDF to extract item list from json
def get_item_list(item_json):
    """Return the distinct item codes contained in a basket JSON string.

    Args:
        item_json: JSON text holding a top-level ``"items"`` array whose
            entries each carry a ``"b"`` key. May be ``None`` or blank, as
            happens for null or empty cells in the source column.

    Returns:
        A list of the distinct ``"b"`` values in first-seen order. An empty
        list is returned when ``item_json`` is ``None`` or blank.

    Raises:
        json.JSONDecodeError: If ``item_json`` is non-blank but not valid JSON.
        KeyError: If ``"items"`` or an entry's ``"b"`` key is missing.
    """
    # A null/blank cell would make json.loads raise and abort the whole
    # Spark job; treat it as a basket with no items instead.
    if item_json is None or not item_json.strip():
        return []
    response = json.loads(item_json)
    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # result is deterministic (set iteration order varies between runs).
    return list(dict.fromkeys(nest["b"] for nest in response["items"]))

# Expose get_item_list to Spark as a UDF that yields an array of strings.
item_list = udf(get_item_list, ArrayType(StringType()))

# Derive the ItemList column from each row's basket JSON string.
df = df.withColumn("ItemList", item_list(df["RequestBasketJsonString"]))

# Bring every ItemList value back to the driver as one Python list,
# one entry per row.
transactions = (
    df.select("ItemList")
    .rdd
    .map(lambda row: row[0])
    .collect()
)

Build Apriori

In [8]:
## Input parameters
input_support = .0002 ## passed as min_support; value ranges between 0 and 1
input_confidence = 0.001 ## passed as min_confidence; value ranges between 0 and 1
## Disabled alternative: PySpark FPGrowth fitted on the ItemList column.
#fpGrowth = FPGrowth(itemsCol="ItemList", minSupport=input_support, minConfidence=input_confidence)
#model = fpGrowth.fit(df)
## Run efficient_apriori on the collected transactions; it returns the itemsets and the rules.
itemsets, rules = apriori(transactions, min_support=input_support, min_confidence=input_confidence)


Print rules for validation purposes

In [9]:
# For each left-hand-side size (1 item, then 2 items), print every rule with
# that many items on the left and exactly 1 item on the right, in ascending
# order of lift.
for lhs_size in (1, 2):
    rules_rhs = (
        rule for rule in rules
        if len(rule.lhs) == lhs_size and len(rule.rhs) == 1
    )
    for rule in sorted(rules_rhs, key=lambda candidate: candidate.lift):
        print(rule)  # Prints the rule and its confidence, support, lift, ...

{0000000000523} -> {0000000000145} (conf: 0.024, supp: 0.000, lift: 0.340, conv: 0.952)
{0000000000145} -> {0000000000523} (conf: 0.004, supp: 0.000, lift: 0.340, conv: 0.992)
{6100000000193} -> {0000000000145} (conf: 0.026, supp: 0.000, lift: 0.372, conv: 0.954)
{0000000000145} -> {6100000000193} (conf: 0.004, supp: 0.000, lift: 0.372, conv: 0.993)
{9770956805424} -> {0000000000145} (conf: 0.032, supp: 0.000, lift: 0.457, conv: 0.960)
{0000000000145} -> {9770956805424} (conf: 0.004, supp: 0.000, lift: 0.457, conv: 0.995)
{9770307268922} -> {5000128104524} (conf: 0.013, supp: 0.000, lift: 0.463, conv: 0.985)
{5000128104524} -> {9770307268922} (conf: 0.011, supp: 0.000, lift: 0.463, conv: 0.987)
{2083803000000} -> {0000000007535} (conf: 0.010, supp: 0.000, lift: 0.480, conv: 0.989)
{0000000007535} -> {2083803000000} (conf: 0.014, supp: 0.000, lift: 0.480, conv: 0.985)
{9770307757129} -> {0000000000145} (conf: 0.034, supp: 0.001, lift: 0.488, conv: 0.963)
{0000000000145} -> {977030775712

{0000000007535} -> {5000128412773} (conf: 0.014, supp: 0.000, lift: 4.032, conv: 1.011)
{7622210286956} -> {5000128861069} (conf: 0.039, supp: 0.000, lift: 4.036, conv: 1.031)
{5000128861069} -> {7622210286956} (conf: 0.031, supp: 0.000, lift: 4.036, conv: 1.024)
{5000128670210} -> {0000000000145} (conf: 0.286, supp: 0.000, lift: 4.044, conv: 1.301)
{0000000000145} -> {5000128670210} (conf: 0.006, supp: 0.000, lift: 4.044, conv: 1.004)
{5000221506096} -> {0000000000145} (conf: 0.286, supp: 0.000, lift: 4.044, conv: 1.301)
{0000000000145} -> {5000221506096} (conf: 0.006, supp: 0.000, lift: 4.044, conv: 1.004)
{5000193034559} -> {0000000000145} (conf: 0.286, supp: 0.001, lift: 4.044, conv: 1.301)
{0000000000145} -> {5000193034559} (conf: 0.008, supp: 0.001, lift: 4.044, conv: 1.006)
{5000128782357} -> {0000000000145} (conf: 0.286, supp: 0.000, lift: 4.044, conv: 1.301)
{0000000000145} -> {5000128782357} (conf: 0.006, supp: 0.000, lift: 4.044, conv: 1.004)
{5000382102144} -> {000000000014

{5000159455343} -> {5000159484527} (conf: 0.083, supp: 0.000, lift: 20.366, conv: 1.086)
{5000128540155} -> {5000128267540} (conf: 0.129, supp: 0.000, lift: 20.522, conv: 1.141)
{5000128267540} -> {5000128540155} (conf: 0.063, supp: 0.000, lift: 20.522, conv: 1.064)
{5000373702315} -> {5000128104517} (conf: 0.231, supp: 0.000, lift: 20.646, conv: 1.285)
{5000128104517} -> {5000373702315} (conf: 0.027, supp: 0.000, lift: 20.646, conv: 1.026)
{5000159484527} -> {5000159454452} (conf: 0.098, supp: 0.000, lift: 20.799, conv: 1.103)
{5000159454452} -> {5000159484527} (conf: 0.085, supp: 0.000, lift: 20.799, conv: 1.089)
{5000128927994} -> {0000000007535} (conf: 0.444, supp: 0.000, lift: 20.908, conv: 1.762)
{0000000007535} -> {5000128927994} (conf: 0.019, supp: 0.000, lift: 20.908, conv: 1.018)
{5000128966429} -> {5000128947589} (conf: 0.073, supp: 0.000, lift: 20.948, conv: 1.075)
{5000128947589} -> {5000128966429} (conf: 0.086, supp: 0.000, lift: 20.948, conv: 1.089)
{5000128808286} -> {5