# **Loading Libraries**

In [1]:
from collections import defaultdict, OrderedDict
from csv import reader
from itertools import chain, combinations
from optparse import OptionParser
import numpy as np
from tabulate import tabulate
from google.colab import drive
import csv
from typing import List

# Define class Node
> Represents a node in the FP-tree



In [2]:
class Node:
    """A single node of the FP-tree.

    Attributes set by ``__init__``:
        itemName: label of the item this node stands for.
        count: frequency accumulated on this node.
        parent: the node passed in as parent (``None`` for a root).
        children: dict of child nodes, initially empty.
        next: link to another node, initially ``None``.
    """

    def __init__(self, itemName, frequency, parentNode):
        self.itemName, self.count, self.parent = itemName, frequency, parentNode
        self.children = {}
        self.next = None

    def increment(self, frequency):
        """Add ``frequency`` to this node's count."""
        self.count += frequency

    def display(self, ind=1):
        """Print this node, then each descendant one indentation level deeper."""
        print('  ' * ind, self.itemName, ' ', self.count)
        deeper = ind + 1
        for descendant in self.children.values():
            descendant.display(deeper)


# Read from CSV

> Adds a function that reads the transactions from a CSV file, with a flag indicating whether the file has a header row



In [3]:
def read_csv_data(filename: str, has_header: bool = False) -> List[List[str]]:
    """Mount Google Drive, then load the CSV at ``filename`` as a list of rows.

    Args:
        filename: Path of the CSV file to open.
        has_header: When true, the first row is skipped.

    Returns:
        One list of items per CSV row, with surrounding whitespace stripped
        from every item and with empty items and empty rows dropped. An empty
        list is returned (after printing a message) when mounting or reading
        fails.
    """
    # Note: force_remount=True remounts the drive on every call.
    try:
        drive.mount('/content/drive', force_remount=True)
    except Exception as err:
        print(f"Error mounting Google Drive: {str(err)}")
        return []

    try:
        with open(filename, 'r', newline='') as csvfile:
            rows = csv.reader(csvfile)
            if has_header:
                next(rows, None)  # discard the header row
            # Strip every cell, drop blank cells, then drop rows left empty.
            stripped = ([cell.strip() for cell in row] for row in rows)
            cleaned = ([cell for cell in cells if cell] for cells in stripped)
            return [cells for cells in cleaned if cells]
    except FileNotFoundError:
        print(f"Error: File '{filename}' not found in Google Drive")
        return []
    except Exception as err:
        print(f"Error reading CSV file: {str(err)}")
        return []


# ConstructTree Modified

The original headerTable ordering was by frequency in ascending order (reverse=False); it has been modified so that the items of each transaction are sorted by their headerTable frequency in descending order (ties are broken by item name) before being inserted into the tree. This change ensures that high-frequency items are prioritized during the construction process.

In [4]:
def constructTree(itemSetList, frequency, minSup):
    """Build an FP-tree and its header table from weighted transactions.

    Args:
        itemSetList: Sequence of transactions (iterables of items).
        frequency: ``frequency[i]`` is the weight of ``itemSetList[i]``.
        minSup: Minimum summed weight an item needs to be kept.

    Returns:
        ``(root, headerTable)`` where ``headerTable`` maps every kept item to
        ``[support, firstNode]``, or ``(None, None)`` when no item reaches
        ``minSup``.
    """
    # First pass: accumulate the weighted support of every item.
    supportByItem = defaultdict(int)
    for position, transaction in enumerate(itemSetList):
        for item in transaction:
            supportByItem[item] += frequency[position]

    # Keep only frequent items; the second slot will hold the node-link head.
    headerTable = {
        item: [support, None]
        for item, support in supportByItem.items()
        if support >= minSup
    }
    if not headerTable:
        return None, None

    root = Node('Null', 1, None)

    # Second pass: insert each transaction, most frequent item first
    # (ties broken by the item itself so the order is deterministic).
    for position, transaction in enumerate(itemSetList):
        ordered = sorted(
            (item for item in transaction if item in headerTable),
            key=lambda item: (-headerTable[item][0], item),
        )
        cursor = root
        for item in ordered:
            cursor = updateTree(item, cursor, headerTable, frequency[position])

    return root, headerTable

# Functions to update Header Table and Tree

In [5]:
def updateHeaderTable(item, targetNode, headerTable):
    """Append ``targetNode`` to the node chain stored for ``item``.

    ``headerTable[item][1]`` holds the first node of the chain (or ``None``
    when the chain is empty); subsequent nodes are reached through ``next``.

    Args:
        item: Key of the header-table entry to update.
        targetNode: Node to link at the end of the chain.
        headerTable: Mapping whose entries are mutated in place.
    """
    entry = headerTable[item]
    # Identity checks: a node type defining __eq__ must not affect the walk.
    if entry[1] is None:
        entry[1] = targetNode
        return

    # Walk to the last node of the chain, then link the target after it.
    tail = entry[1]
    while tail.next is not None:
        tail = tail.next
    tail.next = targetNode

def updateTree(item, treeNode, headerTable, frequency):
    """Insert ``item`` below ``treeNode`` and return the child node for it.

    If ``treeNode`` already has a child for ``item`` its count is increased by
    ``frequency``; otherwise a new child is created with that count and
    registered in ``headerTable``.
    """
    branches = treeNode.children
    if item not in branches:
        # New branch: create the node and link it into the header table.
        freshNode = Node(item, frequency, treeNode)
        branches[item] = freshNode
        updateHeaderTable(item, freshNode, headerTable)
        return freshNode

    # Existing branch: just bump its count.
    existing = branches[item]
    existing.increment(frequency)
    return existing

def ascendFPtree(node, prefixPath):
    """Append to ``prefixPath`` the item names from ``node`` up towards the root.

    The walk starts at ``node`` itself and stops at the first node whose
    ``parent`` is ``None``; that top node is not recorded.

    Args:
        node: Starting node; must expose ``itemName`` and ``parent``.
        prefixPath: List extended in place, starting with ``node.itemName``.
    """
    # Iterative rather than recursive so very deep paths cannot exceed
    # Python's recursion limit.
    while node.parent is not None:
        prefixPath.append(node.itemName)
        node = node.parent


def getSupport(testSet, itemSetList):
    """Count the transactions in ``itemSetList`` that contain every item of ``testSet``.

    Args:
        testSet: Iterable of items to look for.
        itemSetList: Iterable of transactions (iterables of items).

    Returns:
        Number of transactions of which ``testSet`` is a subset.
    """
    # Build the set once: it is loop-invariant, and doing so also keeps the
    # result correct when testSet is a one-shot iterator.
    wanted = set(testSet)
    return sum(1 for itemSet in itemSetList if wanted.issubset(itemSet))
def powerset(s):
    """Return an iterator over every non-empty combination of the items of ``s``.

    Combinations are yielded as tuples, smallest size first; the empty
    combination is not produced.
    """
    sizes = range(1, len(s) + 1)
    return chain.from_iterable(map(lambda size: combinations(s, size), sizes))
def getFrequencyFromList(itemSetList):
    """Return a list holding the weight ``1`` for each entry of ``itemSetList``."""
    transactionCount = len(itemSetList)
    return [1] * transactionCount

# MineTree

> The mineTree function is responsible for recursively mining frequent patterns from the FP-Tree by constructing conditional FP-Trees for each item in the headerTable. The original recursive logic might not correctly process the conditional FP-Trees, potentially leading to incomplete pattern extraction or infinite recursion if not handled properly.



In [6]:
def mineTree(headerTable, minSup, preFix, freqItemList, itemSetList):
    """Recursively collect frequent itemsets into ``freqItemList``.

    Args:
        headerTable: Mapping of item to ``[support, firstNode]``.
        minSup: Minimum support a pattern needs to be recorded.
        preFix: Set of items already fixed for this recursion level.
        freqItemList: Output list; frequent itemsets (sets) are appended to it.
        itemSetList: The transactions, used to count each candidate's support.
    """
    # Visit items from least to most frequent; the item itself breaks ties.
    ascendingItems = sorted(
        headerTable, key=lambda name: (headerTable[name][0], name)
    )

    for item in ascendingItems:
        # Pattern growth: the current prefix extended by this item.
        grown = preFix.copy()
        grown.add(item)
        # Record the pattern only if it is frequent in the transactions.
        if getSupport(grown, itemSetList) >= minSup:
            freqItemList.append(grown)

        # Collect all prefix paths to construct the conditional pattern base.
        condPatterns, condWeights = findPrefixPath(item, headerTable)
        # Construct the conditional FP-tree from the conditional pattern base.
        _, condHeader = constructTree(condPatterns, condWeights, minSup)
        if condHeader is None:
            continue

        # Mine the conditional tree recursively.
        mineTree(condHeader, minSup, grown, freqItemList, itemSetList)

# FindPrefixPath
The ascendFPtree function includes the current node in the prefix path, so findPrefixPath keeps only the nodes above the current node (using prefixPath[1:]).


In [7]:
def findPrefixPath(basePat, headerTable):
    """Collect the conditional pattern base of ``basePat``.

    Follows the node chain stored at ``headerTable[basePat][1]`` and, for each
    node, gathers the path above it.

    Returns:
        ``(condPats, frequency)``: the non-empty ancestor paths and, in the
        same order, the count of the node each path was collected from.
    """
    condPats, frequency = [], []

    node = headerTable[basePat][1]  # head of the node chain
    while node is not None:
        path = []
        ascendFPtree(node, path)
        # path[0] is the node itself; only what lies above it is kept.
        ancestors = path[1:]
        if ancestors:
            condPats.append(ancestors)
            frequency.append(node.count)
        node = node.next

    return condPats, frequency

# Association Rule

Accepts an additional optional parameter minLift (defaulting to None), which allows filtering rules based on the lift metric in addition to confidence.


In [8]:
def associationRule(freqItemSet, itemSetList, minConf, minLift=None):
    """Derive association rules from frequent itemsets.

    Every frequent itemset is split into each possible non-empty
    antecedent / consequent pair.

    Args:
        freqItemSet: Iterable of frequent itemsets (sets of items).
        itemSetList: The transactions used to count supports.
        minConf: Minimum confidence a rule needs to be reported.
        minLift: Optional minimum lift; when ``None`` no lift rules are built.

    Returns:
        ``(rules_conf_only, rules_conf_lift)`` where the first list holds
        ``[antecedent, consequent, confidence]`` for rules meeting ``minConf``
        and the second holds ``[antecedent, consequent, confidence, lift]``
        for rules meeting both ``minConf`` and ``minLift``.
    """
    rules_conf_only = []  # Rules filtered by confidence only
    rules_conf_lift = []  # Rules filtered by confidence and lift
    total_transactions = len(itemSetList)

    # Counting a support scans every transaction, and the same antecedents and
    # consequents recur across itemsets, so each distinct set is counted once.
    support_cache = {}

    def support_of(items):
        key = frozenset(items)
        if key not in support_cache:
            support_cache[key] = getSupport(key, itemSetList)
        return support_cache[key]

    for itemSet in freqItemSet:
        # Fewer than two items cannot be split into two non-empty sides.
        if len(itemSet) < 2:
            continue
        itemSetSup = support_of(itemSet)
        for s in powerset(itemSet):
            antecedent = set(s)
            consequent = itemSet.difference(antecedent)
            if not consequent:
                continue
            sup_antecedent = support_of(antecedent)
            if sup_antecedent <= 0:
                continue
            confidence = itemSetSup / sup_antecedent
            # Both rule lists require minConf, so stop before any further scan.
            if confidence < minConf:
                continue
            rules_conf_only.append([antecedent, consequent, confidence])

            if minLift is None:
                continue
            sup_consequent = support_of(consequent)
            if sup_consequent > 0:
                # lift = confidence / P(consequent)
                expected_confidence = sup_consequent / total_transactions
                lift = confidence / expected_confidence
                if lift >= minLift:
                    rules_conf_lift.append([antecedent, consequent, confidence, lift])

    return rules_conf_only, rules_conf_lift

# FP-growth_from_csv

This function is a wrapper that applies the FP-growth algorithm to a dataset loaded from a CSV file. It processes the data, constructs an FP-tree, mines frequent itemsets, and generates association rules based on specified thresholds for support, confidence, and lift.


In [9]:
def fpgrowth_from_csv(filename, minSupRatio, minConf, minLift, has_header=False):
    """Run FP-growth and rule generation on the transactions of a CSV file.

    Args:
        filename: Path of the CSV file holding one transaction per row.
        minSupRatio: Minimum support as a fraction of the transaction count.
        minConf: Minimum confidence for association rules.
        minLift: Minimum lift for the confidence-and-lift rule list.
        has_header: Whether the first CSV row is a header to skip.

    Returns:
        ``(freqItems, rules_conf_only, rules_conf_lift)``; three empty lists
        when no data could be read or no item reaches the minimum support.
    """
    transactions = read_csv_data(filename, has_header)
    if not transactions:
        return [], [], []

    # Absolute support threshold, rounded up from the requested ratio.
    minSup = np.ceil(len(transactions) * minSupRatio)
    print(f"minSup: {minSup}")

    weights = getFrequencyFromList(transactions)
    fpTree, headerTable = constructTree(transactions, weights, minSup)
    if fpTree is None:
        return [], [], []

    freqItems = []
    mineTree(headerTable, minSup, set(), freqItems, transactions)
    confRules, liftRules = associationRule(freqItems, transactions, minConf, minLift)
    return freqItems, confRules, liftRules

# Main Function
> Runs all of the functions defined above and prints the resulting association rules



In [10]:
if __name__ == "__main__":
    # Adjust this to your file's location in Google Drive
    filename = "/content/drive/MyDrive/data/A1.csv"
    minSupRatio = 0.0001
    minConf = 0.9
    minLift = 90

    # Run the FP-growth algorithm with the CSV file from Google Drive
    freqItemSet, rules_conf_only, rules_conf_lift = fpgrowth_from_csv(
        filename, minSupRatio, minConf, minLift, has_header=True
    )

    # Titles are built from the thresholds actually used, so they stay
    # accurate when minConf / minLift above are changed.
    print(f"\nAssociation Rules (Confidence Only, minConf = {minConf}):")
    headers = ["Antecedent", "Consequent", "Confidence"]
    table_data = [[list(rule[0]), list(rule[1]), f"{rule[2]:.2f}"] for rule in rules_conf_only]
    print(tabulate(table_data, headers=headers, tablefmt="grid"))

    # Print association rules filtered by confidence and lift
    print(f"\nAssociation Rules (Confidence + Lift, minConf = {minConf}, minLift = {minLift}):")
    headers = ["Antecedent", "Consequent", "Confidence", "Lift"]
    table_data = [[list(rule[0]), list(rule[1]), f"{rule[2]:.2f}", f"{rule[3]:.2f}"] for rule in rules_conf_lift]
    print(tabulate(table_data, headers=headers, tablefmt="grid"))

Mounted at /content/drive
minSup: 4.0

Association Rules (Confidence Only, minConf = 0.9):
+---------------------+---------------------+--------------+
| Antecedent          | Consequent          |   Confidence |
| ['flower soil']     | ['fertilizer']      |         1    |
+---------------------+---------------------+--------------+
| ['fertilizer']      | ['flower soil']     |         1    |
+---------------------+---------------------+--------------+
| ['nuts']            | ['prunes']          |         1    |
+---------------------+---------------------+--------------+
| ['prunes']          | ['nuts']            |         1    |
+---------------------+---------------------+--------------+
| ['cling film']      | ['bags']            |         1    |
+---------------------+---------------------+--------------+
| ['bags']            | ['cling film']      |         0.95 |
+---------------------+---------------------+--------------+
| ['photo']           | ['film']            |         1