<a href="https://colab.research.google.com/github/Aditya0996/BigData-MarketBasketAnalysis/blob/main/Big_Data_MarketBasketAnalysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u382-ga-1~22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 18 not upgraded.


In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd
import itertools

# Reuse the running SparkSession if there is one, otherwise start a new one
spark = (
    SparkSession.builder
    .getOrCreate()
)

# Handle to the SparkContext that belongs to this session
sc = spark.sparkContext

Please upload the input file `browsing.txt` to the `/content/sample_data` folder before running the cells below!

In [None]:
# Load the browsing log as a DataFrame: one row per line of the file,
# with the line's text in the "value" column
txt = spark.read.text(
    "/content/sample_data/browsing.txt"
)

In [None]:
# Preview the first five rows of the raw input
txt.head(5)

[Row(value='FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 '),
 Row(value='GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 '),
 Row(value='ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 '),
 Row(value='ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 '),
 Row(value='ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444 ')]

In [None]:
# Tokenize every line on single spaces and store the resulting list in a new
# "basket" column (a trailing space in the line produces a final '' token)
txt = txt.withColumn(
    "basket",
    split(txt["value"], " "),
)

In [None]:
# Preview the first five rows again, now including the "basket" column
txt.head(5)

[Row(value='FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 ', basket=['FRO11987', 'ELE17451', 'ELE89019', 'SNA90258', 'GRO99222', '']),
 Row(value='GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 ', basket=['GRO99222', 'GRO12298', 'FRO12685', 'ELE91550', 'SNA11465', 'ELE26917', 'ELE52966', 'FRO90334', 'SNA30755', 'ELE17451', 'FRO84225', 'SNA80192', '']),
 Row(value='ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 ', basket=['ELE17451', 'GRO73461', 'DAI22896', 'SNA99873', 'FRO86643', '']),
 Row(value='ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 ', basket=['ELE17451', 'ELE37798', 'FRO86643', 'GRO56989', 'ELE23393', 'SNA11465', '']),
 Row(value='ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444 ', basket=['ELE17451', 'SNA69641', 'FRO86643', 'FRO78087', 'SNA11465', 'GRO39357', 'ELE28573', 'ELE11375', 'DAI54444', ''])]

In [None]:
# Drop the empty tokens produced by splitting on " " (e.g. the one caused by
# the trailing space on each line). Filtering on truthiness, instead of slicing
# off the last element, keeps the final item of a line that has no trailing
# space and also removes empty tokens caused by consecutive spaces.
basket_map = txt.rdd.map(lambda row: [item for item in row.basket if item])

In [None]:
# Concatenate all baskets into one flat RDD of item ids
all_items = basket_map.flatMap(lambda basket: basket)
all_items.take(10)

['FRO11987',
 'ELE17451',
 'ELE89019',
 'SNA90258',
 'GRO99222',
 'GRO99222',
 'GRO12298',
 'FRO12685',
 'ELE91550',
 'SNA11465']

In [None]:
# Build the L1 frequent set:
#   1. emit (item, 1) for every item occurrence and sum the ones per item
#   2. keep only items with count >= 100 (support >= 100)
#   3. sort the survivors by item name
# Filtering before sorting means only the frequent items take part in the sort.
# cache() keeps the result in memory so the later actions on this RDD do not
# recompute the whole pipeline from the input file each time.
L1_frequent_item = (
    all_items
    .map(lambda item: (item, 1))
    .reduceByKey(lambda a, b: a + b)
    .filter(lambda item_count: item_count[1] >= 100)
    .sortBy(lambda item_count: item_count[0])
    .cache()
)
L1_frequent_item.take(5)

[('DAI11223', 155),
 ('DAI11778', 117),
 ('DAI13194', 116),
 ('DAI13266', 181),
 ('DAI13788', 213)]

In [None]:
# Sanity check: number of items in the L1 frequent set
L1_frequent_item.count()

647

In [None]:
# Bring only the item names of the L1 frequent set back to the driver as a Python list
current_items1 = (
    L1_frequent_item
    .map(lambda item_count: item_count[0])
    .collect()
)

In [None]:
#Take combinations of 2 for each basket in our input using only the items which are in the L1 frequent item list(current_items1)
def getTwoPair(basket):
  """Emit a ((item_a, item_b), 1) record for every candidate pair in one basket.

  Only items found in the global ``current_items1`` (the L1 frequent items)
  are considered. The items are sorted first so that a given pair is always
  emitted in the same order and can serve as a reduceByKey key.

  Args:
    basket: list of item ids for a single basket. It is not modified.

  Returns:
    List of ((item_a, item_b), 1) tuples with item_a <= item_b.
  """
  # A set gives O(1) membership tests instead of scanning the list per item
  frequent = set(current_items1)
  # sorted() returns a new list, leaving the caller's basket untouched
  allowed = [item for item in sorted(basket) if item in frequent]
  return [(pair, 1) for pair in itertools.combinations(allowed, 2)]

In [None]:
# L2 frequent set: emit every candidate pair per basket as ((a, b), 1) with
# getTwoPair, sum the ones per pair, keep pairs with count >= 100
# (support >= 100) and sort by pair.
# cache() keeps the result in memory: several later actions (take, collect,
# and the per-triple lookups in getConfidence3) read this RDD, and without the
# cache each of them would regenerate every pair from the baskets.
L2_frequent_item = (
    basket_map
    .flatMap(getTwoPair)
    .reduceByKey(lambda a, b: a + b)
    .filter(lambda pair_count: pair_count[1] >= 100)
    .sortBy(lambda pair_count: pair_count[0])
    .cache()
)
L2_frequent_item.take(5)

[(('DAI16732', 'FRO78087'), 106),
 (('DAI18527', 'SNA44451'), 102),
 (('DAI22177', 'DAI31081'), 127),
 (('DAI22177', 'DAI62779'), 382),
 (('DAI22177', 'DAI63921'), 136)]

In [None]:
# Collect the items that appear in at least one L2 frequent pair.
# distinct() keeps one copy of each item; without it an item is listed once
# per pair it belongs to, which only lengthens membership checks.
current_items2 = (
    L2_frequent_item
    .flatMap(lambda x: [x[0][0], x[0][1]])
    .distinct()
    .collect()
)

In [None]:
#Take combinations of 3 for each basket in our input using only the items which are in the L2 frequent item list(current_items2)
def getThreePair(basket):
  """Emit a ((item_a, item_b, item_c), 1) record for every candidate triple in one basket.

  Only items found in the global ``current_items2`` (items that occur in some
  L2 frequent pair) are considered. The items are sorted first so that a given
  triple is always emitted in the same order and can serve as a reduceByKey key.

  Args:
    basket: list of item ids for a single basket. It is not modified.

  Returns:
    List of ((item_a, item_b, item_c), 1) tuples in ascending item order.
  """
  # A set gives O(1) membership tests instead of scanning the list per item
  frequent = set(current_items2)
  # sorted() returns a new list, leaving the caller's basket untouched
  allowed = [item for item in sorted(basket) if item in frequent]
  return [(triple, 1) for triple in itertools.combinations(allowed, 3)]

In [None]:
# L3 frequent set: getThreePair turns each basket into ((a, b, c), 1) records,
# the ones are summed per triple, triples with count >= 100 (support >= 100)
# are kept, and the result is sorted by triple.
L3_frequent_item = (
    basket_map
    .flatMap(getThreePair)
    .reduceByKey(lambda left, right: left + right)
    .filter(lambda triple_count: triple_count[1] >= 100)
    .sortBy(lambda triple_count: triple_count[0])
)
L3_frequent_item.take(5)

[(('DAI22896', 'DAI62779', 'GRO73461'), 101),
 (('DAI23334', 'DAI62779', 'ELE92920'), 143),
 (('DAI31081', 'DAI62779', 'ELE17451'), 103),
 (('DAI31081', 'DAI75645', 'FRO40251'), 122),
 (('DAI31081', 'ELE32164', 'GRO59710'), 112)]

In [None]:
# Confidence(A -> B) = support(A U B) / support(A)
def getConfidence2(x):
  """Compute the confidence of both rules that can be formed from one frequent pair.

  Single-item supports are read from the global ``L1_frequent_item`` RDD.

  Args:
    x: ((item_a, item_b), pair_support) record, as produced by L2_frequent_item.

  Returns:
    [((item_a, item_b), conf(A -> B)), ((item_b, item_a), conf(B -> A))]
    where conf(A -> B) = support(A U B) / support(A).
  """
  item_a, item_b = x[0][0], x[0][1]
  # One Spark job fetches both single-item supports instead of one job per item
  supports = L1_frequent_item.filter(lambda y: y[0] in (item_a, item_b)).collectAsMap()
  return [
    (x[0], x[1] / supports[item_a]),                # support(A U B) / support(A)
    ((item_b, item_a), x[1] / supports[item_b]),    # support(A U B) / support(B)
  ]

In [None]:
#Find confidence for each combinations in L2 frequent item
L2_frequent_item_list = L2_frequent_item.collect()
confidence2 = []
for x in L2_frequent_item_list:
  # extend() appends in place; `confidence2 = confidence2 + ...` rebuilt the
  # whole list on every iteration
  confidence2.extend(getConfidence2(x))

In [None]:
# Order the rules by descending confidence; rules with equal confidence are
# ordered by their item names (ascending)
confidence2 = sorted(confidence2, key=lambda rule: (-rule[1], rule[0]))
confidence2[:5]

[(('DAI93865', 'FRO40251'), 1.0),
 (('GRO85051', 'FRO40251'), 0.999176276771005),
 (('GRO38636', 'FRO40251'), 0.9906542056074766),
 (('ELE12951', 'FRO40251'), 0.9905660377358491),
 (('DAI88079', 'FRO40251'), 0.9867256637168141)]

In [None]:
# Confidence(A,B -> C) = support(A U B U C) / support(A U B)
def getConfidence3(x):
  """Compute the confidence of the three pair -> item rules for one frequent triple.

  Pair supports are read from the global ``L2_frequent_item`` RDD.

  Args:
    x: ((item_a, item_b, item_c), triple_support) record, as produced by
      L3_frequent_item.

  Returns:
    [(((a, b), c), conf), (((b, c), a), conf), (((a, c), b), conf)]
    where each conf = support(A U B U C) / support(of the left-hand pair).
  """
  item_a, item_b, item_c = x[0][0], x[0][1], x[0][2]
  pair_ab, pair_bc, pair_ac = (item_a, item_b), (item_b, item_c), (item_a, item_c)
  wanted = (pair_ab, pair_bc, pair_ac)
  # One Spark job fetches all three pair supports instead of one job per pair
  pair_supports = L2_frequent_item.filter(lambda y: y[0] in wanted).collectAsMap()
  conf_xy_z = ((pair_ab, item_c), x[1] / pair_supports[pair_ab])
  conf_yz_x = ((pair_bc, item_a), x[1] / pair_supports[pair_bc])
  conf_xz_y = ((pair_ac, item_b), x[1] / pair_supports[pair_ac])
  return [conf_xy_z, conf_yz_x, conf_xz_y]

In [None]:
#Find confidence for each combinations in L3 frequent item
L3_frequent_item_list = L3_frequent_item.collect()
confidence3 = []
for x in L3_frequent_item_list:
  # extend() appends in place; `confidence3 = confidence3 + ...` rebuilt the
  # whole list on every iteration
  confidence3.extend(getConfidence3(x))

In [None]:
# Order the rules by descending confidence; rules with equal confidence are
# ordered by their item names (ascending)
confidence3 = sorted(confidence3, key=lambda rule: (-rule[1], rule[0]))
confidence3[:5]

[((('DAI23334', 'ELE92920'), 'DAI62779'), 1.0),
 ((('DAI31081', 'GRO85051'), 'FRO40251'), 1.0),
 ((('DAI55911', 'GRO85051'), 'FRO40251'), 1.0),
 ((('DAI62779', 'DAI88079'), 'FRO40251'), 1.0),
 ((('DAI75645', 'GRO85051'), 'FRO40251'), 1.0)]

Sources-

 https://www.section.io/engineering-education/introduction-to-frequent-itemset-mining-with-python/

https://intellipaat.com/blog/data-science-apriori-algorithm/

https://github.com/devshah96/Product-Recommendation/blob/master/Association_rules.ipynb