# **Project: Apriori Algorithm for Finding Frequent Itemsets with PySpark**

## Task 1: Import the Libraries and Set Up the Environment

In [2]:
import itertools
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [6]:
#initialize a Spark session
conf=pyspark.SparkConf().setAppName('rivana').setMaster('local')
sc=pyspark.SparkContext(conf=conf)
spark=SparkSession(sc)
spark


## Task 2: Generate Combinations—Parent Intersection Property

For a combination of length k, k−2 elements must be common to both parents. \
All subsets of the itemset must be frequent. \
The itemset must have a count of at least the specified support count.

freq_k_1: A list of frequent itemsets of length k−1. \
k: The size of the itemsets to create. \
k_size_comb: A list containing the candidate combinations of size k.

The function generates candidate itemsets of size k from the frequent itemsets of size k−1 discovered in the previous iteration of the Apriori algorithm.

In [7]:
def pre_check(freq_k_1, k):
    k_size_comb = set()  # a set, so each candidate is kept only once
    for i in range(len(freq_k_1)):
        x = set(freq_k_1[i])
        # Pair each itemset only with earlier ones (j < i) so every pair is considered once.
        for j in range(i):
            y = set(freq_k_1[j])
            # Two distinct (k-1)-itemsets that share k-2 items have a union of exactly k items.
            if len(x & y) >= k - 2:
                k_size_comb.add(tuple(sorted(x | y)))
    return sorted(k_size_comb)
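
The join condition can be checked by hand on a small input. The sketch below is an independent, compact version of the same idea; the name `join_parents` and the sample itemsets are illustrative assumptions, not part of the project code.

```python
from itertools import combinations

def join_parents(freq_k_1, k):
    """Join pairs of (k-1)-itemsets that share at least k-2 items (hypothetical helper)."""
    candidates = set()
    for x, y in combinations(freq_k_1, 2):  # every unordered pair exactly once
        if len(set(x) & set(y)) >= k - 2:
            candidates.add(tuple(sorted(set(x) | set(y))))
    return sorted(candidates)

# Assumed sample input: four frequent 2-itemsets
freq_2 = [('A', 'B'), ('A', 'C'), ('B', 'C'), ('B', 'D')]
candidates_3 = join_parents(freq_2, 3)
print(candidates_3)
# [('A', 'B', 'C'), ('A', 'B', 'D'), ('B', 'C', 'D')]
```

Note that ('A', 'C') and ('B', 'D') share no item, so they are never joined, and that ('A', 'B', 'C') is produced by three different parent pairs but appears only once in the result.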

## Task 3: Generate Combinations—Subset Frequency Property

According to the second rule of the Apriori algorithm, a combination is frequent only if all its subsets are frequent.

The function returns the list of combinations whose (k−1)-subsets all appear in freq_k_1.

In [40]:
import itertools

def post_check(k_size_comb, freq_k_1, k):
    frequent = set(freq_k_1)  # set membership tests are faster than list lookups
    filtered = []  # combinations that pass the subset check
    for comb in k_size_comb:
        # Keep the combination only if every (k-1)-subset is in freq_k_1
        if all(sub_comb in frequent for sub_comb in itertools.combinations(comb, k - 1)):
            filtered.append(tuple(comb))
    return filtered
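
To see the pruning rule at work, the following sketch applies it to a hand-made set of candidates. The helper name `prune_candidates` and the sample data are assumptions made for illustration only.

```python
from itertools import combinations

def prune_candidates(candidates, freq_k_1, k):
    """Keep candidates whose (k-1)-subsets are all frequent (hypothetical helper)."""
    frequent = set(freq_k_1)
    return [c for c in candidates
            if all(sub in frequent for sub in combinations(c, k - 1))]

# Assumed sample input
freq_2 = [('A', 'B'), ('A', 'C'), ('B', 'C'), ('B', 'D')]
candidates_3 = [('A', 'B', 'C'), ('A', 'B', 'D'), ('B', 'C', 'D')]
kept = prune_candidates(candidates_3, freq_2, 3)
print(kept)
# [('A', 'B', 'C')]
```

Here ('A', 'B', 'D') is dropped because ('A', 'D') is not frequent, and ('B', 'C', 'D') is dropped because ('C', 'D') is not frequent.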


## Task 4: Count Check

The function counts the occurrences of combinations in a list of lines and filters those combinations based on a specified support count (supCount). The result is a list of combinations that satisfy the support count condition.

1) Filtered combinations: filtered_combinations is a list of combinations obtained from a previous step (e.g., post_check).

2) Lines: lines is a list of transactions, where each transaction is represented as a list of items.

3) Support count threshold: supCount is set to 2, meaning we keep only the combinations that occur in at least 2 transactions.

Function call: count_check(filtered_combinations, lines, supCount) is called with the filtered combinations, the list of lines, and the support count threshold.

Execution:

* The function iterates over each combination in filtered_combinations.
* For each combination, it checks, line by line, whether every item of the combination is present in that line.
* If all items of the combination are present in a line, the count for that combination is incremented.
* After processing all combinations, the function keeps only those whose count meets the support count threshold.

Result: a list of combinations that meet the support count criterion.

In [48]:
def count_check(filtered, lines, supCount):
    # One counter per distinct combination
    counts = dict.fromkeys(filtered, 0)

    # Iterate over each distinct combination
    for combination in counts:
        # Iterate over each line (transaction)
        for line in lines:
            # Count the line only if every item of the combination appears in it
            if all(item in line for item in combination):
                counts[combination] += 1

    # Keep the combinations whose count is greater than or equal to supCount
    results = [combination for combination, count in counts.items() if count >= supCount]
    return results


In [49]:
# Example data
filtered_combinations = [
    ('A', 'B', 'C'),
    ('B', 'C', 'D'),
    ('A', 'B', 'D'),
]

lines = [
    ['A', 'B', 'C', 'D'],
    ['B', 'C', 'E'],
    ['A', 'B', 'D', 'F'],
    ['A', 'B', 'C', 'D'],
]

supCount = 2  # Support count threshold

# Call the count_check function
result = count_check(filtered_combinations, lines, supCount)

# Print the result
print(result)


[('A', 'B', 'C'), ('B', 'C', 'D'), ('A', 'B', 'D')]
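
As a cross-check on the example above, the number of supporting transactions can be computed directly with `set.issubset`. This is only a verification sketch written independently of count_check; the variable name `support_counts` is an assumption.

```python
# Same example data as above
filtered_combinations = [('A', 'B', 'C'), ('B', 'C', 'D'), ('A', 'B', 'D')]
lines = [
    ['A', 'B', 'C', 'D'],
    ['B', 'C', 'E'],
    ['A', 'B', 'D', 'F'],
    ['A', 'B', 'C', 'D'],
]

# A transaction supports a combination when the combination is a subset of it.
support_counts = {
    comb: sum(set(comb).issubset(line) for line in lines)
    for comb in filtered_combinations
}
print(support_counts)
# {('A', 'B', 'C'): 2, ('B', 'C', 'D'): 2, ('A', 'B', 'D'): 3}
```

All three combinations reach the threshold of 2, which agrees with the printed result.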


## Task 5: Generate k-Size Combinations

In [50]:
def generator(freq_k_1, k, partition, support):
    
    lines = list(partition)
    supCount = len(lines)*support

    k_size_comb = pre_check(freq_k_1, k)
    
    filtered = post_check(k_size_comb, freq_k_1, k)
    
    return count_check(filtered, lines, supCount)

## Task 6: Generate Singles

Prepare the data to be fed into the generator() function by creating a function named get_singles().
This function takes the dataset partition available to the worker node as input and returns the frequent words observed in it as tuples of size 1.

In [51]:
def get_singles(lines, support):
    lines = list(lines)  # materialize once, so an iterator is not exhausted by len()
    supCount = len(lines)*support
    counts = {}
    # Count the occurrences of every word in the partition
    for line in lines:
        for word in line:
            counts[word] = counts.get(word, 0) + 1
    # Keep the words that reach the support count, as tuples of size 1
    combinations = [(word,) for word, count in counts.items() if count >= supCount]
    return sorted(combinations)
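
The same counting step can be sketched with `collections.Counter` on a toy partition. The helper name `frequent_singles` and the sample transactions are assumptions for illustration; they are not taken from the dataset.

```python
from collections import Counter

def frequent_singles(transactions, support):
    """Return items occurring at least support * len(transactions) times (hypothetical helper)."""
    transactions = list(transactions)
    min_count = len(transactions) * support
    counts = Counter(item for t in transactions for item in t)
    return sorted((item,) for item, n in counts.items() if n >= min_count)

# Assumed sample partition of four transactions
partition = [['milk', 'eggs'], ['milk', 'bread'], ['eggs', 'milk'], ['tea']]
singles = frequent_singles(partition, 0.5)
print(singles)
# [('eggs',), ('milk',)]
```

With four transactions and a support of 0.5, the threshold is 2 occurrences, so 'bread' and 'tea' (one occurrence each) are dropped.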

## Task 7: The Worker Partition Mapper

Map partitions at the worker nodes. \
Take the partition available at each worker node and generate the sequences whose occurrence count is at least the support count.

In [66]:
# Declare seq_len, the maximum length of the sequences to generate,
# and the apriori() function that implements the Apriori algorithm.
seq_len = sc.broadcast(2)

In [72]:
def apriori(iterator):
    # Materialize the partition, since it is scanned once per sequence length
    partition = list(iterator)
    support = sup.value  # sup is the broadcast support fraction set in Task 8
    results = get_singles(partition, support)
    print('starting with', results)

    for k in range(2, seq_len.value + 1):
        print('sequence length', k)

        combos = generator(results, k, partition, support)

        # Stop early when no combination of length k is frequent
        if len(combos) == 0:
            print('ending at sequence length', k - 1)
            return results

        results = combos
    return results

## Task 8: Load Data and Preprocess

Load the data into an RDD (resilient distributed dataset) and perform the preprocessing steps.
* RDDs enable parallel distributed processing and are immutable; you can't alter them after creation.
* When you call an operation on an RDD, PySpark evaluates it lazily: transformations are not performed immediately and are only executed when an action such as collect is invoked.
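
Lazy evaluation can be illustrated with a plain-Python analogy. The sketch below is not PySpark code: it uses the built-in lazy `map` to stand in for a transformation and `list()` to stand in for an action, and the names `parse` and `log` are assumptions.

```python
log = []

def parse(line):
    log.append(line)          # side effect, so we can see when the work happens
    return line.split(',')

rows = ['a,b', 'c,d']
lazy = map(parse, rows)       # like a transformation: nothing runs yet
ran_before = len(log)

result = list(lazy)           # like an action: forces the computation
ran_after = len(log)
print(ran_before, ran_after, result)
# 0 2 [['a', 'b'], ['c', 'd']]
```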

In [89]:
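rdd = sc.textFile("/Users/hannah/Documents/projects/Dataset.csv")
tagsheader = rdd.first()               # first line of the file (the header)
tags = sc.parallelize([tagsheader])    # wrap in a list; a bare string would be split into characters
seq_len = sc.broadcast(3)              # maximum length of the combinations to mine
data = rdd.subtract(tags)              # drop the header line
length = sc.broadcast(data.count())    # number of transactions
sup = sc.broadcast(0.03)               # minimum support as a fraction of the transactions
lines = data.map(lambda x: x.strip('"').split(','))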

## Task 9: The Distributed Transform 

Use the apriori() function to retrieve the frequent combinations for each partition in a parallel, distributed manner.
* rdd.mapPartitions(): mines combinations in parallel. The method accepts the function you want to map the partitions with as a parameter.
* rdd.distinct(): drops duplicates in the RDD.
* rdd.collect(): an action that collects the data at the master node.
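
The three steps can be emulated in plain Python to make the data flow visible. This is a sketch under stated assumptions, not Spark code: `local_miner` is a hypothetical stand-in for apriori() that only finds single items seen at least twice, and the two partitions are made-up data.

```python
from itertools import chain

def local_miner(partition):
    """Stand-in for apriori(): items seen at least twice in this partition (hypothetical)."""
    counts = {}
    for row in partition:
        for item in row:
            counts[item] = counts.get(item, 0) + 1
    return [(item,) for item, n in sorted(counts.items()) if n >= 2]

# Assumed data, already split into two partitions
partitions = [
    [['milk', 'eggs'], ['milk', 'tea'], ['eggs', 'milk']],
    [['milk', 'bread'], ['bread', 'milk'], ['tea']],
]

mapped = [local_miner(p) for p in partitions]         # mapPartitions: one call per partition
distinct = sorted(set(chain.from_iterable(mapped)))   # distinct: drop duplicates across partitions
print(distinct)                                       # collect: results gathered in one place
# [('bread',), ('eggs',), ('milk',)]
```

('milk',) is found in both partitions but appears once after the duplicate removal, while 'tea' is seen only once per partition and is never reported.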

In [98]:
freq=lines.mapPartitions(apriori)
freq=freq.distinct()
comb=freq.collect()
# print("Possible frequent itemset(s):\n", comb)

starting with [('avocado',), ('burgers',), ('butter',), ('cake',), ('cereals',), ('champagne',), ('chicken',), ('chocolate',), ('cookies',), ('cooking oil',), ('eggs',), ('escalope',), ('french fries',), ('fresh bread',), ('frozen smoothie',), ('frozen vegetables',), ('grated cheese',), ('green tea',), ('ground beef',), ('herb & pepper',), ('honey',), ('light mayo',), ('low fat yogurt',), ('milk',), ('mineral water',), ('olive oil',), ('pancakes',), ('red wine',), ('salmon',), ('shrimp',), ('soup',), ('spaghetti',), ('tomato juice',), ('tomatoes',), ('turkey',), ('whole wheat pasta',), ('whole wheat rice',)]
sequence length 2
sequence length 3
starting with [('avocado',), ('brownies',), ('burgers',), ('cake',), ('champagne',), ('chicken',), ('chocolate',), ('cookies',), ('cooking oil',), ('eggs',), ('energy bar',), ('escalope',), ('french fries',), ('fresh bread',), ('frozen smoothie',), ('frozen vegetables',), ('grated cheese',), ('green tea',), ('ground beef',), ('herb & pepper',), (

## Task 10: Auxiliary Function to Check Presence

Once all the worker nodes have computed their lists of frequent combinations, the master node should check their frequency centrally.
* Create an auxiliary() function that takes a dataset row and the distinct combinations, and returns the combinations present in the row.

In [99]:
def auxiliary(row, combinations):
    present = []
    for combination in combinations:
        # Keep the combination only if every one of its items occurs in the row
        if all(item in row for item in combination):
            present.append(combination)
    return present

## Task 11: Count Check at Master

Use the auxiliary function to determine the occurrence counts of the combinations and filter out those whose count is less than the support count.
* Broadcast the combinations collected from the apriori() method.
* Generate tuples in the (combination, 1) format every time a combination occurs in a dataset row.
* Use the RDD.flatMap() method to convert a list of lists into a single list.
* Use the RDD.reduceByKey() method to generate the occurrence count of each combination.
* Filter out the combinations that have occurrence counts less than the support count.
* Collect the filtered combinations.

Note: if you pause too long between running apriori() and running the following code block, you will get an error.

In [100]:
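comb = sc.broadcast(comb)
# Emit a (combination, 1) pair for every combination present in a row; skip rows with none
freq1 = lines.map(lambda x: [(key, 1) for key in auxiliary(x, comb.value)]).filter(lambda x: len(x)>0)

freq2 = freq1.flatMap(lambda x: x)             # flatten the list of lists
freq3 = freq2.reduceByKey(lambda x, y: x+y)    # occurrence count per combination
# Keep the combinations whose count is at least the support count
freq4 = freq3.filter(lambda x: x[1]>=sup.value*length.value).map(lambda x: x[0])
freq4.collect()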

                                                                                

[('eggs', 'ground beef', 'spaghetti')]
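
The pair-and-reduce pattern used in this step can be traced in plain Python. The sketch below is an emulation under assumptions, not Spark code: the candidates, rows, and support value are made up, and it keeps combinations whose count is at least the threshold.

```python
from collections import Counter

# Assumed candidates and dataset rows
candidates = [('eggs', 'milk'), ('bread', 'milk'), ('tea',)]
rows = [['milk', 'eggs'], ['milk', 'tea'], ['eggs', 'milk'], ['milk', 'bread']]
support = 0.5

# map + flatMap: one (combination, 1) pair per row that contains the combination
pairs = [(c, 1) for row in rows for c in candidates if set(c).issubset(row)]

# reduceByKey: sum the 1s for each combination
totals = Counter()
for key, one in pairs:
    totals[key] += one

# filter: keep the combinations that reach the support count
min_count = support * len(rows)
frequent = sorted(key for key, n in totals.items() if n >= min_count)
print(frequent)
# [('eggs', 'milk')]
```

Only ('eggs', 'milk') occurs in two of the four rows, so it is the only combination that reaches the threshold of 2.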

# End