# Implementing FP-Growth using Spark DataFrames

Importing Libraries

In [1]:
##Importing Libraries
import re
import warnings
import pandas as pd
import json
import numpy as np
import pyfpgrowth
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import udf
import pyspark.sql.functions as func
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.fpm import FPGrowth
#import spark
import time

Create Spark Session

In [2]:
# Obtain the Spark session through the builder, which reuses an already
# active session (and its context) when this cell is re-run instead of
# wrapping the context in a fresh SparkSession object every time.
spark1 = SparkSession.builder.getOrCreate()
# Keep `sc` available for any cell that works with the SparkContext directly.
sc = spark1.sparkContext

Load Dataset

In [3]:
# Read the comma-separated basket export; the first line is the header,
# column types are inferred, and a double quote is the escape character.
df = (
    spark1.read
    .option("sep", ",")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("escape", "\"")
    .csv("BigTempRb.txt")
)
# Inspect df.dtypes here to review the inferred schema.

Select columns and check row count

In [4]:
# Keep only the columns used by the rest of the notebook.
df = df.select(
    [
        "RequestTimestamp",
        "ResponseRgBasketId",
        "RequestSiteId",
        "RequestBasketValue",
        "RequestBasketId",
        "RequestNumberBasketItems",
        "RequestBasketJsonString",
    ]
)
# Row count of the selected frame (displayed as the cell output).
df.count()

10020

Create ItemList column

In [5]:
### UDF to extract item list from json
def get_item_list(item_json):
    """Return the distinct item codes contained in a basket JSON string.

    Parameters
    ----------
    item_json : str or None
        JSON document holding an ``"items"`` array; each entry carries its
        item code under the key ``"b"``.

    Returns
    -------
    list
        The distinct ``"b"`` values in first-seen order. An empty list is
        returned when the input is ``None`` or blank, when the document is
        not a JSON object, or when it has no ``"items"`` entries.

    Raises
    ------
    json.JSONDecodeError
        If ``item_json`` is non-blank but not valid JSON.
    """
    # A null or blank cell would make json.loads raise and abort the whole
    # Spark job, so treat it as a basket without items.
    if item_json is None or not item_json.strip():
        return []

    response = json.loads(item_json)
    if not isinstance(response, dict):
        return []

    entries = response.get('items') or []
    # dict.fromkeys de-duplicates like set() did, but keeps a stable order so
    # repeated runs produce the same ItemList for the same basket.
    return list(dict.fromkeys(
        nest["b"] for nest in entries
        if isinstance(nest, dict) and "b" in nest
    ))

### Wrap get_item_list as a Spark UDF that returns an array of strings
item_list = udf(get_item_list, returnType=ArrayType(StringType()))

## Derive the ItemList column from each row's basket JSON string
df = df.withColumn('ItemList', item_list(df['RequestBasketJsonString']))

In [6]:
df.show(n=10)  ## Preview the first 10 rows, including the new ItemList column

+--------------------+--------------------+-------------+------------------+---------------+------------------------+-----------------------+--------------------+
|    RequestTimestamp|  ResponseRgBasketId|RequestSiteId|RequestBasketValue|RequestBasketId|RequestNumberBasketItems|RequestBasketJsonString|            ItemList|
+--------------------+--------------------+-------------+------------------+---------------+------------------------+-----------------------+--------------------+
|2018-11-27 14:13:...|cf31c58d05ca41bda...|         7333|              14.6|    73335271658|                       5|   {"id":"7333527165...|[5000128725477, 5...|
|2018-11-27 16:35:...|1d385ce056e1463a8...|         5775|              2.54|   577522888398|                       3|   {"id":"5775228883...|[5010891011868, 8...|
|2018-11-27 22:34:...|b9083214cf6c4593b...|         7392|              6.35|     7392753009|                       4|   {"id":"7392753009...|[5000128797436, 5...|
|2018-11-27 11:52:...|

In [7]:
# Count the rows while forcing the ItemList UDF to run for every basket.
# The count happens on the executors, so the item lists are no longer
# collected into driver memory just to take their length.
df.select("ItemList").rdd.count()

10020

Build FPGrowth model

In [8]:
## Input parameters
input_support = 0.0002  ## minimum support, a fraction between 0 and 1
input_confidence = 0.001  ## minimum confidence, a fraction between 0 and 1

############### Applying PySpark FPGrowth algorithm ######################
# The timer covers both estimator construction and fitting.
start = time.time()
fpGrowth = FPGrowth(
    itemsCol="ItemList",
    minSupport=input_support,
    minConfidence=input_confidence,
)
model = fpGrowth.fit(df)
model_time = time.time() - start

In [9]:
# Wall-clock seconds spent constructing the FPGrowth estimator and fitting it.
model_time

2.3794689178466797

In [10]:
# Display frequent itemsets.
# show() prints a preview of the itemsets with their frequencies; count()
# returns how many frequent itemsets the fitted model holds in total.
model.freqItemsets.show()
model.freqItemsets.count()

+--------------------+----+
|               items|freq|
+--------------------+----+
|     [5000128701730]|   3|
|     [0000050939886]|   3|
|     [3046920029759]|   4|
|     [0000050248346]|   4|
|     [5010043000016]|  15|
|     [5010455063142]|  34|
|[5010455063142, 7...|   3|
|[5010455063142, 5...|   3|
|     [5000128769433]|  10|
|     [5054267000704]|  28|
|[5054267000704, 5...|   3|
|     [5000128610070]|  14|
|     [5000128651691]|   5|
|     [5012035955892]|   3|
|     [5000128193610]|  11|
|     [5000128711449]|  42|
|[5000128711449, 5...|   4|
|[5000128711449, 0...|   3|
|     [5010394986076]|   6|
|     [5000128867849]|  10|
+--------------------+----+
only showing top 20 rows



4837

In [11]:
# Build the association rules and round the metrics for display:
# confidence to 4 decimal places, lift to 2 decimal places.
rules = (
    model.associationRules
    .withColumn("confidence", func.round(func.col("confidence"), 4))
    .withColumn("lift", func.round(func.col("lift"), 2))
)
rules.show()
rules.count()

+---------------+---------------+----------+-------+
|     antecedent|     consequent|confidence|   lift|
+---------------+---------------+----------+-------+
|[5000128597685]|[5000128597623]|    0.3333|  133.6|
|[5000128833813]|[5000128664288]|     0.125|  78.28|
|[5000128833813]|[5000128655996]|     0.125|  32.96|
|[5000128833813]|[5000128785617]|     0.125|   2.39|
|[5000128833813]|[0000000000145]|     0.125|   1.77|
|[5000328806181]|[0000000000145]|    0.3333|   4.72|
|[5000128766708]|[5000128766586]|    0.0625|  18.98|
|[5000128766708]|[5000128072915]|    0.0469|   7.96|
|[5000128766708]|[5000128785617]|    0.0625|   1.19|
|[5000128766708]|[0000000007535]|    0.0469|   2.21|
|[5000128766708]|[5010092093441]|    0.0469|  13.05|
|[0000096135846]|[6191514300312]|       0.2|   8.38|
|[5023528000036]|[0000000000145]|    0.3333|   4.72|
|[5000128979726]|[0000000000145]|    0.2308|   3.27|
|[5000128889889]|[5010228013213]|    0.3333| 556.67|
|[5000128889889]|[5000205039862]|    0.3333|11

3661

In [14]:
#rules.collect() ## Returns every generated rule to the driver. Left commented out: do not run this on large data.

[Row(antecedent=['5000128597685'], consequent=['5000128597623'], confidence=0.3333, lift=133.6),
 Row(antecedent=['5000128833813'], consequent=['5000128664288'], confidence=0.125, lift=78.28),
 Row(antecedent=['5000128833813'], consequent=['5000128655996'], confidence=0.125, lift=32.96),
 Row(antecedent=['5000128833813'], consequent=['5000128785617'], confidence=0.125, lift=2.39),
 Row(antecedent=['5000128833813'], consequent=['0000000000145'], confidence=0.125, lift=1.77),
 Row(antecedent=['5000328806181'], consequent=['0000000000145'], confidence=0.3333, lift=4.72),
 Row(antecedent=['5000128766708'], consequent=['5000128766586'], confidence=0.0625, lift=18.98),
 Row(antecedent=['5000128766708'], consequent=['5000128072915'], confidence=0.0469, lift=7.96),
 Row(antecedent=['5000128766708'], consequent=['5000128785617'], confidence=0.0625, lift=1.19),
 Row(antecedent=['5000128766708'], consequent=['0000000007535'], confidence=0.0469, lift=2.21),
 Row(antecedent=['5000128766708'], conse

In [15]:
# Apply the fitted model to each basket and preview the basket id, its
# items, and the model's prediction column.
(
    model.transform(df)
    .select('ResponseRgBasketId', 'ItemList', 'prediction')
    .show()
)

+--------------------+--------------------+--------------------+
|  ResponseRgBasketId|            ItemList|          prediction|
+--------------------+--------------------+--------------------+
|cf31c58d05ca41bda...|[5000128725477, 5...|[5000128104494, 0...|
|1d385ce056e1463a8...|[5010891011868, 8...|                  []|
|b9083214cf6c4593b...|[5000128797436, 5...|[0000000000145, 5...|
|7da158b1c1d14fdf8...|[4062300011847, 5...|     [5000128785617]|
|ce2c0ef55f4c47d1b...|[6191514300312, 5...|[0000000000145, 5...|
|23af5a43bb3043b3a...|[9770307268922, 5...|[5000128766968, 9...|
|e9e24506d4c64b5ba...|     [7622210286314]|     [5000159457873]|
|43a4bfda05d44538b...|[5000128677127, 7...|                  []|
|6ce95a109c024ce9a...|     [0000050457243]|[5000128785617, 0...|
|cebc44a6e43d4c9f9...|     [5000431027169]|                  []|
|e6aa26eb00c947389...|     [5000128104517]|[5010044000305, 5...|
|3f64f54e8dcc42758...|[5037435002335, 7...|[0000000000145, 5...|
|3b898aa2b03f4afb8...|   

Create single rule and double rule dataframe

In [12]:
from pyspark.sql.functions import col, size

## Single rules: one antecedent item implying one consequent item
single_rule_df = rules.where(
    (size(col('antecedent')) == 1) & (size(col('consequent')) == 1)
)
## Double rules: two antecedent items implying one consequent item
double_rule_df = rules.where(
    (size(col('antecedent')) == 2) & (size(col('consequent')) == 1)
)

In [13]:
# Preview the double-rule frame.
double_rule_df.show()

+--------------------+---------------+----------+-------+
|          antecedent|     consequent|confidence|   lift|
+--------------------+---------------+----------+-------+
|[5000128663250, 0...|[5000128661911]|      0.75| 289.04|
|[2083801000002, 2...|[6100000000193]|    0.0833|   7.32|
|[2083801000002, 2...|[9770307268922]|    0.0833|   3.61|
|[2083801000002, 2...|[5000128104524]|    0.0833|   2.97|
|[5000205034546, 8...|[5000205039862]|       1.0| 3340.0|
|[5000205034546, 8...|[5010228013213]|       1.0| 1670.0|
|[5000205034546, 8...|[5000128889889]|       1.0|1113.33|
|[9770307017124, 2...|[9770307757129]|       0.5|  28.79|
|[0000050128747, 5...|[5000128279796]|    0.4286| 429.43|
|[0000050128747, 5...|[5000128918305]|    0.4286|  46.18|
|[5000128879606, 5...|[5000128915618]|    0.4286|  81.02|
|[5000128104517, 5...|[9770307757129]|    0.1304|   7.51|
|[5000128104517, 5...|[5000128861069]|    0.1304|  13.34|
|[9771056348958, 9...|[9770307268922]|       1.0|  43.38|
|[208380100000

In [14]:
# Number of rows in the single-rule frame (displayed as the cell output).
single_rule_df.count()
# Earlier export attempt, kept for reference only (not executed):
#single_rule_df.withColumn("antecedent", single_rule_df.antecedent.cast("StringType"))
#single_rule_df.write.csv("file.csv")

3386

Export dataframes

In [15]:
def toCSVLine(data):
    """Serialize one record as a single CSV line.

    Every field is rendered with ``str()`` and the fields are separated by
    commas. A field whose text itself contains a comma or a double quote
    (for example the ``str()`` of a list with several items) is wrapped in
    double quotes, so the line still splits into the right number of columns
    when read back by a CSV parser. Fields without such characters are
    written exactly as before.

    Parameters
    ----------
    data : iterable
        The values of one record, e.g. a Spark ``Row``.

    Returns
    -------
    str
        The CSV text for the record, without a trailing newline.
    """
    # Imported locally so the function stays self-contained when it is
    # shipped to the workers by rdd.map.
    import csv
    import io

    buffer = io.StringIO()
    # lineterminator='' because saveAsTextFile adds the line break itself.
    csv.writer(buffer, lineterminator='').writerow([str(d) for d in data])
    return buffer.getvalue()

# Turn every rule row into one line of CSV text, merge each result into a
# single partition, and write it out as a text-file directory.
# NOTE(review): saveAsTextFile is expected to fail when the target directory
# already exists, so remove earlier output before re-running - confirm.
single_lines = single_rule_df.rdd.map(toCSVLine)
single_lines.coalesce(1).saveAsTextFile('single_rule_file')

double_lines = double_rule_df.rdd.map(toCSVLine)
double_lines.coalesce(1).saveAsTextFile('double_rule_file')