## Parallel FP-Growth using Map-Reduce Approach

### Installing and Importing Libraries

In [None]:
# mrjob is pinned to an exact version so the MapReduce scripts written below run against a known release.
!pip install --quiet mrjob==0.7.4

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/439.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.8/439.6 kB[0m [31m4.8 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m430.1/439.6 kB[0m [31m7.2 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m439.6/439.6 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# NOTE(review): mlxtend is installed unpinned (--upgrade pulls whatever is latest);
# pin an exact version here if reproducible results are required.
!pip install --quiet mlxtend --upgrade

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.4 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/1.4 MB[0m [31m2.3 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.5/1.4 MB[0m [31m6.4 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━[0m [32m1.0/1.4 MB[0m [31m8.7 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.4/1.4 MB[0m [31m9.8 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import csv
import re
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from prettytable import PrettyTable

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset and every generated file live under it.
drive.mount('/content/drive')

Mounted at /content/drive


### Extracting Transactions from the dataset

In [None]:
# Load the raw retail dataset; its 'Product' column is parsed into item lists in the next cell.
groceries = pd.read_csv("/content/drive/MyDrive/BDP_Project/Retail_Transactions_Dataset.csv")

In [None]:
import ast


def _parse_products(value):
    """Return the product list stored in one cell of the 'Product' column.

    Strings are parsed with ``ast.literal_eval``; any value that is not a
    string (for example a list produced by an earlier run of this cell) is
    returned unchanged, so re-running the cell does not raise.
    """
    return ast.literal_eval(value) if isinstance(value, str) else value


groceries['Product'] = groceries['Product'].apply(_parse_products)

# A list of lists of strings: one inner list of product names per transaction.
all_transactions = groceries['Product'].tolist()

print(all_transactions[0:5])

[['Hair Gel'], ['Tuna', 'Bread', 'Tissues', 'Trash Bags'], ['Jam', 'Soap', 'Ketchup'], ['BBQ Sauce'], ['Hand Sanitizer', 'Bread', 'Extension Cords', 'Ice Cream', 'Hand Sanitizer']]


#### Saving the transactions dataset in a file `all_transactions.csv`

In [None]:
# Write one CSV row per transaction with exactly as many fields as the
# transaction has items. Going through a rectangular DataFrame would pad
# shorter transactions with empty trailing fields, which the MapReduce jobs
# would then read back as an empty-string "item".
# lineterminator='\n' keeps the line endings plain so that no '\r' ends up
# attached to the last item of a line when the jobs read the file line by line.
with open('/content/drive/MyDrive/BDP_Project/all_transactions.csv', 'w',
          newline='', encoding='utf-8') as transactions_file:
    csv.writer(transactions_file, lineterminator='\n').writerows(all_transactions)

### Parallel Counting

The code below counts how many times each individual item occurs across all transactions.

In [None]:
%%file parallel_count.py
from mrjob.job import MRJob
import csv

class ItemFrequency(MRJob):
    """MapReduce job that counts how many times each item occurs in the input.

    Every input line is one transaction in CSV form; the job outputs one
    (item, count) pair per distinct item.
    """

    def mapper(self, _, line):
        """Emit ``(item, 1)`` for every non-blank field of one CSV line.

        Args:
            _: unused input key.
            line: one transaction as a CSV-formatted string.
        """
        # csv.reader (rather than str.split) so quoted fields are handled.
        items = next(csv.reader([line]), [])

        for item in items:
            # Blank fields (e.g. padding at the end of short rows) are not
            # products and must not be counted as an item.
            if item.strip():
                yield (item, 1)

    def reducer(self, key, values):
        """Sum the partial counts of one item and emit ``(item, total)``."""
        yield (key, sum(values))


if __name__ == '__main__':
    ItemFrequency.run()

Writing parallel_count.py


In [None]:
# Run the item-frequency job on the saved transactions; the (item, count) pairs are streamed to the cell output.
!python parallel_count.py "/content/drive/MyDrive/BDP_Project/all_transactions.csv"

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/parallel_count.root.20231123.052529.826287
Running step 1 of 1...
job output is in /tmp/parallel_count.root.20231123.052529.826287/output
Streaming final output from /tmp/parallel_count.root.20231123.052529.826287/output...
"Beef"	1132
"Bread"	1093
"Broom"	1111
"Butter"	1160
"Canned Soup"	1031
"Carrots"	1110
"Cereal Bars"	1060
"Cereal"	1094
"Cheese"	1090
"Chicken"	1063
"Chips"	1080
"Cleaning Rags"	1088
"Cleaning Spray"	1111
"Coffee"	1067
"Deodorant"	1152
"Diapers"	1130
"Dish Soap"	1138
"Dishware"	1095
"Dustpan"	1078
"Eggs"	1096
"Extension Cords"	1129
"Feminine Hygiene Products"	1031
"Garden Hose"	1164
"Hair Gel"	1124
"Hand Sanitizer"	1072
"Honey"	1111
"Ice Cream"	1089
"Insect Repellent"	1079
"Iron"	1042
"Ironing Board"	1148
"Jam"	1085
"Ketchup"	1096
"Laundry Detergent"	1106
"Lawn Mower"	1093
"Light Bulbs"	1104
"Mayonnaise"	1057
"Milk"	1102
"Mop"	1102
"Mustard"	1106
"

### Constructing FList and GList using Parallel Counting

The code below computes the FList (every item together with its frequency) using Parallel Counting, and then constructs the GList by splitting the FList items into groups.

In [None]:
%%file parallel_count.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import csv

class ParallelCount1(MRJob):
    """Three-step job that builds the FList and splits it into the GList.

    Step 1 counts the occurrences of every item. Step 2 gathers all
    (item, frequency) pairs under a single key into one list, the FList,
    ordered by descending frequency. Step 3 cuts the FList's items into
    consecutive groups of at most ``Q`` items and emits one
    ``(group_id, [items])`` pair per group, with ids starting at 1.
    """

    # Maximum number of items placed in one group of the GList.
    Q = 6

    def mapper(self, _, line):
        """Emit ``(item, 1)`` for every non-blank field of one CSV line."""
        for item in next(csv.reader([line]), []):
            # Blank fields are padding, not products; never count them.
            if item.strip():
                yield (item, 1)

    def reducer(self, item, counts):
        """Emit the item's total under key ``None`` so one reducer sees all pairs."""
        yield None, (item, sum(counts))

    def mapper_1(self, _, item_freq):
        """Pass every (item, frequency) pair through unchanged."""
        item, frequency = item_freq
        yield None, (item, frequency)

    def reducer_1(self, _, item_freqs):
        """Collect all (item, frequency) pairs into the FList and emit it once."""
        flist = [(item, frequency) for item, frequency in item_freqs]
        # Order by descending frequency (ties broken by item name) so the
        # FList, and therefore the grouping derived from it, does not depend
        # on the order in which values happen to reach this reducer.
        flist.sort(key=lambda pair: (-pair[1], pair[0]))
        yield None, flist

    def mapper_2(self, _, flist):
        """Split the FList's items into groups of at most ``Q`` items.

        Args:
            _: unused key.
            flist: list of (item, frequency) pairs.

        Yields:
            ``(group_id, items)`` with ``group_id`` counting up from 1.
        """
        # Drop names that are empty once quotes, commas and spaces are removed.
        items = [item for item, _frequency in flist if item.strip("', ")]
        for group_id, start in enumerate(range(0, len(items), self.Q), start=1):
            yield group_id, items[start:start + self.Q]

    def steps(self):
        """Return the three steps described in the class docstring."""
        return [
            MRStep(mapper=self.mapper, reducer=self.reducer),
            MRStep(mapper=self.mapper_1, reducer=self.reducer_1),
            MRStep(mapper=self.mapper_2),
        ]

if __name__ == '__main__':
    inputFile = "/content/drive/MyDrive/BDP_Project/all_transactions.csv"
    outFile = "/content/drive/MyDrive/BDP_Project/GLIST.csv"
    mr_job = ParallelCount1(args=[inputFile])
    with mr_job.make_runner() as runner:
        runner.run()
        # Each GLIST.csv row is "<group id>, <item>, <item>, ...".
        # parallel_fp.py treats the first field of a row as the group key and
        # only the remaining fields as the group's items, so the key has to be
        # written explicitly; otherwise the first item of every group is
        # consumed as the key and silently dropped from its group.
        with open(outFile, "w") as f:
            for key, value in mr_job.parse_output(runner.cat_output()):
                # Blank names are not products; leave them out of the group.
                items = [item for item in value if item.strip()]
                if not items:
                    continue
                # Join the names directly instead of slicing the list's repr,
                # which would corrupt names containing quotes or brackets.
                print(", ".join([str(key)] + items), file=f)
    print("Done!")

Overwriting parallel_count.py


In [None]:
# Run the FList/GList job; the input and output paths are hard-coded in the script's __main__ block.
!python parallel_count.py

No configs specified for inline runner
Done!


### Top K Most Recommended Items for each Product

Here the minimum support is taken as $10^{-4}$ and the value of $K$ is 100.

The code below generates the Top K Most Recommended Items for each Product and stores the result in an output file `topk.txt` in the same directory as the dataset.

In [None]:
%%file parallel_fp.py
import pandas as pd
import heapq as heap
from mrjob.job import MRJob
from mrjob.step import MRStep
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import fpgrowth

# Path of the group list (GLIST.csv, the output file of parallel_count.py) that each mapper loads in mapper_init.
tempFile = "/content/drive/MyDrive/BDP_Project/GLIST.csv"

class ParallelFP(MRJob):
    """Four-step job: per-group FP-Growth followed by a global top-K selection.

    Step 1 turns every transaction into item lists restricted to one group
    each and collects them per group. Step 2 runs FP-Growth on each group's
    lists. Step 3 keeps the ``TOP_K`` (support, itemset) pairs with the
    highest support across all groups. Step 4 emits every kept pair once for
    each item it contains, keyed by that item.

    NOTE(review): support is computed by ``fpgrowth`` over each group's own
    lists, yet step 3 ranks values from different groups against each other;
    confirm that this is intended.
    """

    # Minimum support passed to fpgrowth in step 2.
    MIN_SUPPORT = 0.0001
    # Number of highest-support itemsets kept by step 3.
    TOP_K = 100

    def mapper_init(self):
        """Load the group list from ``tempFile`` into ``self.dep``.

        Every non-blank line is split on ','. The first field becomes the
        dictionary key and the remaining fields (surrounding whitespace
        removed, blank fields dropped) become that key's list of items.
        """
        dep = {}
        # "with" guarantees the file is closed even if parsing fails.
        with open(tempFile, 'r') as f:
            for line in f:
                # Only strip the newline; slicing off the last character
                # would eat a real character on a final line without one.
                line = line.rstrip('\n')
                if not line.strip():
                    continue
                record = line.split(',')
                members = [field.strip() for field in record[1:]]
                dep[record[0]] = [name for name in members if name]
        self.dep = dep

    def mapper(self, _, line):
        """Emit, per position of the transaction, the in-group suffix of items.

        For every field ``record[x]`` that belongs to a group, yields
        ``(group, items)`` where ``items`` is ``record[x]`` followed by the
        later fields of the same line that belong to the same group.

        NOTE(review): a transaction holding n items of one group therefore
        yields n overlapping lists for that group, not one - confirm.
        """
        record = line.split(",")
        dep = self.dep
        for x in range(len(record)):
            for group in dep:
                members = dep[group]
                if record[x] not in members:
                    continue
                suffix = [record[x]]
                suffix.extend(
                    record[j] for j in range(x + 1, len(record))
                    if record[j] in members
                )
                yield group, suffix

    def reducer(self, item, vals):
        """Gather all item lists of one group into a single list."""
        yield item, list(vals)

    def reducer_1(self, item, vals):
        """Run FP-Growth on each list of transactions received for a group.

        Yields:
            ``(group, patterns)`` where ``patterns`` is a list of
            ``[support, itemset]`` pairs.
        """
        for transactions in vals:
            encoder = TransactionEncoder()
            encoded = encoder.fit(transactions).transform(transactions)
            matrix = pd.DataFrame(encoded, columns=encoder.columns_)
            res = fpgrowth(matrix, min_support=self.MIN_SUPPORT, use_colnames=True)
            # A list of pairs, not a dict keyed by support: many itemsets
            # share the same support and a dict would keep only one of them.
            # sorted() gives every itemset a deterministic item order.
            patterns = [
                [float(support), sorted(itemset)]
                for support, itemset in zip(res['support'], res['itemsets'])
            ]
            yield item, patterns

    def mapper_init_2(self):
        """Start an empty min-heap for this mapper's best itemsets."""
        self.l = []

    def mapper_2(self, item, vals):
        """Keep the ``TOP_K`` highest-support pairs seen by this mapper."""
        for support, itemset in vals:
            heap.heappush(self.l, (float(support), itemset))
            # The heap root is the smallest support; evict it when over size.
            if len(self.l) > self.TOP_K:
                heap.heappop(self.l)

    def mapper_final_2(self):
        """Emit the surviving pairs under key ``None`` for one global reducer."""
        for entry in self.l:
            yield None, entry

    def reducer_init_2(self):
        """Start an empty min-heap for the global selection."""
        self.l = []

    def reducer_2(self, item, vals):
        """Merge the mappers' candidates, again keeping only ``TOP_K`` of them."""
        for entry in vals:
            heap.heappush(self.l, entry)
            if len(self.l) > self.TOP_K:
                heap.heappop(self.l)

    def reducer_final_2(self):
        """Emit the kept pairs in descending order of support."""
        self.l.sort(reverse=True)
        for entry in self.l:
            yield None, (entry[0], entry[1])

    def mapper_3(self, item, vals):
        """Emit the (support, itemset) pair once per item of the itemset."""
        for member in vals[1]:
            yield member, vals

    def reducer_3(self, item, vals):
        """Emit every (support, itemset) pair that contains the item."""
        for entry in vals:
            yield item, entry

    def steps(self):
        """Return the four steps described in the class docstring."""
        return [
            MRStep(mapper_init=self.mapper_init, mapper=self.mapper, reducer=self.reducer),
            MRStep(reducer=self.reducer_1),
            MRStep(mapper_init=self.mapper_init_2, mapper=self.mapper_2,
                   mapper_final=self.mapper_final_2,
                   reducer_init=self.reducer_init_2, reducer=self.reducer_2,
                   reducer_final=self.reducer_final_2),
            MRStep(mapper=self.mapper_3, reducer=self.reducer_3),
        ]


if __name__ == '__main__':
    ParallelFP.run()

Writing parallel_fp.py


In [None]:
# Run the four-step job on the saved transactions and redirect its standard output into topk.txt on Drive.
!python parallel_fp.py "/content/drive/MyDrive/BDP_Project/all_transactions.csv" > "/content/drive/MyDrive/BDP_Project/topk.txt"
# Variant without the redirect, which shows the results in the cell output instead of saving them:
# !python parallel_fp.py "/content/drive/MyDrive/BDP_Project/all_transactions.csv"

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/parallel_fp.root.20231123.053337.702791
Running step 1 of 4...
Running step 2 of 4...
Running step 3 of 4...
Running step 4 of 4...
job output is in /tmp/parallel_fp.root.20231123.053337.702791/output
Streaming final output from /tmp/parallel_fp.root.20231123.053337.702791/output...
Removing temp directory /tmp/parallel_fp.root.20231123.053337.702791...


## Parallel FP-Growth using Spark

### Installing and Setting up a Spark session

In [None]:
# NOTE(review): pyspark is installed unpinned; pin an exact version here if reproducible results are required.
!pip install --quiet pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
from pyspark.ml.fpm import FPGrowth

from pyspark.sql import SparkSession

# Obtain the Spark session (application name "FPGrowth") used by all Spark cells below.
spark = SparkSession.builder.appName("FPGrowth").getOrCreate()

In [None]:
from pyspark.sql import Row

# Wrap every transaction in a Row: `id` is its position in all_transactions,
# `items` is its list of product names.
data = [
    Row(id=position, items=basket)
    for position, basket in enumerate(all_transactions)
]

# Turn the rows into a Spark DataFrame
df = spark.createDataFrame(data)

# Preview the result
df.show()

+---+--------------------+
| id|               items|
+---+--------------------+
|  0|          [Hair Gel]|
|  1|[Tuna, Bread, Tis...|
|  2|[Jam, Soap, Ketchup]|
|  3|         [BBQ Sauce]|
|  4|[Hand Sanitizer, ...|
|  5|[Shower Gel, Baby...|
|  6|      [Cereal, Tuna]|
|  7|[Iron, Extension ...|
|  8|   [Banana, Pickles]|
|  9|[Ketchup, Razors,...|
| 10|      [Shrimp, Soda]|
| 11|[Soap, Vacuum Cle...|
| 12|[BBQ Sauce, Soda,...|
| 13|[Ironing Board, L...|
| 14|   [Lawn Mower, Tea]|
| 15|             [Syrup]|
| 16|[Tea, Spinach, Mu...|
| 17|[Tuna, Bath Towel...|
| 18|[Syrup, Yogurt, E...|
| 19|              [Eggs]|
+---+--------------------+
only showing top 20 rows



### Removing duplicate items (if any) within each transaction of the dataframe

In [None]:
from pyspark.sql.functions import array_distinct

# Spark's FPGrowth requires the items inside a transaction to be unique, so
# drop repeated items from every `items` array.
# array_distinct is a built-in column function: unlike a Python UDF built on
# list(set(...)), it runs inside Spark without shipping rows to Python,
# keeps the first-occurrence order of the items, and passes null arrays
# through instead of raising.
df = df.withColumn("items", array_distinct(df["items"]))

### Creating a Spark FP Growth Model

The code below produces three outputs:
1. Frequent Itemsets Table, showing each frequent itemset found in the dataset together with its frequency.
2. Association Rules, showing the association rules generated by the FP-Growth model. Each rule has an “antecedent” (the items on the left-hand side of the rule), a “consequent” (the item on the right-hand side of the rule), and measures of rule interestingness such as “confidence” and “lift”.
3. Prediction Table, which shows the items that the model predicts might be included in the transaction, based on the patterns it found in the data.

In [None]:
# Configure FP-Growth on the `items` column and fit it to the transactions.
fpGrowth = FPGrowth(
    itemsCol="items",
    minSupport=0.0001,
    minConfidence=0.01,
)
model = fpGrowth.fit(df)

# Output 1: the frequent itemsets and how often each one occurs.
model.freqItemsets.show()

# Output 2: the association rules generated from those itemsets.
model.associationRules.show()

# Output 3: every transaction checked against all association rules, with the
# consequents of the rules summarized as its prediction.
model.transform(df).show()

+--------------------+----+
|               items|freq|
+--------------------+----+
|             [Apple]|1052|
|[Apple, Light Bulbs]|  35|
|[Apple, Light Bul...|   3|
|    [Apple, Vinegar]|  32|
|[Apple, Vinegar, ...|   3|
|[Apple, Vinegar, ...|   3|
|[Apple, Vinegar, ...|   4|
|[Apple, Vinegar, ...|   3|
|   [Apple, Tomatoes]|  26|
|[Apple, Tomatoes,...|   3|
|      [Apple, Honey]|  31|
|[Apple, Honey, Ex...|   3|
|[Apple, Honey, Yo...|   3|
|[Apple, Honey, Ca...|   3|
| [Apple, Toothbrush]|  31|
|[Apple, Toothbrus...|   3|
|[Apple, Toothbrus...|   4|
|[Apple, Toothbrus...|   3|
|[Apple, Toothbrus...|   4|
|[Apple, Toothbrus...|   3|
+--------------------+----+
only showing top 20 rows

+--------------------+-------------------+-------------------+------------------+--------------------+
|          antecedent|         consequent|         confidence|              lift|             support|
+--------------------+-------------------+-------------------+------------------+---------------

In [None]:
# Stop the Spark session created earlier in the notebook.
spark.stop()