# Market Basket Analysis

Matheus Schmitz
<br><a href="https://www.linkedin.com/in/matheusschmitz/">LinkedIn</a>
<br><a href="https://matheus-schmitz.github.io/">GitHub Portfolio</a>

### Imports

In [1]:
import sys
from pyspark import SparkContext, SparkConf
from operator import add
import time
import math

In [2]:
# Record the wall-clock start time so the total runtime can be reported at the end
start_time = time.time()

### Spark

In [3]:
import findspark

# Initialise findspark before the SparkContext is requested
findspark.init()

# Reuse an already running SparkContext if there is one; otherwise create one
# configured with 4g of executor memory and 4g of driver memory
sc = SparkContext.getOrCreate(
    SparkConf()
    .set("spark.executor.memory", "4g")
    .set("spark.driver.memory", "4g")
)

# Restrict Spark's log output to the ERROR level
sc.setLogLevel('ERROR')

### Load Data

In [4]:
# Only baskets holding more than this many items are analysed
threshold_filter = 25
# Minimum number of baskets an itemset must appear in to be reported as frequent;
# the rounded square root of this value is also passed as the partition count when reading the input
support = 60
# CSV file the transactions are read from
input_file_path = "ta_feng_all_months_merged.csv"
# File the candidate itemsets and the frequent itemsets are written to
output_file_path = "market_basket.csv"

In [5]:
# Load the CSV as raw text lines; round(sqrt(support)) is passed as the partition count
csvRDD = sc.textFile(input_file_path, round(math.sqrt(support)))
# Alternative partition count that was tried: min(support//2, 8)

# The first line is the header; every line equal to it is dropped
csvHeader = csvRDD.first()
csvRDD = (
    csvRDD
    .filter(lambda line: line != csvHeader)
    .map(lambda line: line.split(','))
    # Key: columns 0 and 1 joined by '-'. Value: column 5 normalised through int().
    # The [1:-1] slices drop each field's first and last character
    # (presumably surrounding quotes -- verify against the CSV).
    .map(lambda cols: (cols[0][1:-1] + '-' + cols[1][1:-1], str(int(cols[5][1:-1]))))
)

In [6]:
# Group the item values by key to form one basket (a list of items) per key,
# then keep only the baskets holding more than threshold_filter items
bskts = (
    csvRDD
    .groupByKey()
    .map(lambda keyed_items: list(keyed_items[1]))
    .filter(lambda basket: len(basket) > threshold_filter)
)

In [7]:
# Make the number of partitions and the support threshold available to all computing nodes.
# Both are broadcast as floats; the per-partition threshold is later computed as support.value/n_part.value.
# NOTE(review): `support` is rebound here from a plain number to a Broadcast object, so re-running
# this cell (float(support)) or the CSV-loading cell (math.sqrt(support)) afterwards will presumably fail -- confirm.
n_part= sc.broadcast(float(bskts.getNumPartitions()))
support = sc.broadcast(float(support))

### Generate Candidate Itemsets

In [8]:
def apriori(partition):
    """Run A-Priori on one partition and yield its locally frequent itemsets.

    An itemset is locally frequent when the number of baskets in this
    partition containing all of its items reaches the partition-scaled
    threshold ``support.value / n_part.value``.

    Args:
        partition: iterable of baskets (each basket an iterable of hashable items).

    Yields:
        ``(k, itemsets)`` pairs for increasing itemset size k, starting at 1.
        For k == 1 ``itemsets`` is the sorted list of frequent single items
        (possibly empty); for k >= 2 it is a non-empty list of sorted tuples of
        length k. Generation stops at the first k that has no frequent itemset.
    """
    # Start with singletons, aka itemset size of one, aka k=1
    k_size = 1

    # Get all partition-frequent singles and the materialised baskets
    freq_items, baskets = frequentSingles(partition)

    # Output frequent sets of size 1 (aka frequent singles)
    yield k_size, freq_items

    # Build each basket's item set once so every containment check below is a
    # set operation instead of repeated scans of a list
    basket_sets = [set(basket) for basket in baskets]

    # The threshold is the same for every k, so compute it once
    local_threshold = support.value / n_part.value

    # Wrap each single in a set so singles have the same shape as larger itemsets
    freq_items = [{single} for single in freq_items]

    # Grow k until no frequent itemsets are found in this partition
    while freq_items:
        k_size += 1

        # Candidates of size k are unions of two frequent (k-1)-itemsets.
        # Pairing each itemset only with the ones after it yields the same
        # candidates, in the same order, as the full cross product, because
        # the union is symmetric and an itemset joined with itself never grows.
        candidate_counts = {}
        for idx, left in enumerate(freq_items):
            left_items = set(left)
            for right in freq_items[idx + 1:]:
                merged = left_items.union(right)
                if len(merged) == k_size:
                    candidate_counts[tuple(sorted(merged))] = 0

        # If no candidate sets were generated in this iteration, then stop
        if not candidate_counts:
            break

        # Count the baskets that contain every item of each candidate
        for itemset in candidate_counts:
            candidate_counts[itemset] = sum(
                1 for basket_items in basket_sets if basket_items.issuperset(itemset)
            )

        # Keep only the itemsets which pass the weighted threshold
        freq_items = [
            itemset for itemset, count in candidate_counts.items()
            if count >= local_threshold
        ]

        # Emit the k-sized itemsets; an empty result ends the loop above
        if freq_items:
            yield k_size, freq_items

In [9]:
def frequentSingles(partition):
    """Collect a partition's baskets and its locally frequent single items.

    Args:
        partition: iterable of baskets (each basket an iterable of hashable items).

    Returns:
        A ``(freq_items, baskets)`` tuple. ``freq_items`` is the sorted list of
        items contained in at least ``support.value / n_part.value`` baskets of
        this partition. ``baskets`` is the list of all baskets of the partition,
        in the order they were received.
    """
    singleton_counts, baskets = {}, []

    for basket in partition:
        # Keep every basket so the caller can scan them again for larger itemsets
        baskets.append(basket)
        # Support is the number of baskets containing an item, so an item that
        # is repeated inside one basket must only be counted once for it
        for item in set(basket):
            singleton_counts[item] = singleton_counts.get(item, 0) + 1

    # Keep only frequent singletons (weighted threshold based on the number of partitions)
    local_threshold = support.value / n_part.value
    freq_items = sorted(
        item for item, count in singleton_counts.items() if count >= local_threshold
    )

    return freq_items, baskets

In [10]:
# Apply A-Priori to each partition to find the candidate itemsets.
# reduceByKey concatenates the per-partition lists that share an itemset size.
# Its merge function is only invoked when a size has results from more than one
# partition, so de-duplication and sorting are done in a separate step that
# runs for every size, including sizes contributed by a single partition.
# Note: despite its name, candidatesRDD is the list returned by collect().
candidatesRDD = (
    bskts
    .mapPartitions(apriori)
    .reduceByKey(lambda x, y: x + y)
    .map(lambda sized: (sized[0], sorted(set(sized[1]))))
    .sortBy(lambda candidate: candidate[0])
    .collect()
)

In [11]:
# Broadcast the collected candidate itemsets (a list of (itemset size, itemsets) pairs) to all nodes,
# so the counting pass can read them through candidate_itemsets.value
candidate_itemsets = sc.broadcast(candidatesRDD)

In [12]:
# Write the candidate itemsets to the output file, replacing any previous content
with open(output_file_path, 'w') as fout:
    # Section header for the candidates step
    fout.write('Candidates:')
    # The first entry holds the singletons; they are bare strings rather than
    # tuples, so the ('...') wrapping is written by hand
    fout.write('\n' + ','.join(f"('{candidate}')" for candidate in candidate_itemsets.value[0][1]))
    # Every larger itemset is written in its str() form, one row per itemset
    # size, with a blank line between rows
    for candidates in candidate_itemsets.value[1:]:
        fout.write('\n\n' + ','.join(map(str, candidates[1])))

In [13]:
# Visualize the candidate itemsets found
# (despite its name, candidatesRDD is the list returned by collect(), not an RDD)
candidatesRDD

[(1,
  ['20246013',
   '20332433',
   '20412074',
   '20415723',
   '20505233',
   '20535407',
   '20535414',
   '20546236',
   '20557003',
   '2250062000090',
   '2250078000251',
   '2250271000034',
   '2250271000218',
   '25700010326',
   '28400015547',
   '28400017305',
   '29000070295',
   '29000070301',
   '37000304593',
   '37000329169',
   '37000329206',
   '37000337270',
   '37000440147',
   '37000440192',
   '37000441809',
   '37000442127',
   '37000445111',
   '3960097006002',
   '40000015314',
   '40000015321',
   '40000757320',
   '4006670360150',
   '4014400901573',
   '4014612509215',
   '41186001559',
   '41419761748',
   '41736007284',
   '4710008111146',
   '4710008241140',
   '4710008241218',
   '4710008251125',
   '4710008290025',
   '4710008290056',
   '4710008290155',
   '4710010010017',
   '4710011401128',
   '4710011401135',
   '4710011401142',
   '4710011402019',
   '4710011402026',
   '4710011402194',
   '4710011405133',
   '4710011406123',
   '4710011409056',


### Find Frequent Itemsets

In [14]:
def formattedCandidateCounts(partition):
    """Yield ``(itemset, count)`` for every candidate found in this partition.

    Args:
        partition: iterable of baskets, passed unchanged to ``countCandidates``.

    Yields:
        ``(candidate, count)`` pairs in which ``candidate`` is always a tuple:
        singleton candidates, which are stored as bare items, are wrapped into
        1-tuples so that all keys share one shape.
    """
    # Dictionary with the count of every candidate itemset in this partition
    candidate_counts = countCandidates(partition)

    for candidate, count in candidate_counts.items():
        # isinstance is the idiomatic type check (and also accepts tuple subclasses)
        if not isinstance(candidate, tuple):
            candidate = (candidate,)
        yield candidate, count

In [15]:
def countCandidates(partition):
    """Count, per candidate itemset, the baskets of this partition containing it.

    The candidates are read from the broadcast ``candidate_itemsets``, a
    sequence of ``(itemset size, itemsets)`` entries.

    Args:
        partition: iterable of baskets (each basket an iterable of hashable items).

    Returns:
        A dict mapping each candidate found in at least one basket to the
        number of baskets that contain all of its items. Keys keep the form in
        which the candidate is stored (bare item for singletons, tuple
        otherwise); candidates found in no basket are absent.
    """
    # Flatten the candidates once and coerce singletons to tuples up front, so
    # this work is not repeated for every basket
    candidates = []
    for sized_candidates in candidate_itemsets.value:
        for itemset in sized_candidates[1]:
            items = itemset if isinstance(itemset, tuple) else (itemset,)
            candidates.append((itemset, items))

    counts = {}
    # Loop through each basket in the partition
    for basket in partition:
        # A set makes each containment check independent of the basket's length
        basket_items = set(basket)
        for itemset, items in candidates:
            # If every item of the itemset is in the basket, add 1 to its count
            if basket_items.issuperset(items):
                counts[itemset] = counts.get(itemset, 0) + 1
    return counts


In [16]:
# Second pass: count every candidate in each partition, sum the per-partition
# counts, and keep the itemsets whose total reaches the full support threshold.
# The surviving itemsets are ordered by size and then by their items before
# being collected (so freqItemsetsRDD ends up as a list).
freqItemsetsRDD = (
    bskts
    .mapPartitions(formattedCandidateCounts)
    .reduceByKey(add)
    .filter(lambda counted: counted[1] >= support.value)
    .keys()
    .sortBy(lambda itemset: (len(itemset), itemset))
    .collect()
)

In [17]:
# Group the frequent itemsets by their size so each size can be written as one row
output = {}
for itemset in freqItemsetsRDD:
    itemset_size = len(itemset)
    # setdefault creates the list for a size the first time that size is seen
    output.setdefault(itemset_size, []).append(itemset)

In [18]:
# Open the output file in append mode and add the frequent itemsets
with open(output_file_path, 'a') as fout:
    # Add a header for Frequent Itemsets
    fout.write('\n\n' + 'Frequent Itemsets:')
    # Singletons are 1-tuples here; they are written as ('item') rather than in
    # Python's ('item',) form. .get keeps this from raising KeyError when no
    # itemset reached the support threshold.
    fout.write('\n' + ','.join([f"('{itemset[0]}')" for itemset in output.get(1, [])]))
    # Write every other size in ascending order, selected by key rather than by
    # position so the result does not depend on the dict's insertion order
    for itemset_size in sorted(size for size in output if size != 1):
        fout.write('\n\n' + ','.join([f"{itemset}" for itemset in output[itemset_size]]))

In [19]:
# Visualize the frequent itemsets found, keyed by itemset size
output

{1: [('20412074',),
  ('20557003',),
  ('37000329169',),
  ('37000329206',),
  ('37000337270',),
  ('37000440147',),
  ('37000442127',),
  ('37000445111',),
  ('4014400901573',),
  ('4710011401128',),
  ('4710011401135',),
  ('4710011401142',),
  ('4710011402019',),
  ('4710011405133',),
  ('4710011406123',),
  ('4710011409056',),
  ('4710011432825',),
  ('4710012122121',),
  ('4710012131130',),
  ('4710015103370',),
  ('4710015202721',),
  ('4710018004605',),
  ('4710018004704',),
  ('4710018008634',),
  ('4710022201496',),
  ('4710022237501',),
  ('4710022275503',),
  ('4710032501791',),
  ('4710035369510',),
  ('4710036003581',),
  ('4710036007039',),
  ('4710036009071',),
  ('4710036012019',),
  ('4710043552102',),
  ('4710046021100',),
  ('4710054134403',),
  ('4710054380619',),
  ('4710063312168',),
  ('4710084225676',),
  ('4710085104116',),
  ('4710085104130',),
  ('4710085120093',),
  ('4710085120628',),
  ('4710085120680',),
  ('4710085120703',),
  ('4710085120710',),
  ('471

In [20]:
# Measure the wall-clock time (in seconds) elapsed since start_time was recorded and report it
time_elapsed = time.time() - start_time
print(f'Duration: {time_elapsed}')

Duration: 134.5662498474121


# End
Matheus Schmitz
<br><a href="https://www.linkedin.com/in/matheusschmitz/">LinkedIn</a>
<br><a href="https://matheus-schmitz.github.io/">GitHub Portfolio</a>