# **Project: Apriori Algorithm for Finding Frequent Itemsets with PySpark**

## Task 1: Import the Libraries and Set Up the Environment

In [2]:
import itertools
import findspark
findspark.init()
import pyspark
from pyspark.sql import *

In [3]:
conf = pyspark.SparkConf().setAppName('educative').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
spark

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/25 17:02:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Task 2: Generate Combinations—Parent Intersection Property

In [4]:
def pre_check(freq_k_1, k):
    k_size_comb = []
    for i in range(len(freq_k_1)):
        x = set(freq_k_1[i])
        for j in range(len(freq_k_1)):
            y = set(freq_k_1[j])
            if j<i:
                if len(x.intersection(y)) >= (k-2):
                    k_size_comb.append(tuple(sorted(list(x.union(y)))))
    return k_size_comb

## Task 3: Generate Combinations—Subset Frequency Property

In [5]:
def post_check(k_size_comb, freq_k_1, k):
    filtered = []
    for  comb in  k_size_comb:
        flag = False
        for sub_comb in itertools.combinations(comb, k-1):
            if sub_comb not in freq_k_1:
                flag = True
        if flag == False:
            filtered.append(tuple(comb))
    return filtered

## Task 4: Count Check

In [6]:
def count_check(filtered, lines, supCount):
    results = []
    counts = dict(zip(filtered, [0]*len(filtered)))
    for combination in filtered:
        present = [False]*len(combination)
        for i in range(len(combination)):
            for line in lines: 
                if combination[i] in line:
                    present[i] = True
                if all(present):
                    counts[combination] +=1

    for word, count in counts.items():
        if (count>=supCount):
            results.append(word)
    return results

## Task 5: Generate k-Size Combinations

In [7]:
def generator(freq_k_1, k, partition, support):
    
    lines = list(partition)
    supCount = len(lines)*support

    k_size_comb = pre_check(freq_k_1, k)
    
    filtered = post_check(k_size_comb, freq_k_1, k)
    
    return count_check(filtered, lines, supCount)

## Task 6: Generate Singles

In [8]:
def get_singles(lines, support):
    supCount = len(list(lines))*support
    vocab = set([])
    for line in lines:
        for word in line:
            vocab.add(word)
    counts = dict(zip(vocab, [0]*len(list(vocab))))
    combinations = []
    for line in lines:
        for word in line:
            counts[word] +=1
    for word, count in counts.items():
        if (count>=supCount):
            combinations.append(tuple((word,))) 
    return sorted(combinations)

## Task 7: The Worker Partition Mapper

In [9]:
seq_len = sc.broadcast(2)

In [10]:
def apriori(iterator):
    partition = []
    for v in iterator:
        partition.append(v)
    support = sup.value
    results= get_singles(partition, support)
    print('starting with', results)

    for k in range(2, seq_len.value+1):
        print('sequence length', k)
     
        combos = generator(results, k, partition, support)

        if len(combos) == 0:
            print('ending at sequence length' ,k-1)
            return results

        results = combos
    return results

## Task 8: Load Data and Preprocess

In [11]:
rdd = sc.textFile("usercode/Dataset.csv")
tagsheader = rdd.first() 
tags = sc.parallelize(tagsheader)
seq_len = sc.broadcast(3)
data = rdd.subtract(tags)
length = sc.broadcast(data.count())
sup = sc.broadcast(0.03)
lines = data.map(lambda x: x.lstrip('"').rstrip('"').split(','))



## Task 9: The Distributed Transform 

In [12]:
freq = lines.mapPartitions(apriori)
freq = freq.distinct()
comb = freq.collect()
print("Possible frequent itemset(s):\n", comb)

starting with [('avocado',), ('burgers',), ('butter',), ('cake',), ('cereals',), ('champagne',), ('chicken',), ('chocolate',), ('cookies',), ('cooking oil',), ('eggs',), ('escalope',), ('french fries',), ('fresh bread',), ('frozen smoothie',), ('frozen vegetables',), ('grated cheese',), ('green tea',), ('ground beef',), ('herb & pepper',), ('honey',), ('light mayo',), ('low fat yogurt',), ('milk',), ('mineral water',), ('olive oil',), ('pancakes',), ('red wine',), ('salmon',), ('shrimp',), ('soup',), ('spaghetti',), ('tomato juice',), ('tomatoes',), ('turkey',), ('whole wheat pasta',), ('whole wheat rice',)]
sequence length 2
[Stage 4:>                                                          (0 + 1) / 2]sequence length 3
starting with [('avocado',), ('brownies',), ('burgers',), ('cake',), ('champagne',), ('chicken',), ('chocolate',), ('cookies',), ('cooking oil',), ('eggs',), ('energy bar',), ('escalope',), ('french fries',), ('fresh bread',), ('frozen smoothie',), ('frozen vegetabl

Possible frequent itemset(s):
 [('avocado', 'burgers', 'butter'), ('burgers', 'butter', 'cake'), ('avocado', 'burgers', 'cereals'), ('avocado', 'butter', 'cereals'), ('burgers', 'cake', 'cereals'), ('butter', 'cake', 'cereals'), ('avocado', 'cake', 'champagne'), ('burgers', 'butter', 'champagne'), ('burgers', 'cereals', 'champagne'), ('butter', 'cereals', 'champagne'), ('avocado', 'cake', 'chicken'), ('avocado', 'champagne', 'chicken'), ('burgers', 'butter', 'chicken'), ('burgers', 'cereals', 'chicken'), ('butter', 'cereals', 'chicken'), ('cake', 'champagne', 'chicken'), ('avocado', 'burgers', 'chocolate'), ('avocado', 'butter', 'chocolate'), ('avocado', 'cereals', 'chocolate'), ('burgers', 'cake', 'chocolate'), ('burgers', 'champagne', 'chocolate'), ('burgers', 'chicken', 'chocolate'), ('butter', 'cake', 'chocolate'), ('butter', 'champagne', 'chocolate'), ('butter', 'chicken', 'chocolate'), ('cake', 'cereals', 'chocolate'), ('cereals', 'champagne', 'chocolate'), ('cereals', 'chicken',

                                                                                

## Task 10: Auxiliary Function to Check Presence

In [None]:
def auxiliary(row, combinations):
    present= []
    for combination in combinations:
        presence = [False]*len(combination)
        for i in range(len(combination)):
            presence[i] = combination[i] in row
        if all(presence):
            present+=[combination]
    return present

## Task 11: Count Check at Master

In [None]:
comb = sc.broadcast(comb)
freq1 = lines.map(lambda x: [(key, 1) for key in auxiliary(x, comb.value)]).filter(lambda x: len(x)>0)

freq2 = freq1.flatMap(lambda x: x)
freq3 = freq2.reduceByKey(lambda x, y: x+y)
freq4 = freq3.filter(lambda x: x[1]>sup.value*length.value).map(lambda x: x[0])
freq4.collect()

# End