In [None]:
!pip install --quiet pyspark

In [None]:
from pyspark import SparkConf, SparkContext
from itertools import chain, combinations
import time

In [None]:
# Colab-only: mount Google Drive at /content/gdrive so that the dataset paths
# used by the run cells further down (/content/gdrive/MyDrive/...) resolve.
from google.colab import drive
drive.mount("/content/gdrive")

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# Stop the SparkContext left over from a previous run of this cell (if there is
# one) before building a fresh one, so the cell can be re-executed safely.
if "sc" in globals():
    sc.stop()

# Local-mode Spark on all available cores ("local[*]"), application name "YAFIM".
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("YAFIM")
)
sc = SparkContext(conf=conf)

### Utility Functions

#### Print List

In [None]:
def printList(list_a):
    """Print every element of ``list_a`` on its own line, in iteration order."""
    for item in list_a:
        print(item)

In [None]:
DEBUG = 1  # any falsy value silences Dprint


def Dprint(info):
    """Print ``info`` only while the module-level ``DEBUG`` flag is truthy."""
    if not DEBUG:
        return
    print(info)

#### Generate Next Candidates

In [None]:
def generate_next_c(f_k, k):
    """Join itemsets that share a common prefix to build the next candidates.

    Two itemsets are joined (set union) when their first ``k - 2`` items agree.
    The comparison is made on the *sorted* items of each set: set iteration
    order is arbitrary, so comparing ``list(itemset)`` prefixes can miss valid
    joins (and can differ between runs for string items).

    Intended use, not enforced here: ``f_k`` holds the frequent
    ``(k - 1)``-itemsets and the result holds candidate ``k``-itemsets
    (the usual Apriori join step).

    Args:
        f_k: Sequence (supports ``len`` and indexing) of ``set``/``frozenset``
            itemsets whose items are mutually orderable.
        k: Join level; the shared prefix length is ``k - 2``.

    Returns:
        list: Unions ``f_k[i] | f_k[j]`` for every ``i < j`` with matching
        prefixes, in pair order.
    """
    prefix_len = k - 2
    # Sort each itemset once instead of once per pair.
    prefixes = [sorted(itemset)[:prefix_len] for itemset in f_k]

    next_c = []
    for i in range(len(f_k)):
        for j in range(i + 1, len(f_k)):
            if prefixes[i] == prefixes[j]:
                next_c.append(f_k[i] | f_k[j])
    return next_c

#### Filter Candidates for Frequent Itemset

In [None]:
def generate_f_k(sc, c_k, shared_itemset, sup):
    """Keep the candidates of ``c_k`` whose support count reaches ``sup``.

    Args:
        sc: Object providing ``parallelize`` (a SparkContext in this notebook).
        c_k: Iterable of candidate itemsets; each must offer ``issubset``.
        shared_itemset: Object whose ``.value`` is the collection of
            transactions (a broadcast variable in this notebook).
        sup: Minimum number of transactions that must contain a candidate.

    Returns:
        list: ``(candidate, support_count)`` pairs for the surviving candidates.
    """
    def with_support(candidate):
        # Count the transactions that contain every item of the candidate.
        hits = sum(
            1 for transaction in shared_itemset.value if candidate.issubset(transaction)
        )
        return candidate, hits

    return (
        sc.parallelize(c_k)
        .map(with_support)
        .filter(lambda pair: pair[1] >= sup)
        .collect()
    )

#### Find Subsets of Given Itemset

In [None]:
def find_subsets(itemset):
    """Return all non-empty proper subsets of ``itemset`` as frozensets.

    Subsets are produced by increasing size (1 up to ``len(itemset) - 1``) and,
    within a size, in ``itertools.combinations`` order. An itemset with fewer
    than two elements yields an empty list.
    """
    subsets = []
    for size in range(1, len(itemset)):
        for combo in combinations(itemset, size):
            subsets.append(frozenset(combo))
    return subsets


#### Generate Rules

In [None]:
def generate_rules(frequent_itemsets, transactions, min_confidence):
    """Derive association rules ``antecedent -> consequent`` from frequent itemsets.

    For every itemset with more than one item, each proper non-empty subset is
    tried as antecedent; the rule is kept when
    ``count(itemset) / count(antecedent) >= min_confidence``. Antecedents that
    are not present in ``frequent_itemsets`` are skipped.

    Args:
        frequent_itemsets: Iterable of ``(frozenset, support_count)`` pairs.
        transactions: Not used by this function; kept for call compatibility.
        min_confidence: Minimum confidence for a rule to be reported.

    Returns:
        list[dict]: One dict per rule with keys ``'rule'`` (a pair of sets,
        antecedent then consequent), ``'support'`` (the itemset's support
        count) and ``'confidence'`` (rounded to two decimals).
    """
    support_of = {}
    for items, count in frequent_itemsets:
        support_of[items] = count

    rules = []
    for itemset, support_count in support_of.items():
        if len(itemset) < 2:
            continue  # a rule needs at least one item on each side

        for subset in find_subsets(itemset):
            antecedent = frozenset(subset)
            consequent = itemset - antecedent
            if not consequent:
                continue

            antecedent_support = support_of.get(antecedent, 0)
            if not antecedent_support > 0:
                continue

            confidence = support_count / antecedent_support
            if not confidence >= min_confidence:
                continue

            rule = {
                'rule': (set(antecedent), set(consequent)),
                'support': support_count,
                'confidence': round(confidence, 2)
            }
            rules.append(rule)
    return rules

### YAFIM: Apriori Using Spark

In [None]:
def ParallelAprioriRunner(sc, data, min_sup, min_confidence):
    """Mine frequent itemsets and association rules from a transaction file.

    Each non-blank line of ``data`` is one transaction: it is split on ``,``,
    empty fields are dropped and the remaining fields are converted with
    ``int``. Level-wise search (Apriori) is used: frequent 1-itemsets are
    counted with ``reduceByKey``; for every further level the candidates are
    built on the driver and counted in parallel against a broadcast copy of
    all transactions. Progress and timing information is printed.

    Args:
        sc: Active SparkContext.
        data: Path of the transaction file, passed to ``sc.textFile``.
        min_sup: Minimum support as a fraction of the number of transactions;
            an itemset is frequent when its count is >= ``n * min_sup``.
        min_confidence: Minimum confidence forwarded to ``generate_rules``.

    Returns:
        tuple: ``(frequent_itemsets, rules)`` where ``frequent_itemsets`` is a
        list of ``(frozenset, count)`` pairs over all levels and ``rules`` is
        the list returned by ``generate_rules``.

    Note:
        A field that ``int`` cannot parse makes the Spark job fail.
    """
    start = time.time()

    # --------- Phase I -----------
    # Step 1: Load the data, dropping blank lines and lines made only of commas.
    lines = sc.textFile(data)
    non_empty_lines = lines.filter(
        lambda line: line.strip() and not all(char == ',' for char in line.strip())
    )

    # Step 2: Parse every line into a set of integer items. The RDD is evaluated
    # twice (collect + 1-itemset counting), so cache it to read the file once.
    transactions = non_empty_lines.map(
        lambda line: set(map(int, filter(None, line.strip().split(","))))
    ).cache()

    broadcast_transactions = None
    try:
        transactions_collected = transactions.collect()

        # Step 3: Broadcast the transactions so every task can count candidates
        # against the full data set.
        broadcast_transactions = sc.broadcast(transactions_collected)

        # Absolute support threshold derived from the relative one.
        n_samples = len(transactions_collected)
        sup = n_samples * min_sup
        print(f"Support threshold: {sup}")

        # Step 4: Initialize the result list and the current itemset size.
        frequent_itemsets = []
        k = 1

        # Step 5: Frequent 1-itemsets via a distributed word-count.
        current_frequent_itemsets = (
            transactions
            .flatMap(lambda transaction: transaction)
            .map(lambda item: (frozenset([item]), 1))
            .reduceByKey(lambda x, y: x + y)
            .filter(lambda pair: pair[1] >= sup)
            .collect()
        )
        print("Frequent Itemsets for k = 1: ")
        printList(current_frequent_itemsets)
        frequent_itemsets.extend(current_frequent_itemsets)

        # --------- Phase II -----------
        # Step 6: Grow the itemsets one item at a time until a level is empty.
        print("\nGenerating frequent itemsets...")
        while current_frequent_itemsets:
            current_itemsets = [itemset for itemset, _ in current_frequent_itemsets]

            # Unordered pairs are enough (a | b == b | a, and a | a never grows);
            # the set removes duplicate unions on the driver, so no shuffle-based
            # distinct() is needed.
            candidate_itemsets = {
                union
                for union in (
                    left | right for left, right in combinations(current_itemsets, 2)
                )
                if len(union) == k + 1
            }

            candidate_counts = sc.parallelize(list(candidate_itemsets)).map(
                lambda candidate: (
                    candidate,
                    sum(
                        1
                        for transaction in broadcast_transactions.value
                        if candidate <= transaction
                    ),
                )
            )
            current_frequent_itemsets = candidate_counts.filter(
                lambda pair: pair[1] >= sup
            ).collect()

            # Advance first so the progress line reports the size of the
            # itemsets that were just counted (k + 1 before the increment).
            k += 1
            print(f"Frequent Itemsets for k = {k}: {len(current_frequent_itemsets)}")
            frequent_itemsets.extend(current_frequent_itemsets)
    finally:
        # All Spark jobs are finished (collect() is synchronous); release the
        # broadcast and the cached RDD so repeated calls on one context do not
        # accumulate data.
        if broadcast_transactions is not None:
            broadcast_transactions.destroy()
        transactions.unpersist()

    print(f"Frequent Itemsets Length: {len(frequent_itemsets)}")

    # Step 7: Generate Rules (driver-side; needs only the collected counts).
    print("\nGenerating association rules...")
    rules = generate_rules(frequent_itemsets, transactions, min_confidence)
    print(f"Total {len(rules)} association rules generated")
    print(f"Execution Time: {time.time() - start} seconds")

    # Each itemset is accompanied by its frequency (count).
    return frequent_itemsets, rules


### PUMSB Dataset

In [None]:
# PUMSB: mine itemsets present in at least 70% of the transactions and keep
# rules with confidence >= 0.3.
pumsb = "/content/gdrive/MyDrive/Big Data/datasets/pumsb.csv"
min_support, min_conf = 0.7, 0.3
frequent_itemsets, rules = ParallelAprioriRunner(
    sc, pumsb, min_sup=min_support, min_confidence=min_conf
)
print("----Frequent Itemsets-------", frequent_itemsets, sep="\n")
print("\n----Association Rules-------", rules, sep="\n")

### Retail Dataset

In [None]:
# Retail: mine itemsets present in at least 2.5% of the transactions and keep
# rules with confidence >= 0.3.
retail = "/content/gdrive/MyDrive/Big Data/datasets/retail.csv"
min_support, min_conf = 0.025, 0.3
frequent_itemsets, rules = ParallelAprioriRunner(
    sc, retail, min_sup=min_support, min_confidence=min_conf
)
print("----Frequent Itemsets-------", frequent_itemsets, sep="\n")
print("\n----Association Rules-------", rules, sep="\n")

### Mushroom Dataset

In [None]:
# Mushroom: mine itemsets present in at least 30% of the transactions and keep
# rules with confidence >= 0.3.
mushroom = "/content/gdrive/MyDrive/Big Data/datasets/mushroom.csv"
min_support, min_conf = 0.3, 0.3
frequent_itemsets, rules = ParallelAprioriRunner(
    sc, mushroom, min_sup=min_support, min_confidence=min_conf
)
print("----Frequent Itemsets-------", frequent_itemsets, sep="\n")
print("\n----Association Rules-------", rules, sep="\n")