In [1]:
# ref: https://github.com/shubhamjha97/association-rule-mining-apriori/blob/master/arm.py

In [56]:
import csv
from itertools import combinations

data_path = '/home/haianh/grad_project/ml-learning/data/groceries.csv'



In [47]:
def read(data_path: str):
    transactions = []
    items = []
    with open(data_path, 'r') as f:
        reader = csv.reader(f)
        for line in reader:
            transactions.append(line)
            items.extend(line)
        unique_items = sorted(set(items))
    return transactions, unique_items

# transactions, items = read(data_path)


In [4]:
transactions, items = read(data_path)

print(f'Transactions: {len(transactions)}, Unique items: {len(items)}')

Transactions: 9835, Unique items: 169


In [4]:
def generate_candidate(list_k_item_candidtates: list):
    """
    Generate k+1 itemset candidate from list of k-item set candidate
    
    Eg:
        [
            [1, 2, 3],
            [1, 2, 4],
            [1, 2, 5],
        ]
        -->
        [
            [1, 2, 3, 4], 
            [1, 2, 3, 5],
            [1, 2, 4, 5],        
        ]
    
    
    """
    n = len(list_k_item_candidtates)
    new_candidates = []

    for i in range(n):
        for j in range(i + 1, n):
            candidate_a = list_k_item_candidtates[i]
            candidate_b = list_k_item_candidtates[j]

            if candidate_a[:-1] == candidate_b[:-1]:
                temp_candidate = []
                temp_candidate.extend(candidate_a)
                temp_candidate.append(candidate_b[-1])
                temp_candidate = sorted(temp_candidate)
                new_candidates.append(temp_candidate)
    
    return new_candidates


In [49]:
def get_support(candidate_list, transactions):
    """
    Get support of candidate in transactions

    return:  dictionary with key = candidate, value = support 
        {
            (1, 2, 4, ) : 1,
            
            (1, 3, 5, ) : 2
        } 
    
    """
    candidate_support = {}

    for transaction in transactions:
        for candidate in candidate_list:
            # if all items in candidate exist in a transaction, increment support of it
            if set(candidate).issubset(set(transaction)):
                candidate_support[tuple(sorted(candidate))] = candidate_support.get(tuple(candidate), 0)
                candidate_support[tuple(sorted(candidate))] += 1

    return candidate_support
    


In [50]:
get_support([[1, 2], [2], [3]], [[1, 2, 4], [1, 2, 5], [2, 3, 5]])

{(1, 2): 2, (2,): 3, (3,): 1}

In [51]:
def generate_frequent_itemsets(data_path, min_sup = 0.1) -> list[dict]:
    transactions, items = read(data_path)
    
    one_itemset = [[item] for item in items]

    temp_candidate_support =  get_support(one_itemset, transactions)

    final_candidate_support = []

    min_support_count = min_sup * len(transactions)
    candidate_support = {}

    # prune candidates which does not meet minimum support 
    for candidate in temp_candidate_support.keys():
        if temp_candidate_support[candidate] >= min_support_count:
            candidate_support[tuple(sorted(candidate))] = temp_candidate_support[candidate]

    final_candidate_support.append(candidate_support)

    # Generate k+1 itemset from list of k-itemset
    while(len(candidate_support)):
        current_candidates = generate_candidate(list(candidate_support.keys()))
        
        if(len(current_candidates)):

            temp_dict_candidate_support = get_support(current_candidates, transactions)
            
            candidate_support = {}
            for candidate in temp_dict_candidate_support.keys():
                if temp_dict_candidate_support[candidate] >= min_support_count:
                    candidate_support[tuple(sorted(candidate))] = temp_dict_candidate_support[candidate]
            
            if len(candidate_support):
                final_candidate_support.append(candidate_support)

        else:
            break
    
    return final_candidate_support


In [None]:
frequent_itemsets = generate_frequent_itemsets(data_path, min_sup=0.01)

frequent_itemsets

In [52]:
def check_key(list_dict: list[dict[tuple, int]], key: tuple):
    for dict in list_dict:
        if key in dict.keys():
            return True
    return False


In [58]:
def generate_rules(frequent_itemsets: list[dict], min_confidence=0.5) -> list[dict]:
    """
    Generate association rule from frequent set

    Params:
    ------
    frequent_itemsets: list[dict]: contains dict <candidate, support_count>

    return: list of association rule

    rule format:
    [(antecedent, consequent), (antedecent, consequent)]
    """
    rules = []

    for k_itemset in frequent_itemsets:
        k = len(list(k_itemset.keys())[0])

        # 1-itemset cant generate rule
        if k == 1:
            continue

        """
        Generate all possible rule from a itemset
        eg:
        itemset (1, 2, 4,)

        rules: 
            1, 2 -> 4
            1, 4 -> 2
            2, 4 -> 1    
            1 -> 2, 4 
            2 -> 1, 4
            4 -> 1, 2
               
        """
        for itemset, support_count in k_itemset.items():
            H_cur = [[x] for x in itemset]
            to_remove = []
            for h in H_cur:
                antecedent = tuple(set(sorted(set(itemset) - set(h))))
                consequent = tuple(set(sorted(h)))

                confidence = support_count / frequent_itemsets[k - 2][antecedent]
                if confidence >= min_confidence:
                    rule = {tuple([antecedent, consequent]): confidence}
                    rules.append(rule)
                else:
                    to_remove.append(h)

            H_cur = generate_candidate([h for h in H_cur if h not in to_remove])

            for m in range(1, k - 1):
                if k > m + 1:
                    H_next = generate_candidate(H_cur)

                    for h in H_next:
                        antecedent = tuple(set(sorted(set(itemset) - set(h))))
                        consequent = tuple(set(sorted(h)))

                        confidence =  support_count / (frequent_itemsets[k - m - 2][antecedent])
                        
                        if confidence > min_confidence:
                            rule = {tuple([antecedent, consequent]): confidence}
                            rules.append(rule)                            
                        else:
                            to_remove.append(h)
                        
                        H_next = [x for x in H_next if x not in to_remove]
                        H_cur = H_next
                else:
                    break

    return rules                 

In [59]:
fi= generate_frequent_itemsets(data_path, 0.005)



In [None]:
rules = generate_rules(frequent_itemsets, 0.1)



In [43]:
adult_data_path = "/home/haianh/grad_project/ml-learning/data/adult.data"

grocerydata = "/home/haianh/grad_project/ml-learning/association-rule-mining-apriori/data/groceries.csv"

letter_data_path = "/home/haianh/grad_project/ml-learning/data/letter-recognition.data"


# frequent_itemsets = generate_frequent_itemsets(grocerydata, min_sup=0.01)

# frequent_itemsets

In [None]:
from time import perf_counter
supports = [0.1, 0.05, 0.03, 0.01]

for sup in supports:
    start = perf_counter()
    frequent_itemsets = generate_frequent_itemsets(letter_data_path, min_sup=sup)
    end = perf_counter()
    print(f"Support at {sup} takes: {end-start} seconds")

In [55]:
from time import perf_counter

def timeit(func):
    def timing_wrapper(*arg, **kwargs):
        start = perf_counter()
        value = func(*arg, **kwargs)
        end = perf_counter()
        print(f"{func.__name__} executed in {end-start} seconds")
        return value
    return timing_wrapper

@timeit
def add(a, b):
    return a + b

In [34]:
@timeit
def generate_candidate(list_k_item_candidtates: list):
    """
    Generate k+1 itemset candidate from list of k-item set candidate
    
    Eg:
        [
            [1, 2, 3],
            [1, 2, 4],
            [1, 2, 5],
        ]
        -->
        [
            [1, 2, 3, 4], 
            [1, 2, 3, 5],
            [1, 2, 4, 5],        
        ]
    
    
    """
    n = len(list_k_item_candidtates)
    new_candidates = []

    for i in range(n):
        for j in range(i + 1, n):
            candidate_a = list_k_item_candidtates[i]
            candidate_b = list_k_item_candidtates[j]

            if candidate_a[:-1] == candidate_b[:-1]:
                temp_candidate = []
                temp_candidate.extend(candidate_a)
                temp_candidate.append(candidate_b[-1])
                temp_candidate = sorted(temp_candidate)
                new_candidates.append(temp_candidate)
    
    return new_candidates

In [38]:
@timeit
def gen_by_comb(ilist):
    s = set()
    for li in ilist:
        s = s.union(set(li))
    return list(combinations(s, len(ilist[0]) + 1))


In [5]:
import random

# Generate a list of random integers
def generate_random_integers(n, min_value, max_value):
  """Generates a list of `n` random integers in the range `[min_value, max_value]`.

  Args:
    n: The number of random integers to generate.
    min_value: The minimum value of the random integers.
    max_value: The maximum value of the random integers.

  Returns:
    A list of `n` random integers.
  """
  random_integers = []
  for i in range(n):
    random_integers.append(random.randint(min_value, max_value))
  return random_integers

# Generate a list of random integer lists
def generate_random_integer_lists(n, m, min_value, max_value):
  """Generates a list of `n` lists of `m` random integers in the range `[min_value, max_value]`.

  Args:
    n: The number of lists to generate.
    m: The number of random integers in each list.
    min_value: The minimum value of the random integers.
    max_value: The maximum value of the random integers.

  Returns:
    A list of `n` lists of `m` random integers.
  """
  random_integer_lists = []
  for i in range(n):
    random_integer_lists.append(generate_random_integers(m, min_value, max_value))
  return random_integer_lists

# Example
print(generate_random_integer_lists(3, 5, 1, 10))

[[10, 8, 8, 3, 7], [8, 9, 2, 8, 3], [7, 8, 9, 4, 3]]


In [58]:
@timeit
def optimized_generate_frequent_itemsets(data_path, min_sup = 0.1) -> list[dict]:
    transactions, items = read(data_path)
    
    one_itemset = [[item] for item in items]

    temp_candidate_support =  get_support(one_itemset, transactions)

    final_candidate_support = []

    min_support_count = min_sup * len(transactions)
    candidate_support = {}

    # prune candidates which does not meet minimum support 
    for candidate in temp_candidate_support.keys():
        if temp_candidate_support[candidate] >= min_support_count:
            candidate_support[tuple(sorted(candidate))] = temp_candidate_support[candidate]

    final_candidate_support.append(candidate_support)

    # Generate k+1 itemset from list of k-itemset
    while(len(candidate_support)):
        current_candidates = gen_by_comb(list(candidate_support.keys()))
        
        if(len(current_candidates)):

            temp_dict_candidate_support = get_support(current_candidates, transactions)
            
            candidate_support = {}
            for candidate in temp_dict_candidate_support.keys():
                if temp_dict_candidate_support[candidate] >= min_support_count:
                    candidate_support[tuple(sorted(candidate))] = temp_dict_candidate_support[candidate]
            
            if len(candidate_support):
                final_candidate_support.append(candidate_support)

        else:
            break
    
    return final_candidate_support

In [None]:
@timeit
def generate_frequent_itemsets(data_path, min_sup = 0.1) -> list[dict]:
    transactions, items = read(data_path)
    
    one_itemset = [[item] for item in items]

    temp_candidate_support =  get_support(one_itemset, transactions)

    final_candidate_support = []

    min_support_count = min_sup * len(transactions)
    candidate_support = {}

    # prune candidates which does not meet minimum support 
    for candidate in temp_candidate_support.keys():
        if temp_candidate_support[candidate] >= min_support_count:
            candidate_support[tuple(sorted(candidate))] = temp_candidate_support[candidate]

    final_candidate_support.append(candidate_support)

    # Generate k+1 itemset from list of k-itemset
    while(len(candidate_support)):
        current_candidates = generate_candidate(list(candidate_support.keys()))
        
        if(len(current_candidates)):

            temp_dict_candidate_support = get_support(current_candidates, transactions)
            
            candidate_support = {}
            for candidate in temp_dict_candidate_support.keys():
                if temp_dict_candidate_support[candidate] >= min_support_count:
                    candidate_support[tuple(sorted(candidate))] = temp_dict_candidate_support[candidate]
            
            if len(candidate_support):
                final_candidate_support.append(candidate_support)

        else:
            break
    
    return final_candidate_support


In [59]:
generate_frequent_itemsets(data_path, min_sup=0.005)

generate_candidate executed in 0.005890868000278715 seconds
generate_candidate executed in 0.055957337000108964 seconds
generate_candidate executed in 0.009878055000626773 seconds
generate_candidate executed in 3.0019000405445695e-05 seconds


[{('citrus fruit',): 814,
  ('margarine',): 576,
  ('semi-finished bread',): 174,
  ('coffee',): 571,
  ('tropical fruit',): 1032,
  ('yogurt',): 1372,
  ('whole milk',): 2513,
  ('cream cheese',): 390,
  ('pip fruit',): 744,
  ('condensed milk',): 101,
  ('long life bakery product',): 368,
  ('other vegetables',): 1903,
  ('butter',): 545,
  ('rice',): 75,
  ('rolls/buns',): 1809,
  ('UHT-milk',): 329,
  ('bottled beer',): 792,
  ('liquor (appetizer)',): 78,
  ('potted plants',): 170,
  ('cereals',): 56,
  ('bottled water',): 1087,
  ('chocolate',): 488,
  ('white bread',): 414,
  ('curd',): 524,
  ('dishes',): 173,
  ('flour',): 171,
  ('beef',): 516,
  ('frankfurter',): 580,
  ('soda',): 1715,
  ('chicken',): 422,
  ('fruit/vegetable juice',): 711,
  ('newspapers',): 785,
  ('sugar',): 333,
  ('packaged fruit/vegetables',): 128,
  ('specialty bar',): 269,
  ('butter milk',): 275,
  ('pastry',): 875,
  ('detergent',): 189,
  ('processed cheese',): 163,
  ('candy',): 294,
  ('frozen d

In [60]:
optimized_generate_frequent_itemsets(data_path, min_sup=0.005)

gen_by_comb executed in 0.0008184609996533254 seconds
gen_by_comb executed in 0.005448461999549181 seconds
gen_by_comb executed in 5.5606999922019895e-05 seconds
optimized_generate_frequent_itemsets executed in 266.63636157900055 seconds


[{('citrus fruit',): 814,
  ('margarine',): 576,
  ('semi-finished bread',): 174,
  ('coffee',): 571,
  ('tropical fruit',): 1032,
  ('yogurt',): 1372,
  ('whole milk',): 2513,
  ('cream cheese',): 390,
  ('pip fruit',): 744,
  ('condensed milk',): 101,
  ('long life bakery product',): 368,
  ('other vegetables',): 1903,
  ('butter',): 545,
  ('rice',): 75,
  ('rolls/buns',): 1809,
  ('UHT-milk',): 329,
  ('bottled beer',): 792,
  ('liquor (appetizer)',): 78,
  ('potted plants',): 170,
  ('cereals',): 56,
  ('bottled water',): 1087,
  ('chocolate',): 488,
  ('white bread',): 414,
  ('curd',): 524,
  ('dishes',): 173,
  ('flour',): 171,
  ('beef',): 516,
  ('frankfurter',): 580,
  ('soda',): 1715,
  ('chicken',): 422,
  ('fruit/vegetable juice',): 711,
  ('newspapers',): 785,
  ('sugar',): 333,
  ('packaged fruit/vegetables',): 128,
  ('specialty bar',): 269,
  ('butter milk',): 275,
  ('pastry',): 875,
  ('detergent',): 189,
  ('processed cheese',): 163,
  ('candy',): 294,
  ('frozen d

In [1]:
from itertools import  combinations

a = [1, 2,]
b = [6, 7, 8]

tp = []
for combination in combinations(b, 2):
    tp.append(tuple(a) + tuple(sorted(combination)))



In [2]:
d: dict = {}

if not d:
    print(True)

True
