# Article code

In [1]:
from pyspark import SparkContext

In [None]:
spark = SparkContext("local", "Apriori")

In [3]:
# Here we are parsing each line as a value in RDD (Resilient Distributed Dataset).
file = spark.textFile("data/fruits.txt")

print(file.collect())

['Apple,Mango,Banana', 'Banana,Mango', 'Apple,Banana', 'Apple,Mango,Coconut', 'Strawberry,Grapes,Lemon,Raspberry', 'Raspberry,Grapes', 'Strawberry,Apple', 'Apple,Mango,Raspberry', 'Mango,Raspberry', 'Mango,Apple', 'Apple,Raspberry', 'Banana,Raspberry,Mango', 'Apple,Mango,Banana', 'Raspberry,Banana', 'Apple,Strawberry', 'Strawberry,Banana,Apple,Mango', 'Mango,Banana,Raspberry,Apple', 'Coconut,Apple,Raspberry', 'Raspberry,Coconut,Banana']


In [18]:
# Now we split each RDD value, turning them into arrays. 
lbitems = file.map(lambda line: line.split(','))

print(lbitems.collect()[:10])

[['Apple', 'Mango', 'Banana'], ['Banana', 'Mango'], ['Apple', 'Banana'], ['Apple', 'Mango', 'Coconut'], ['Strawberry', 'Grapes', 'Lemon', 'Raspberry'], ['Raspberry', 'Grapes'], ['Strawberry', 'Apple'], ['Apple', 'Mango', 'Raspberry'], ['Mango', 'Raspberry'], ['Mango', 'Apple']]


In [7]:
# Transform each element into an value of the RDD.
wlitems = file.flatMap(lambda line: line.split(','))
print(wlitems.collect())

['Apple', 'Mango', 'Banana', 'Banana', 'Mango', 'Apple', 'Banana', 'Apple', 'Mango', 'Coconut', 'Strawberry', 'Grapes', 'Lemon', 'Raspberry', 'Raspberry', 'Grapes', 'Strawberry', 'Apple', 'Apple', 'Mango', 'Raspberry', 'Mango', 'Raspberry', 'Mango', 'Apple', 'Apple', 'Raspberry', 'Banana', 'Raspberry', 'Mango', 'Apple', 'Mango', 'Banana', 'Raspberry', 'Banana', 'Apple', 'Strawberry', 'Strawberry', 'Banana', 'Apple', 'Mango', 'Mango', 'Banana', 'Raspberry', 'Apple', 'Coconut', 'Apple', 'Raspberry', 'Raspberry', 'Coconut', 'Banana']


In [8]:
uniqueItems = wlitems.distinct()

# Contains each value and 1 as tuple.
# Ex: ('Apple', 1)
supportRDD = wlitems.map(lambda item: (item, 1))

print(supportRDD.collect())

[('Apple', 1), ('Mango', 1), ('Banana', 1), ('Banana', 1), ('Mango', 1), ('Apple', 1), ('Banana', 1), ('Apple', 1), ('Mango', 1), ('Coconut', 1), ('Strawberry', 1), ('Grapes', 1), ('Lemon', 1), ('Raspberry', 1), ('Raspberry', 1), ('Grapes', 1), ('Strawberry', 1), ('Apple', 1), ('Apple', 1), ('Mango', 1), ('Raspberry', 1), ('Mango', 1), ('Raspberry', 1), ('Mango', 1), ('Apple', 1), ('Apple', 1), ('Raspberry', 1), ('Banana', 1), ('Raspberry', 1), ('Mango', 1), ('Apple', 1), ('Mango', 1), ('Banana', 1), ('Raspberry', 1), ('Banana', 1), ('Apple', 1), ('Strawberry', 1), ('Strawberry', 1), ('Banana', 1), ('Apple', 1), ('Mango', 1), ('Mango', 1), ('Banana', 1), ('Raspberry', 1), ('Apple', 1), ('Coconut', 1), ('Apple', 1), ('Raspberry', 1), ('Raspberry', 1), ('Coconut', 1), ('Banana', 1)]


In [9]:
def sumOperator(x, y):
    return x+y

In [10]:
# Sum of values by key
supportRDD = supportRDD.reduceByKey(sumOperator)

print(supportRDD.collect())

[('Apple', 12), ('Mango', 10), ('Banana', 9), ('Coconut', 3), ('Strawberry', 4), ('Grapes', 2), ('Lemon', 1), ('Raspberry', 10)]


In [11]:
# Get an RDD with only the counts of each element
supports = supportRDD.map(lambda item: item[1])

print(supports.collect())

[12, 10, 9, 3, 4, 2, 1, 10]


In [12]:
minSupportData = supports.min()
minSupportData

1

In [13]:
minSupport = 2
supportRDD = supportRDD.filter(lambda item: item[1] >= minSupport)

# Transforms each element of the RDD into a list instead of tuple
baseRDD = supportRDD.map(lambda item: ([item[0], item[1]]))

print(baseRDD.collect())

[['Apple', 12], ['Mango', 10], ['Banana', 9], ['Coconut', 3], ['Strawberry', 4], ['Grapes', 2], ['Raspberry', 10]]


In [14]:
# Make an RDD with only the unique item names
supportRDDCart = supportRDD.map(lambda item: item[0])

print(supportRDDCart.collect())

['Apple', 'Mango', 'Banana', 'Coconut', 'Strawberry', 'Grapes', 'Raspberry']


In [15]:
def removeReplica(record):
    if(isinstance(record[0], tuple)):
        x1 = record[0]
        x2 = record[1]
    else:
        x1 = [record[0]]
        x2 = record[1]
    
    if(not any(x ==x2 for x in x1)):
        a = list(x1)
        a.append(x2)
        a.sort()
        result = tuple(a)
    
        return result

    return x1

In [19]:
c = 2 # Combination length

while(supportRDDCart.isEmpty() == False):
    # Makes all the combinations of items.
    # Ex: [('Apple', 'Apple'), ('Apple', 'Mango')..]
    combined = supportRDDCart.cartesian(uniqueItems)
    
    # Turn elements with repeated items such as ('Apple', 'Apple') into ['Apple']
    combined = combined.map(lambda item: removeReplica(item))

    # Make combinations of 3 items
    combined = combined.filter(lambda item: len(item) == c)
    combined = combined.distinct()
    combined2 = combined.cartesian(lbitems)

    combined2 = combined2.filter(lambda item: all(x in item[1] for x in item[0]))
    combined2 = combined2.map(lambda item: item[0])
    combined2 = combined2.map(lambda item: (item, 1))

    # Count combinations
    combined2 = combined2.reduceByKey(sumOperator)

    combined2 = combined2.filter(lambda item: item[1] >= minSupport)
    baseRDD = baseRDD.union(combined2)
    combined2 = combined2.map(lambda item: item[0])
    supportRDDCart = combined2
    
    print(c, '. Table was created...') 
    
    c = c + 1

print(baseRDD.collect()[:10])

[['Apple', 12], ['Mango', 10], ['Banana', 9], ['Coconut', 3], ['Strawberry', 4], ['Grapes', 2], ['Raspberry', 10], (('Apple', 'Mango'), 7), (('Apple', 'Banana'), 5), (('Apple', 'Coconut'), 2)]


In [15]:
class Filter():
    def __init__(self) -> None:
        self.stages = 1

    def filterForConf(self, item, total):
        if(len(item[0][0]) > len(item[1][0])):
            if(self.checkItemSets(item[0][0], item[1][0]) == False):
                pass
            else:
                return item
        else:
            pass
        self.stages = self.stages + 1

    # Check Items sets includes at least one common item
    # Example command: any(l == k for k in z for l in x)
    def checkItemSets(slef, item_1, item_2):
        if (len(item_1)) > len(item_2):
            return all(any(k==l for k in item_1) for l in item_2)
        else:
            return all(any(k==l for k in item_2) for l in item_1)
        
    def calculateConfidence(self, item):
        # Parent item list
        parent = set(item[0][0])

        # Child item list
        if(isinstance(item[1][0], str)):
            child = set([item[1][0]])
        else:
            child = set(item[1][0])
        
        # Parent and Child support values
        parentSupport = item[0][1]
        childSupport = item[1][1]

        # Finds the item set confidence is going to be found
        support = (parentSupport / childSupport) * 100

        return list([list(child), list(parent.difference(child)), support])
    
# Example ((('x10', 'x3', 'x6', 'x7', 'x9'), 1), (('x10', 'x3', 'x7'), 1))
calcuItems = baseRDD.cartesian(baseRDD)

print(calcuItems.collect())

                                                                                

[(['Apple', 12], ['Apple', 12]), (['Apple', 12], ['Mango', 10]), (['Apple', 12], ['Banana', 9]), (['Apple', 12], ['Coconut', 3]), (['Apple', 12], ['Strawberry', 4]), (['Apple', 12], ['Grapes', 2]), (['Apple', 12], ['Raspberry', 10]), (['Mango', 10], ['Apple', 12]), (['Banana', 9], ['Apple', 12]), (['Mango', 10], ['Mango', 10]), (['Mango', 10], ['Banana', 9]), (['Banana', 9], ['Mango', 10]), (['Banana', 9], ['Banana', 9]), (['Mango', 10], ['Coconut', 3]), (['Mango', 10], ['Strawberry', 4]), (['Mango', 10], ['Grapes', 2]), (['Mango', 10], ['Raspberry', 10]), (['Banana', 9], ['Coconut', 3]), (['Banana', 9], ['Strawberry', 4]), (['Banana', 9], ['Grapes', 2]), (['Banana', 9], ['Raspberry', 10]), (['Coconut', 3], ['Apple', 12]), (['Strawberry', 4], ['Apple', 12]), (['Grapes', 2], ['Apple', 12]), (['Raspberry', 10], ['Apple', 12]), (['Coconut', 3], ['Mango', 10]), (['Coconut', 3], ['Banana', 9]), (['Strawberry', 4], ['Mango', 10]), (['Strawberry', 4], ['Banana', 9]), (['Grapes', 2], ['Mango',

In [16]:
filter = Filter()
total = calcuItems.count()
baseRDDConfidence = calcuItems.filter(lambda item: filter.filterForConf(item, total))

print(baseRDDConfidence.collect())



[((('Apple', 'Banana', 'Mango'), 4), (('Apple', 'Mango'), 7)), ((('Apple', 'Banana', 'Mango'), 4), (('Apple', 'Banana'), 5)), ((('Apple', 'Banana', 'Mango'), 4), (('Banana', 'Mango'), 6)), ((('Apple', 'Mango', 'Raspberry'), 2), (('Apple', 'Mango'), 7)), ((('Apple', 'Mango', 'Raspberry'), 2), (('Apple', 'Raspberry'), 4)), ((('Banana', 'Mango', 'Raspberry'), 2), (('Banana', 'Mango'), 6)), ((('Apple', 'Mango', 'Raspberry'), 2), (('Mango', 'Raspberry'), 4)), ((('Banana', 'Mango', 'Raspberry'), 2), (('Mango', 'Raspberry'), 4)), ((('Banana', 'Mango', 'Raspberry'), 2), (('Banana', 'Raspberry'), 4))]


                                                                                

In [17]:
baseRDDConfidence = baseRDDConfidence.map(lambda item: filter.calculateConfidence(item))

# Customers that buy mango and apple might buy banana with 57.41% confidence
print(baseRDDConfidence.collect())



[[['Mango', 'Apple'], ['Banana'], 57.14285714285714], [['Apple', 'Banana'], ['Mango'], 80.0], [['Mango', 'Banana'], ['Apple'], 66.66666666666666], [['Mango', 'Apple'], ['Raspberry'], 28.57142857142857], [['Raspberry', 'Apple'], ['Mango'], 50.0], [['Mango', 'Banana'], ['Raspberry'], 33.33333333333333], [['Mango', 'Raspberry'], ['Apple'], 50.0], [['Mango', 'Raspberry'], ['Banana'], 50.0], [['Raspberry', 'Banana'], ['Mango'], 50.0]]


                                                                                

In [18]:
import pandas as pd

In [19]:
result = baseRDDConfidence.collect()
confidenceTable = pd.DataFrame(data=result, columns=["Before", "After", "Confidence"])

print(confidenceTable)



                Before        After  Confidence
0       [Mango, Apple]     [Banana]   57.142857
1      [Apple, Banana]      [Mango]   80.000000
2      [Mango, Banana]      [Apple]   66.666667
3       [Mango, Apple]  [Raspberry]   28.571429
4   [Raspberry, Apple]      [Mango]   50.000000
5      [Mango, Banana]  [Raspberry]   33.333333
6   [Mango, Raspberry]      [Apple]   50.000000
7   [Mango, Raspberry]     [Banana]   50.000000
8  [Raspberry, Banana]      [Mango]   50.000000


                                                                                

# Trying with the groceries dataset

In [20]:
from pyspark import SparkContext
import pandas as pd

In [21]:
df = pd.read_csv('data/Groceries_dataset.csv')

df.head()

Unnamed: 0,Member_number,Date,itemDescription
0,1808,21-07-2015,tropical fruit
1,2552,05-01-2015,whole milk
2,2300,19-09-2015,pip fruit
3,1187,12-12-2015,other vegetables
4,3037,01-02-2015,whole milk


In [22]:
df['single_transaction'] = df['Member_number'].astype(str) + '_' + df['Date']

df.head()

Unnamed: 0,Member_number,Date,itemDescription,single_transaction
0,1808,21-07-2015,tropical fruit,1808_21-07-2015
1,2552,05-01-2015,whole milk,2552_05-01-2015
2,2300,19-09-2015,pip fruit,2300_19-09-2015
3,1187,12-12-2015,other vegetables,1187_12-12-2015
4,3037,01-02-2015,whole milk,3037_01-02-2015


In [23]:
# Creates a pivot table.
# First argument of the crosstab function receives the variable that will be on the rows.
# Second argument receives variable to be on the columns.
df2 = pd.crosstab(df['single_transaction'], df['itemDescription'])

df2.head()

itemDescription,Instant food products,UHT-milk,abrasive cleaner,artif. sweetener,baby cosmetics,bags,baking powder,bathroom cleaner,beef,berries,...,turkey,vinegar,waffles,whipped/sour cream,whisky,white bread,white wine,whole milk,yogurt,zwieback
single_transaction,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1000_15-03-2015,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,1,0
1000_24-06-2014,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0
1000_24-07-2015,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1000_25-11-2015,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1000_27-05-2015,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [24]:
# df2 can have values greater than 1 if someone buys more than 1 of the same item in the same purchase.
# Market basket analysis does not take quantity of items in consideration, so we transform it to 1.
def encode(item_freq):
    res = 0

    if item_freq > 0:
        res = 1
    
    return res

In [25]:
basket_input = df2.applymap(encode)

basket_input.head()

  basket_input = df2.applymap(encode)


itemDescription,Instant food products,UHT-milk,abrasive cleaner,artif. sweetener,baby cosmetics,bags,baking powder,bathroom cleaner,beef,berries,...,turkey,vinegar,waffles,whipped/sour cream,whisky,white bread,white wine,whole milk,yogurt,zwieback
single_transaction,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1000_15-03-2015,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,1,0
1000_24-06-2014,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0
1000_24-07-2015,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1000_25-11-2015,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1000_27-05-2015,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [26]:
def extract_items(purchase):
    return [column for column in basket_input.columns if purchase[column]==1]

In [28]:
purchase_arrays = basket_input.apply(extract_items, axis=1).tolist()
print(purchase_arrays[:10])

[['sausage', 'semi-finished bread', 'whole milk', 'yogurt'], ['pastry', 'salty snack', 'whole milk'], ['canned beer', 'misc. beverages'], ['hygiene articles', 'sausage'], ['pickled vegetables', 'soda'], ['curd', 'frankfurter'], ['rolls/buns', 'sausage', 'whole milk'], ['soda', 'whole milk'], ['beef', 'white bread'], ['frankfurter', 'soda', 'whipped/sour cream']]


In [30]:
lbitems = spark.parallelize(purchase_arrays)

print(lbitems.collect()[:10])

[['sausage', 'semi-finished bread', 'whole milk', 'yogurt'], ['pastry', 'salty snack', 'whole milk'], ['canned beer', 'misc. beverages'], ['hygiene articles', 'sausage'], ['pickled vegetables', 'soda'], ['curd', 'frankfurter'], ['rolls/buns', 'sausage', 'whole milk'], ['soda', 'whole milk'], ['beef', 'white bread'], ['frankfurter', 'soda', 'whipped/sour cream']]


In [31]:
# Transform each element into an value of the RDD.
wlitems = lbitems.flatMap(lambda x: x)

print(wlitems.collect()[:10])

['sausage', 'semi-finished bread', 'whole milk', 'yogurt', 'pastry', 'salty snack', 'whole milk', 'canned beer', 'misc. beverages', 'hygiene articles']


In [32]:
uniqueItems = wlitems.distinct()

# Contains each value and 1 as tuple.
# Ex: ('Apple', 1)
supportRDD = wlitems.map(lambda item: (item, 1))

print(supportRDD.collect()[:10])

[('sausage', 1), ('semi-finished bread', 1), ('whole milk', 1), ('yogurt', 1), ('pastry', 1), ('salty snack', 1), ('whole milk', 1), ('canned beer', 1), ('misc. beverages', 1), ('hygiene articles', 1)]


In [33]:
def sumOperator(x, y):
    return x+y

In [34]:
# Sum of values by key
supportRDD = supportRDD.reduceByKey(sumOperator)

print(supportRDD.collect()[:10])

[('sausage', 903), ('semi-finished bread', 142), ('whole milk', 2363), ('yogurt', 1285), ('pastry', 774), ('salty snack', 281), ('canned beer', 702), ('misc. beverages', 236), ('hygiene articles', 205), ('pickled vegetables', 134)]


In [35]:
# Get an RDD with only the counts of each element
supports = supportRDD.map(lambda item: item[1])

print(supports.collect()[:10])

[903, 142, 2363, 1285, 774, 281, 702, 236, 205, 134]


In [36]:
minSupportData = supports.min()
minSupportData

1

In [37]:
minSupport = 2
supportRDD = supportRDD.filter(lambda item: item[1] >= minSupport)

# Transforms each element of the RDD into a list instead of tuple
baseRDD = supportRDD.map(lambda item: ([item[0], item[1]]))

print(baseRDD.collect()[:10])

[['sausage', 903], ['semi-finished bread', 142], ['whole milk', 2363], ['yogurt', 1285], ['pastry', 774], ['salty snack', 281], ['canned beer', 702], ['misc. beverages', 236], ['hygiene articles', 205], ['pickled vegetables', 134]]


In [38]:
# Make an RDD with only the unique item names
supportRDDCart = supportRDD.map(lambda item: item[0])

print(supportRDDCart.collect()[:10])

['sausage', 'semi-finished bread', 'whole milk', 'yogurt', 'pastry', 'salty snack', 'canned beer', 'misc. beverages', 'hygiene articles', 'pickled vegetables']


In [39]:
def removeReplica(record):
    if(isinstance(record[0], tuple)):
        x1 = record[0]
        x2 = record[1]
    else:
        x1 = [record[0]]
        x2 = record[1]
    
    if(not any(x ==x2 for x in x1)):
        a = list(x1)
        a.append(x2)
        a.sort()
        result = tuple(a)
    
        return result

    return x1

In [21]:
# Unfortunately, the dataset seems to large for this manual implementation,
# as this section was not able to run after 30 min.
# This shows that the implementation is not efficient enough.

c = 2 # Combination length

while(supportRDDCart.isEmpty() == False):
    # Makes all the combinations of items.
    # Ex: [('Apple', 'Apple'), ('Apple', 'Mango')..]
    combined = supportRDDCart.cartesian(uniqueItems)
    
    # Turn elements with repeated items such as ('Apple', 'Apple') into ['Apple']
    combined = combined.map(lambda item: removeReplica(item))

    # Make combinations of 3 items
    combined = combined.filter(lambda item: len(item) == c)
    combined = combined.distinct()
    combined2 = combined.cartesian(lbitems)

    combined2 = combined2.filter(lambda item: all(x in item[1] for x in item[0]))
    combined2 = combined2.map(lambda item: item[0])
    combined2 = combined2.map(lambda item: (item, 1))

    # Count combinations
    combined2 = combined2.reduceByKey(sumOperator)

    combined2 = combined2.filter(lambda item: item[1] >= minSupport)
    baseRDD = baseRDD.union(combined2)
    combined2 = combined2.map(lambda item: item[0])
    supportRDDCart = combined2
    
    print(c, '. Table was created...') 
    
    c = c + 1

print(baseRDD.collect())

2 . Table was created...


                                                                                

3 . Table was created...


ERROR:root:KeyboardInterrupt while sending command.                 (0 + 1) / 1]
Traceback (most recent call last):
  File "/home/gabs/Projects/2023/market-basket-analysis/venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/gabs/Projects/2023/market-basket-analysis/venv/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/gabs/.pyenv/versions/3.10.4/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

[Stage 27:>                                                         (0 + 1) / 1]