# **Implementing SON and Toivonen's Algorithm Using MapReduce**


The SON algorithm and the Toivonen algorithm are both widely used approaches for solving the problem of frequent itemset mining in large datasets. These algorithms are particularly well-suited to distributed computing environments, where data is too large to fit into the memory of a single machine.

Implementing these algorithms in MapReduce or Apache Spark uses distributed computing to handle very large datasets efficiently. In a MapReduce setting, each map task mines one data partition for locally frequent itemsets (Phase 1 of SON, or the sampling step of Toivonen's algorithm), and the reduce tasks aggregate the results from all partitions for the global verification step. Apache Spark can keep working data in memory between stages, which suits iterative algorithms such as Apriori, the in-memory miner used inside both SON and Toivonen's algorithm. Spark's resilient distributed datasets (RDDs) and DataFrames provide flexible abstractions for distributing data and computation.

## **SON Algorithm**
The SON algorithm, proposed by Savasere, Omiecinski, and Navathe, breaks down the task of identifying frequent itemsets into two phases to make it manageable across distributed systems. In the first phase, the algorithm partitions the dataset and applies the Apriori algorithm to each partition to find local frequent itemsets. Each node therefore handles only its own partition, which lets the algorithm scale with data size. In the second phase, the algorithm aggregates the local frequent itemsets from all partitions and then scans the entire dataset to determine which of these itemsets are indeed frequent across the whole dataset. The SON algorithm is simple and effective, enabling parallel processing without missing any frequent itemsets.

## **Working of SON Algorithm**

## First Pass
*   Repeatedly read small subsets of the baskets into main memory
*   Run an in-memory algorithm (e.g., Apriori, random sampling) to find all frequent itemsets\
(Note: we are not sampling, but processing the entire file in memory-sized chunks)
*   An itemset becomes a candidate if it is found to be frequent in any one or more subsets of the baskets

## Second Pass
*   Count all the candidate itemsets and determine which are frequent in the entire set
*   Key **“monotonicity”** idea: an itemset cannot be frequent in the entire set of baskets unless it is frequent in at least one subset
*   Subset or chunk contains fraction *p* of whole file
*   *1/p* chunks in file
*   If itemset is not frequent in any chunk, then support in each chunk is less than *ps*
*   Support in whole file is less than *s*: not frequent
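
The counting argument above can be checked on a small example. The following is a minimal sketch on made-up data: the baskets, the helper name `chunk_supports`, and the threshold values are assumptions chosen for illustration, and the chunks are assumed to be of equal size.

```python
# Toy check of the SON counting argument (illustrative data, equal-sized chunks assumed)
baskets = [
    {"a", "b"}, {"a", "c"}, {"a", "b"}, {"b", "c"},
    {"a", "b"}, {"c"}, {"a", "b", "c"}, {"b"},
]
num_chunks = 4          # 1/p chunks, so p = 1/4
p = 1 / num_chunks
s = 4                   # global support threshold as an absolute count


def chunk_supports(itemset, baskets, num_chunks):
    """Return the support of `itemset` inside each equal-sized chunk."""
    size = len(baskets) // num_chunks
    chunks = [baskets[i * size:(i + 1) * size] for i in range(num_chunks)]
    return [sum(1 for b in chunk if itemset <= b) for chunk in chunks]


supports = chunk_supports({"a", "b"}, baskets, num_chunks)
total = sum(supports)

# If the itemset is frequent overall, at least one chunk must reach p * s;
# otherwise the total would be below (1/p) * p * s = s.
if total >= s:
    assert any(c >= p * s for c in supports)

print(supports, total)
```

Here the itemset reaches the global threshold only because every chunk contributes exactly *ps*, which shows that the local threshold cannot be set any higher without risking a missed itemset.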

## **SON: MapReduce**
## Phase 1: Find Candidate Itemsets
**Map:**
*   Input is a chunk/subset of all baskets - fraction *p* of total input file
*   Find itemsets frequent in that subset (e.g., using Apriori algorithm)
*   Use support threshold *ps*
*   Output is a set of key-value pairs (*F*, 1), where *F* is an itemset that is frequent in this chunk

**Reduce:**
*   Each reduce task is assigned a set of keys, which are itemsets
*   Produces each key that appears one or more times
*   Such a key is frequent in at least one subset
*   These are the candidate itemsets
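
Phase 1 can be sketched in plain Python without a cluster. This is only an illustration under stated assumptions: `local_frequent`, `phase1_map`, and `phase1_reduce` are hypothetical names, the baskets are made up, and the brute-force subset counter stands in for Apriori, which a real map task would run instead.

```python
from collections import Counter
from itertools import combinations


def local_frequent(chunk, local_threshold):
    """Stand-in for Apriori: count every subset of every basket in the chunk.
    Only sensible for toy data."""
    counts = Counter()
    for basket in chunk:
        items = sorted(basket)
        for k in range(1, len(items) + 1):
            counts.update(combinations(items, k))
    return [itemset for itemset, c in counts.items() if c >= local_threshold]


def phase1_map(chunk, s, total_baskets):
    """Emit (F, 1) for each itemset F that is frequent in this chunk."""
    p = len(chunk) / total_baskets      # fraction of the file held by this chunk
    return [(f, 1) for f in local_frequent(chunk, p * s)]


def phase1_reduce(pairs):
    """Keep each key once: the union of all locally frequent itemsets."""
    return sorted({key for key, _ in pairs}, key=lambda t: (len(t), t))


baskets = [
    ["1", "2"], ["1", "2"], ["1", "3"], ["2"],
    ["1", "3"], ["1", "3"], ["2", "3"], ["1"],
]
chunks = [baskets[:4], baskets[4:]]
s = 4  # global support threshold as an absolute count

emitted = [pair for chunk in chunks for pair in phase1_map(chunk, s, len(baskets))]
candidates = phase1_reduce(emitted)
print(candidates)
```

On this data the pairs `('1', '2')` and `('1', '3')` become candidates because each is frequent in one chunk, even though neither reaches the global threshold. Removing such false positives is the job of Phase 2.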

## Phase 2: Find True Frequent Itemsets
**Map:**
*   Each Map task takes output from first Reduce task AND a chunk of the total input data file
*   All candidate itemsets go to every Map task
*   Count occurrences of each candidate itemset among the baskets in the input chunk
*   Output is set of key-value pairs (*C*, *v*), where *C* is a candidate frequent itemset and *v* is the support for that itemset among the baskets in the input chunk

**Reduce:**
*   Each reduce task is assigned a set of keys (itemsets)
*   Sums associated values for each key: total support for itemset
*   If support of itemset >= *s*, print itemset and its count
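
Phase 2 can be sketched the same way. The function names `phase2_map` and `phase2_reduce`, the baskets, and the candidate list are assumptions made for illustration; the candidate list is taken as given rather than computed here.

```python
from collections import Counter


def phase2_map(chunk, candidates):
    """Emit (C, v): the support v of each candidate C within this chunk."""
    basket_sets = [set(b) for b in chunk]
    out = []
    for cand in candidates:
        v = sum(1 for b in basket_sets if set(cand) <= b)
        if v:
            out.append((cand, v))
    return out


def phase2_reduce(pairs, s):
    """Sum the per-chunk supports and keep itemsets whose total reaches s."""
    totals = Counter()
    for cand, v in pairs:
        totals[cand] += v
    return {cand: n for cand, n in totals.items() if n >= s}


baskets = [
    ["1", "2"], ["1", "2"], ["1", "3"], ["2"],
    ["1", "3"], ["1", "3"], ["2", "3"], ["1"],
]
chunks = [baskets[:4], baskets[4:]]
# Assumed output of Phase 1 for this toy data
candidates = [("1",), ("2",), ("3",), ("1", "2"), ("1", "3")]
s = 4  # global support threshold as an absolute count

pairs = [pair for chunk in chunks for pair in phase2_map(chunk, candidates)]
frequent = phase2_reduce(pairs, s)
print(frequent)
```

The two candidate pairs have total supports 2 and 3, so they are dropped; only the three single items survive.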

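SON is exact: by the monotonicity argument above, every itemset that is frequent in the whole dataset is frequent in at least one chunk, so no frequent itemset is missed. The price is two full passes over the data. Toivonen's algorithm makes a different trade-off: it mines a single random sample and then uses one full pass to verify the result, at the risk of having to repeat the process with a new sample.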

In [129]:
# Install pyspark
# Apache Spark is an open-source, distributed processing system used for big data workloads
!pip install pyspark



In [130]:
# Import necessary libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from collections import Counter
from itertools import combinations
import time

In [131]:
# Initialize a Spark session
# The appName method names the application as 'SON_DM'
spark = SparkSession.builder.appName('SON_DM').getOrCreate()

In [132]:
# Define the input file location
inputFile = "basket.txt"

# Create an RDD (Resilient Distributed Dataset) by reading the text file (RDDs are the fundamental data structure of Spark)
rdd = spark.sparkContext.textFile(inputFile)

In [133]:
# Print the first 10 lines of the RDD to check data loading
print(rdd.take(10))

['1,2,4,10,14,15,16', '1,2,4,5,14,16,17', '1,3,4,5,6,7,8,14,17,19', '1,2,3,4,5,12,13,14,15,16', '1,3,4,6,7,13,14,15,16', '1,2,4,5,8,9,13,14,15,16,17,18', '1,2,3,6,7,9,14,16,17', '1,2,3,5,8,14,15,19', '1,2,3,4,6,8,13', '1,3,4,5,6,12,14,15,16,18']


In [134]:
# Function to count the occurrence of each candidate itemset in one partition
def total_c_Count(baskets, candidates):
    item_dict = {}

    # Convert the iterator to a list of sets for fast subset tests
    baskets = [set(basket) for basket in baskets]

    # Iterate over each candidate itemset
    for candidate in candidates:
        # Singletons arrive as plain values (e.g. the string '14'); wrap them in a
        # tuple so that set() does not split the string into characters
        if isinstance(candidate, (str, int)):
            key = (candidate,)
        else:
            key = tuple(sorted(candidate))
        candidate = set(key)

        # Count the baskets that contain every item of the candidate
        for basket in baskets:
            if candidate.issubset(basket):
                item_dict[key] = item_dict.get(key, 0) + 1
    return item_dict.items()

In [135]:
# Apriori algorithm implementation for the first phase of SON
# It finds locally frequent itemsets in each partition
def apriori(baskets, support, chunkCount):
    localRes = list()

    # Convert the iterator to a list
    baskets = list(baskets)

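    # Local support threshold: the global count scaled by this partition's share of the data
    # (chunkCount is the total number of baskets, so the ratio is the fraction p)
    threshold = support*(float(len(baskets))/float(chunkCount))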

    # Count occurrences of single items
    singleton = Counter()
    for basket in baskets:
        singleton.update(basket)

    # Filter singletons by local support threshold
    c_singletons = {x : singleton[x] for x in singleton if singleton[x] >= threshold }
    get_fre_singletons = sorted(c_singletons)

    # Initialize results with singletons
    localRes.extend(get_fre_singletons)
    k=2
    items_fre = set(get_fre_singletons)

    # Generate candidate itemsets of increasing size until no more frequent itemsets are found
    while len(items_fre) != 0:
        if k==2:
            pairs = list()

            # Generate pairs of items for k=2
            for val in combinations(items_fre, 2):
                val = list(val)
                val.sort()
                pairs.append(val)
            candidate_k = pairs

        else:
            # Generate candidate itemsets of size k > 2
            perm = list()
            items_fre = list(items_fre)
            for i in range(len(items_fre)-1):
                for j in range(i+1, len(items_fre)):
                    a = items_fre[i]
                    b = items_fre[j]
                    if a[0:(k-2)] == b[0:(k-2)]:
                        perm.append(list(set(a) | set(b)))
                    else:
                        break
            candidate_k = perm

        # Count occurrences of k-itemsets and filter by local support
        k_item_Dict = {}
        for candidate in candidate_k:
            candidate = set(candidate)
            key = tuple(sorted(candidate))
            for basket in baskets:
                if candidate.issubset(basket):
                    if key in k_item_Dict:
                        k_item_Dict[key] = k_item_Dict[key] + 1
                    else:
                        k_item_Dict[key] = 1
        kItem = Counter(k_item_Dict)
        k_fre_items = {x : kItem[x] for x in kItem if kItem[x] >= threshold }
        k_fre_items = sorted(k_fre_items)
        new_item_fre = k_fre_items

        # Update results and prepare for next iteration
        localRes.extend(new_item_fre)
        items_fre = list(set(new_item_fre))
        items_fre.sort()
        k=k+1

    return localRes

In [136]:
# Main function implementing the SON algorithm
def SON_ALGO():
    start = time.time()

    # Define the global support threshold
    support  = .5
    inputFile = "basket.txt"
    outputFile = "son_output.txt"

    # Initialize Spark context with configuration
    conf = SparkConf().setMaster("local[4]").setAppName("SON_DM")

    # Get or create a SparkContext with the specified configuration
    sc = SparkContext.getOrCreate(conf)

    # Load data and preprocess into baskets
    rdd = sc.textFile(inputFile).map(lambda line: line.strip().split(','))
    print(rdd.take(10))
    print()

    # Calculate support threshold based on actual data size
    chunks = rdd
    chunksCount = chunks.count()
    support *= chunksCount
    print(chunksCount)
    print()

    # Phase 1: Find candidate itemsets in each partition
    map1 = chunks.mapPartitions(lambda chunk : apriori(chunk, support, chunksCount)).map(lambda x : (x, 1))
    reduce1 = map1.reduceByKey(lambda x,y: (1)).keys().collect()

    # Output candidates to file and stdout
    print("Candidates:")
    resFile = open(outputFile, "w")
    resFile.write("Candidates:")
    resFile.write("\n\n")
    for i in reduce1:
        resFile.write(str(i))
        print(str(i).strip())
        resFile.write(',')
    resFile.write("\n\n")
    print()

    # Phase 2: Count and filter global frequent itemsets based on candidates from phase 1
    map2 = chunks.mapPartitions(lambda chunk : total_c_Count(chunk, reduce1))
    reduce2 = map2.reduceByKey(lambda x,y: (x+y))
    finalRes = reduce2.filter(lambda x: x[1] >= support)
    freItems = finalRes.keys().collect()

    # Output frequent itemsets to file and stdout
    print("Frequent Itemsets:")
    resFile.write("\n\n")
    resFile.write("Frequent Itemsets:")
    resFile.write("\n")
    if len(freItems) != 0:
        # Normalize every itemset to a tuple of strings, then order by size and content
        # so that itemsets of the same size are written together
        freItems = [(str(i),) if isinstance(i, (str, int)) else tuple(str(v) for v in i) for i in freItems]
        freItems.sort(key=lambda itemset: (len(itemset), itemset))
        size = len(freItems[0])
        for j, itemset in enumerate(freItems):
            c_size = len(itemset)
            # Separate itemsets of the same size with commas; start a new block for each size
            if size == c_size:
                if j != 0:
                    resFile.write(", ")
            else:
                resFile.write("\n\n")
            # Print singletons as ('1') instead of ('1',)
            str_val = str(itemset).replace(',', '') if c_size == 1 else str(itemset)
            resFile.write(str_val)
            size = c_size
            print(str_val)
    # Close the output file so that all results are flushed to disk
    resFile.close()
    end = time.time()
    print()
    print("Time taken: ")
    print(end - start)

In [137]:
# Execute the SON algorithm
SON_ALGO()

[['1', '2', '4', '10', '14', '15', '16'], ['1', '2', '4', '5', '14', '16', '17'], ['1', '3', '4', '5', '6', '7', '8', '14', '17', '19'], ['1', '2', '3', '4', '5', '12', '13', '14', '15', '16'], ['1', '3', '4', '6', '7', '13', '14', '15', '16'], ['1', '2', '4', '5', '8', '9', '13', '14', '15', '16', '17', '18'], ['1', '2', '3', '6', '7', '9', '14', '16', '17'], ['1', '2', '3', '5', '8', '14', '15', '19'], ['1', '2', '3', '4', '6', '8', '13'], ['1', '3', '4', '5', '6', '12', '14', '15', '16', '18']]

114520

Candidates:
1
14
4
('1', '14')
('1', '4')
('14', '4')
('15', '2')
('15', '3')
('2', '3')
('2', '5')
('1', '14', '15')
('1', '14', '2')
('1', '14', '3')
('1', '14', '5')
('1', '2', '4')
('1', '3', '4')
('14', '2', '4')
('1', '14', '2', '3')
('1', '2', '3', '4')
15
2
3
5
('1', '15')
('1', '2')
('1', '3')
('1', '5')
('14', '15')
('14', '2')
('14', '3')
('14', '5')
('2', '4')
('3', '4')
('1', '14', '4')
('1', '15', '2')
('1', '15', '3')
('1', '2', '3')
('1', '2', '5')
('14', '2', '3')
('

## **Toivonen's Algorithm**
Toivonen's algorithm introduces a probabilistic approach to frequent itemset mining. It starts by selecting a random sample of the dataset and then applies the Apriori algorithm to this sample to find potential frequent itemsets and to generate the "negative border": itemsets that are not frequent in the sample but whose immediate subsets are all frequent in the sample. The entire dataset is then scanned to verify which of the sampled frequent itemsets are genuinely frequent and to ensure that no itemset in the negative border is frequent. This method reduces the computational cost by potentially requiring only one full scan of the dataset, at the cost of constructing and counting the negative border and of occasionally having to start over with a new sample.

## **Working of Toivonen's Algorithm**

## First Pass
Find candidate frequent itemsets from sample
*   *Use lower threshold* : For fraction *p* of baskets in sample, use *0.8ps* or *0.9ps* as support threshold - identifies itemsets that are frequent for the sample
*   Construct the *negative border* - itemsets that are not frequent in the sample but all of their immediate subsets are frequent
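
The negative border definition above can be made concrete with a short sketch. The function name `negative_border` and the toy itemsets are assumptions for illustration; the input is taken to be the collection of itemsets already found frequent in the sample at the lowered threshold.

```python
from itertools import combinations


def negative_border(frequent, items):
    """Itemsets not in `frequent` whose immediate subsets are all in `frequent`.
    `frequent` is a collection of frozensets; `items` is the set of all single items."""
    frequent = set(frequent)
    border = set()

    # An infrequent single item qualifies: its only immediate subset is the empty set
    for item in items:
        if frozenset([item]) not in frequent:
            border.add(frozenset([item]))

    # Extend each frequent itemset by one item and test every immediate subset
    for f in frequent:
        for item in items:
            if item in f:
                continue
            cand = f | {item}
            if cand in frequent:
                continue
            if all(frozenset(sub) in frequent
                   for sub in combinations(cand, len(cand) - 1)):
                border.add(cand)
    return border


items = {"A", "B", "C", "D", "E"}
sample_frequent = [frozenset(x) for x in
                   (["A"], ["B"], ["C"], ["D"], ["B", "C"], ["C", "D"])]

border = negative_border(sample_frequent, items)
print(sorted(tuple(sorted(b)) for b in border))
```

Note that `{B, C, D}` is not in the border even though `{B, C}` and `{C, D}` are frequent, because its third immediate subset `{B, D}` is not.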

## Second Pass
Process the whole file (no sampling)
*   Count all candidate frequent itemsets from the first pass and all itemsets on the negative border
*   *Case 1* : No itemset from the negative border turns out to be frequent in the whole data set - correct set of frequent itemsets is exactly the itemsets from the sample that were found frequent in the whole data
*   *Case 2* : Some member of negative border is frequent in the whole data set - can give no answer at this time and must repeat the algorithm with a new random sample
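
The two cases can be sketched as a single decision function. This is a minimal illustration, not the notebook's implementation: `second_pass`, the baskets, and the two example inputs are hypothetical, and `None` is used here to signal that a new sample is needed.

```python
def second_pass(baskets, sample_frequent, border, s):
    """Count both collections over the full data and decide whether to accept."""
    basket_sets = [set(b) for b in baskets]

    def support(itemset):
        return sum(1 for b in basket_sets if set(itemset) <= b)

    # Case 2: a negative-border itemset is frequent overall, so no answer can be given
    if any(support(n) >= s for n in border):
        return None

    # Case 1: keep the sample-frequent itemsets that are frequent in the whole data
    return [f for f in sample_frequent if support(f) >= s]


baskets = [
    ["1", "2"], ["1", "2"], ["1", "3"], ["2"],
    ["1", "3"], ["1", "3"], ["2", "3"], ["1"],
]
s = 4  # global support threshold as an absolute count

# A sample that happened to capture every frequent item
accepted = second_pass(
    baskets,
    sample_frequent=[("1",), ("2",), ("3",), ("1", "3")],
    border=[("1", "2"), ("2", "3")],
    s=s,
)

# A sample that missed item '3', which therefore sits on the negative border
rejected = second_pass(
    baskets,
    sample_frequent=[("1",), ("2",)],
    border=[("3",), ("1", "2")],
    s=s,
)
print(accepted, rejected)
```

In the first call no border itemset reaches the threshold, so the verified itemsets are returned; `('1', '3')` is dropped because its full-data support is only 3. In the second call item `'3'` has support 4, so the run must be repeated.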

## **Why Toivonen's Algorithm Works**
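Toivonen's algorithm never produces a false positive, since it reports as frequent only those itemsets that have been counted and found to be frequent in the whole dataset. It also never produces a false negative: when no itemset of the negative border is frequent in the whole dataset, there can be no itemset that is frequent in the whole dataset yet belongs to neither the negative border nor the collection of frequent itemsets for the sample. The reason is monotonicity. Such an itemset would have a smallest subset that is not frequent in the sample; that subset lies in the negative border, and as a subset of a frequent itemset it would itself be frequent in the whole dataset.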
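
The no-false-negative argument rests on one fact: every itemset that is not frequent in the sample contains a member of the negative border. A minimal sketch of that fact follows; `border_witness` and the toy itemsets are hypothetical names and data used only for illustration.

```python
from itertools import combinations


def border_witness(itemset, sample_frequent):
    """Return a smallest subset of `itemset` that is not frequent in the sample.
    All of its immediate subsets are frequent, so it lies in the negative border.
    Returns None when `itemset` and all of its subsets are frequent in the sample."""
    for k in range(1, len(itemset) + 1):
        for sub in combinations(sorted(itemset), k):
            if frozenset(sub) not in sample_frequent:
                return frozenset(sub)
    return None


sample_frequent = {frozenset(x) for x in
                   (["A"], ["B"], ["C"], ["D"], ["B", "C"], ["C", "D"])}

w1 = border_witness({"B", "C", "D"}, sample_frequent)
w2 = border_witness({"A", "E"}, sample_frequent)
w3 = border_witness({"B", "C"}, sample_frequent)
print(w1, w2, w3)
```

If `{B, C, D}` were frequent in the whole dataset, its witness `{B, D}` would be too, and the second pass would detect this and call for a new sample instead of silently missing `{B, C, D}`.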

In [109]:
# Import necessary libraries
import re
import random
import time

In [110]:
# This function executes the second pass of the Toivonen algorithm.
# It validates the candidate itemsets discovered in the first pass against the entire dataset.
def ExecuteSecondPassToivonen(lstRandomFrequentItemSets, lstNegativeBorderItemSets, lstInputData, support):
    # Initialize dictionaries to count occurrences of candidate and negative border itemsets
    dictFrequentItems = {}
    dictNegativeBorderItemCounts = {}

    # Initialize lists to hold final frequent itemsets and negative border itemsets meeting support
    lstNegativeBorder = []
    lstFinalFrequentItems = []

    # Count occurrences of each candidate itemset in the entire dataset
    for setItem in lstRandomFrequentItemSets:
        countItem = 0
        for sinList in lstInputData:
            if set(setItem).issubset(set(sinList)):
                countItem += 1
        dictFrequentItems[tuple(setItem)] = countItem

    # Count occurrences of each negative border itemset in the entire dataset
    for setNegItem in lstNegativeBorderItemSets:
        countNegItem = 0
        for sinLi in lstInputData:
            if set(setNegItem).issubset(set(sinLi)):
                countNegItem += 1
        dictNegativeBorderItemCounts[tuple(setNegItem)] = countNegItem

    # Add itemsets to the final list if their count meets the support threshold
    for ele in dictFrequentItems:
        if dictFrequentItems[ele] >= support:
            lstFinalFrequentItems.append(list(ele))

    # Check for negative border itemsets that meet the support threshold
    for elem in dictNegativeBorderItemCounts:
        if dictNegativeBorderItemCounts[elem] >= support:
            lstNegativeBorder.append(list(elem))

    # Determine if the algorithm needs to be executed again based on negative border results
    if len(lstNegativeBorder) == 0:
        performAlgoAgain = 0
    else:
        performAlgoAgain = 1

    # Return a flag indicating if another execution is necessary, and the list of frequent itemsets
    return performAlgoAgain, lstFinalFrequentItems

In [111]:
# This function grows frequent itemsets level by level from the frequent singletons of the sample
# maxLengthTransaction is kept for compatibility with existing callers but is not used
def GenerateFrequentRandomItemSets(lstFreqSingleTons, support, maxLengthTransaction, lstInputData, lstNegativeBorderItems):
    # Work on copies so the caller's list of singletons is not modified
    lstFreqItems = list(lstFreqSingleTons)
    lstFreqAllItems = list(lstFreqSingleTons)
    dictCountItemSets = {}

    # Convert each transaction to a set once instead of on every subset test
    lstTransactionSets = [set(sinTrans) for sinTrans in lstInputData]

    # Generate candidate itemsets and count their occurrences in the dataset
    while(len(lstFreqItems)>0):
        for eleInd1 in range(len(lstFreqItems)):
            for eleInd2 in range(eleInd1+1, len(lstFreqItems)):
                # Join two frequent k-itemsets; keep the union only if it has k+1 items
                setA = set(lstFreqItems[eleInd1]) | set(lstFreqItems[eleInd2])
                sortedList = sorted(setA)
                if(len(sortedList) == len(lstFreqItems[eleInd1])+1):
                    count = 0
                    for setTrans in lstTransactionSets:
                        if(setA.issubset(setTrans)):
                            count += 1
                    dictCountItemSets[tuple(sortedList)] = count

        # Frequent candidates seed the next level; the rest are recorded as negative border
        # (a superset of the strict negative border, since only two subsets of each candidate were checked)
        lstFreqItems = []
        for item in dictCountItemSets:
            if(dictCountItemSets[item]>= support):
                lstFreqItems.append(item)
                lstFreqAllItems.append(list(item))
            else:
                lstNegativeBorderItems.append(list(item))

        dictCountItemSets.clear()

    return lstFreqAllItems, lstNegativeBorderItems

In [112]:
# This function counts single items in the already sampled data, applies the lowered sample threshold, and mines the sample
def AprioriSampleInputData(lstRandInputData, support, fractionOfTransactionUsed):
    dictSingleItemCount = {}
    lstLengthOfLst = []
    lstAllFrequentItem = []
    lstNegativeBorderItems = []
    lstNegativeSingleItems= []

    # Lowered sample threshold 0.8*p*s, truncated to an integer count
    randSupport = int(0.8*fractionOfTransactionUsed*support)
    lstSingleRandFrequentItems = []

    # Count occurrences of each item in the sampled data
    for alist in lstRandInputData:
        lstLengthOfLst.append(len(alist))
        for item in alist:
            dictSingleItemCount[item] = dictSingleItemCount.get(item, 0)+1

    # Filter items by the adjusted support threshold
    for element in dictSingleItemCount:
        # Wrap each item in a one-element list; list(element) would split a multi-character item into characters
        if(dictSingleItemCount[element] >= randSupport):
            lstSingleRandFrequentItems.append([element])
        else:
            lstNegativeSingleItems.append([element])

    # Generate frequent itemsets and negative borders from the filtered singletons
    lstAllFrequentItem,lstNegativeBorderItems=GenerateFrequentRandomItemSets(sorted(lstSingleRandFrequentItems), randSupport, max(lstLengthOfLst), lstRandInputData,lstNegativeSingleItems)
    return lstAllFrequentItem, lstNegativeBorderItems

In [113]:
# This function implements the first pass of the Toivonen algorithm
# It processes the input data to find candidate frequent itemsets and their negative border
def ExecuteFirstPassToivonen(inputData, support,fractionOfTransactionUsed):
    # Initialize lists to store preprocessed input data and lengths of transactions
    lstInputData = []
    lstLengthOfEachTrasaction = []

    # Initialize lists to store frequent itemsets and negative border itemsets found in the sample
    lstFreqRandItems = []
    lstNegativeBorderItems = []

    # Preprocess each line of the input data to extract items and sort them
    # re.findall(r'\w+', line) extracts each alphanumeric token (e.g. '14') as one item
    for line in inputData:
        # Extract items from each line
        line = re.findall(r'\w+',line)

        # Sort the items alphabetically
        lines = sorted(line)

        # Record the length of each transaction
        lstLengthOfEachTrasaction.append(len(lines))

        # Add the sorted list of items to the input data list
        lstInputData.append(lines)

    # Calculate the number of elements to use in the sample based on the specified fraction
    numberOfElements = len(lstInputData)*fractionOfTransactionUsed

    # Randomly select a sample of transactions from the input data
    lstRandInputData = random.sample(lstInputData,int(numberOfElements))

    # Apply the Apriori algorithm to the sampled data to find frequent itemsets and their negative border
    # This step leverages the probabilistic nature of the Toivonen algorithm to efficiently find candidate itemsets
    lstFreqRandItems, lstNegativeBorderItems = AprioriSampleInputData(lstRandInputData, support, fractionOfTransactionUsed)

    # Return the frequent itemsets, negative border itemsets, and preprocessed input data for further processing
    return lstFreqRandItems, lstNegativeBorderItems, lstInputData

In [114]:
# This function is the entry point for executing the Toivonen algorithm.
# It orchestrates the execution of the algorithm's passes and outputs the results
def Toivonen():
    start=time.time()

    inputFile="basket2.txt"
    support = 5
    fractionOfTransactionUsed = 0.4
    numberOfIterations =0
    executeToiven = 1
    lstRandomFrequentItemSets = []
    lstNegativeBorderItemSets = []
    lstFinalFrequentItems = []
    lstInputData= []
    dictSortedList = {}

    # Repeat with a fresh random sample until no negative border itemset is frequent in the full data
    while(executeToiven > 0):
        numberOfIterations +=1

        # Open the file by name on every iteration (reusing a closed file object would fail)
        # and execute the first pass of Toivonen
        with open(inputFile) as inputData:
            lstRandomFrequentItemSets, lstNegativeBorderItemSets, lstInputData = ExecuteFirstPassToivonen(inputData, support,fractionOfTransactionUsed)

        # Execute the second pass to validate the frequent itemsets
        executeToiven, lstFinalFrequentItems=ExecuteSecondPassToivonen(lstRandomFrequentItemSets, lstNegativeBorderItemSets, lstInputData, support)

    # Output the number of iterations and the fraction of data used
    print(numberOfIterations)
    print(fractionOfTransactionUsed)

    # Sort and display the final frequent itemsets
    lstFinalFrequentItems = sorted(lstFinalFrequentItems)

    for item in lstFinalFrequentItems:
        dictSortedList.setdefault(len(item),[]).append(item)

    for lengthItem in dictSortedList:
        print(list(dictSortedList[lengthItem]))

    end = time.time()
    print()
    print("Time taken: ")
    print(end - start)

In [115]:
# Main function call to start the Toivonen algorithm
Toivonen()

1
0.4
[['1'], ['2'], ['3'], ['4']]
[['1', '2'], ['1', '3'], ['2', '3']]

Time taken: 
0.004271984100341797
