# PCY Algorithm (Using PySpark)

## Import Section

In [1]:
from pyspark.sql import SparkSession

In [2]:
from itertools import combinations

In [3]:
import math
import numpy as np

## Initializing Spark

In [4]:
spark = SparkSession.builder.getOrCreate()
spark

23/04/22 16:46:07 WARN Utils: Your hostname, amir-GL553VE resolves to a loopback address: 127.0.1.1; using 192.168.43.187 instead (on interface wlp2s0)
23/04/22 16:46:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/22 16:46:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
sc = spark.sparkContext

## Reading Input and Necessary Transformation
* Reading Input
* Creating an ID for Each Item
* Throwing Away Unnecessary Columns
* Replacing Itemname with ItemID
* Transforming bills to the form (BillNo, {set of ItemIDs})
* Transforming bills to the form (BillNo, \[sorted list of ItemIDs\])

Sorting helps later when creating and counting combinations, because it guarantees that the same combination is always generated in the same order.
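The two transformations above can be sketched in plain Python, without Spark, to show why sorting matters. The rows and item names below are hypothetical stand-ins for `transactions.csv`, and a dictionary plays the role that `zipWithIndex` plays in the notebook.

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical (BillNo, Itemname) rows standing in for transactions.csv
rows = [
    ("B1", "MUG"), ("B1", "PLATE"), ("B1", "MUG"),  # the same item twice in one bill
    ("B2", "PLATE"), ("B2", "CUP"), ("B2", "MUG"),
]

# Give every distinct item name an integer id (zipWithIndex does this in the notebook)
item_id = {}
for _, name in rows:
    item_id.setdefault(name, len(item_id))

# Group ids per bill as a set (duplicates collapse), then sort each basket
grouped = defaultdict(set)
for bill, name in rows:
    grouped[bill].add(item_id[name])
baskets = {bill: sorted(ids) for bill, ids in grouped.items()}

# With sorted baskets, combinations() always yields (smaller_id, larger_id),
# so a given pair has the same key in every basket it appears in
pairs = {bill: list(combinations(ids, 2)) for bill, ids in baskets.items()}
print(baskets)      # {'B1': [0, 1], 'B2': [0, 1, 2]}
print(pairs["B2"])  # [(0, 1), (0, 2), (1, 2)]
```

Without the sort, one basket could emit (1, 0) and another (0, 1), and reduceByKey would count them as two different keys.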

In [6]:
transactions = spark.read.options(header=True, delimiter=";").csv("transactions.csv")
transactions.printSchema()

root
 |-- BillNo: string (nullable = true)
 |-- Itemname: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [7]:
items = transactions.select(transactions.Itemname).distinct().rdd.map(lambda x: x.Itemname).zipWithIndex()
bills = transactions.select(transactions.BillNo, transactions.Itemname).rdd.map(lambda x: (x.Itemname, x.BillNo)).join(items).values()
item_ids = items.map(lambda x: (x[1], x[0]))
del items



In [8]:
del transactions

In [9]:
baskets = bills.map(lambda x: (x[0], {x[1]})).reduceByKey(lambda x, y: x.union(y)).mapValues(lambda x: sorted(x))

In [10]:
number_of_items = item_ids.count()
number_of_baskets = baskets.count()
print("Number of Baskets: ", number_of_baskets)
print("Number of Unique Items: ", number_of_items)



Number of Baskets:  21663
Number of Unique Items:  4186



## Setting Up Constants
* Support 
* Number of Buckets
* hash function (for hashing pairs into buckets)

In [11]:
support = 250 # A set is frequent if it appears in more than `support` baskets. Smaller values create too many frequent items and pairs; larger values eliminate all frequent sets with 4 items
num_buckets = 10000 # Larger values don't change the number of frequent buckets (a limitation of the hash function and the data); values below about 8500 saturate the buckets
min_confidence = 0.8 # Minimum confidence for association rules

In [12]:
hash_function = lambda x: (hash(x[0] + x[1]) % num_buckets)
# hash_function = lambda x: ((x[0] + x[1]) % num_buckets) # equivalent: in CPython, hash(n) == n for small non-negative ints
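Because item IDs are plain Python ints, and CPython hashes a small non-negative int to itself, `hash_function` reduces to `(i + j) % num_buckets`. Integer hashes are also unaffected by hash randomization, so every executor computes the same bucket. The sketch below illustrates two consequences for this data: pairs with equal ID sums always collide, and since no two distinct IDs below 4186 add up to more than 8369, buckets above that index never receive a pair, which fits the observation in the `num_buckets` comment. `mixed_hash` is a hypothetical alternative, not something the notebook uses.

```python
num_items = 4186     # distinct items in this dataset (from the count above)
num_buckets = 10000  # same value as in the notebook

def sum_hash(pair):
    # What hash_function computes: in CPython hash(n) == n for small non-negative ints
    return hash(pair[0] + pair[1]) % num_buckets

def mixed_hash(pair):
    # Hypothetical alternative: index of (i, j) in a num_items x num_items grid, folded into the buckets
    i, j = pair
    return (i * num_items + j) % num_buckets

# Every pair with the same id sum lands in the same bucket
print(sum_hash((1, 10)), sum_hash((2, 9)), sum_hash((5, 6)))        # 11 11 11
print(mixed_hash((1, 10)), mixed_hash((2, 9)), mixed_hash((5, 6)))  # 4196 8381 936

# The largest possible id pair caps the reachable bucket index
highest_bucket = sum_hash((num_items - 2, num_items - 1))
unused = num_buckets - (highest_bucket + 1)
print(highest_bucket, unused)  # 8369 1630
```

Any pair hash works for PCY here as long as each pair is always hashed in the same (smaller, larger) order, which the sorted baskets guarantee.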

## Approach 1 - Using Bit Vectors (For 1st and 2nd Passes)
1. Finding frequent items and frequent buckets (counting with map (flatMap) and reduce (reduceByKey), then filtering)
2. Creating bit vectors for frequent items and buckets in the driver
    1. Creating bit vectors using a list of accumulators (the list is sent to every executor; the accumulators are shared)
    2. Moving the bit vectors into NumPy arrays in the driver
    3. Sending the bit vectors to the executors along with the function that finds frequent pairs (Spark does this automatically when it serializes the function's closure)
3. Next passes
    1. Creating candidate sets of size n+1 from the frequent sets of size n found in the previous pass
    2. Candidates are unions of two frequent sets; if a union has exactly n+1 items, we check that n+1 of its subsets are among the frequent sets of size n (every subset of a frequent set must also be frequent, so this keeps the number of candidates small)
    3. Map: if all elements of a candidate set are present in a basket, we map it to (set, 1)
    4. Counting using reduceByKey, then filtering
    5. Continuing to the next pass (repeating this cycle) as long as the last pass yields at least n+1 frequent sets

* We use ItemIDs instead of item names to reduce the amount of data being processed
* At the beginning, an RDD is created to store (id, name) for each item. Later, another RDD stores the same information only for frequent items (using a join with the frequent items)
* At the end, we collect the (id, name) pairs of frequent items (as a map) in the driver and replace IDs with names in the rules
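For intuition, here is a single-machine sketch of the first two passes: pass 1 counts items and buckets, pass 2 counts only pairs that pass both filters. It uses Python sets in place of the notebook's bit vectors and `Counter` in place of `reduceByKey`; the function name and the toy baskets are hypothetical.

```python
from collections import Counter
from itertools import combinations

def pcy_frequent_pairs(baskets, support, bucket_of):
    """Two-pass PCY on in-memory baskets (sorted lists of item ids).

    Uses the same strict `count > support` test as the notebook.
    """
    # Pass 1: count items, and hash every pair of each basket into a bucket counter
    item_counts = Counter()
    bucket_counts = Counter()
    for basket in baskets:
        item_counts.update(basket)
        for pair in combinations(basket, 2):
            bucket_counts[bucket_of(pair)] += 1

    # Between passes: these sets play the role of the two bit vectors
    frequent_items = {i for i, c in item_counts.items() if c > support}
    frequent_buckets = {b for b, c in bucket_counts.items() if c > support}

    # Pass 2: count only pairs of frequent items that also hash to a frequent bucket
    pair_counts = Counter()
    for basket in baskets:
        for pair in combinations(basket, 2):
            if (pair[0] in frequent_items and pair[1] in frequent_items
                    and bucket_of(pair) in frequent_buckets):
                pair_counts[pair] += 1
    return {pair: c for pair, c in pair_counts.items() if c > support}

# Toy run (hypothetical data): bucket = (i + j) % 5, support threshold 2
toy_baskets = [[0, 1, 2], [0, 1], [0, 1, 3], [1, 2], [2, 3]]
result = pcy_frequent_pairs(toy_baskets, support=2, bucket_of=lambda p: (p[0] + p[1]) % 5)
print(result)  # {(0, 1): 3}
```

In the toy run, (1, 2) survives both filters (its bucket 3 is frequent because (0, 3) lands there too) but is dropped by its real count of 2, which is the false-positive case the second pass resolves.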

In [13]:
frequents = [] # for storing items, pairs and frequent sets

In [14]:
# Finding frequent items and their support
frequent_items = baskets.flatMap(lambda x: [(item, 1) for item in x[1]]).reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] > support)
frequent_item_ids = frequent_items.join(item_ids).map(lambda x: (x[0], x[1][1]))
frequents.append(frequent_items.collect()) # Storing frequent items with their support


In [15]:
# Finding frequent buckets
frequent_buckets = baskets.flatMap(
    lambda x: [(hash_function(comb), 1) for comb in combinations(x[1], 2)]
).reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] > support).keys()

In [16]:
print("Number of Frequent Items: ", len(frequents[0]))

Number of Frequent Items:  591


In [17]:
print("Number of Frequent Buckets: ", frequent_buckets.count())



Number of Frequent Buckets:  7798



In [18]:
# Converting frequent items to bitvectors

num_items = number_of_items
bit_vectors_accs = [sc.accumulator(0) for i in range(math.ceil(num_items / 32))] # Creating Accumulators to make parallel operations possible

frequent_items.foreach(lambda x: bit_vectors_accs[x[0] // 32].add(1 << (x[0] % 32))) # Generating Bit Vectors

# Moving bitvectors to np.array
items_bitvector = np.empty(math.ceil(num_items / 32), dtype=np.uint32)
for i in range(len(bit_vectors_accs)):
    items_bitvector[i] = bit_vectors_accs[i].value

# Getting rid of redundant data
del bit_vectors_accs, frequent_items

In [19]:
# Converting frequent buckets to bitvectors

bit_vectors_accs = [sc.accumulator(0) for i in range(math.ceil(num_buckets / 32))] # Creating Accumulators to make parallel operations possible

frequent_buckets.foreach(lambda x: bit_vectors_accs[x // 32].add(1 << (x % 32))) # Generating Bit Vectors

# Moving bitvectors to np.array
buckets_bitvector = np.empty(math.ceil(num_buckets / 32), dtype=np.uint32)
for i in range(len(bit_vectors_accs)):
    buckets_bitvector[i] = bit_vectors_accs[i].value

# Getting rid of redundant data
del bit_vectors_accs, frequent_buckets
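Both cells above use the same layout: ID k is stored in bit k % 32 of word k // 32. A stand-alone sketch of that layout (using `|=` instead of accumulator sums; the helper names are hypothetical) may make the indexing easier to follow.

```python
import numpy as np

def build_bitvector(ids, size):
    # One bit per possible id, packed into uint32 words: id -> word id // 32, bit id % 32
    words = np.zeros((size + 31) // 32, dtype=np.uint32)
    for i in ids:
        words[i // 32] |= np.uint32(1 << (i % 32))
    return words

def has_bit(words, i):
    # Same test as generate_pairs: shift the word right, then look at the lowest bit
    return ((int(words[i // 32]) >> (i % 32)) & 1) == 1

bits = build_bitvector([0, 31, 32, 4185], 4186)  # hypothetical frequent ids
print(len(bits))                   # 131 words for 4186 ids
print(int(bits[0]), int(bits[1]))  # 2147483649 1
print(has_bit(bits, 31), has_bit(bits, 30), has_bit(bits, 4185))  # True False True
```

Summing powers of two, as the accumulators do, produces the same words as OR only because every ID is added exactly once (the keys come out of reduceByKey, so they are unique); OR would stay correct even if an ID were set twice.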

In [20]:
# Finding Frequent Pairs

def generate_pairs(x):
    # Map function
    # From all pairs in the basket, emit only those whose two items are frequent and whose bucket is frequent
    pairs = []
    for comb in combinations(x[1], 2):
        comb_hash = hash_function(comb)
        if ((items_bitvector[comb[0] // 32] >> (comb[0] % 32)) % 2 == 1) and \
           ((items_bitvector[comb[1] // 32] >> (comb[1] % 32)) % 2 == 1) and \
           ((buckets_bitvector[comb_hash // 32] >> (comb_hash % 32)) % 2 == 1):
            pairs.append((comb, 1))
    return pairs

# Counting occurrences of each candidate pair and keeping the frequent ones
frequent_pairs = baskets.flatMap(generate_pairs).reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] > support)

frequents.append(frequent_pairs.collect())
del frequent_pairs, items_bitvector, buckets_bitvector


In [21]:
print("Number of Frequent Pairs: ", len(frequents[1]))

Number of Frequent Pairs:  389


In [22]:
# 3rd Pass and forward

def generate_function(candidates):
    # Returns a map function that emits (candidate, 1) for the given candidate sets
    # (a closure keeps the pass loop below short and readable)
    def generate_sets(x):
        # Map function: x is (BillNo, sorted list of ItemIDs)
        basket = set(x[1])  # set lookups are O(1), unlike scanning the list
        # Emit (set, 1) for every candidate whose members are all present in the basket
        return [(s, 1) for s in candidates if all(item in basket for item in s)]
    return generate_sets


n = 2 # current size of largest sets
while len(frequents[-1]) >= n + 1: # n+1 frequent sets of size n are enough to form one candidate of size n+1
    # Start of a pass
    frequent_sets = [set(x[0]) for x in frequents[-1]] # Converting tuples to set to use union function
    candidates_set = set() # Creating a set for storing candidates to avoid duplicates
    for i in range(len(frequent_sets)):
        for j in range(i + 1, len(frequent_sets)):
            # Create union for each pair of frequent sets with size n
            s = frozenset(frequent_sets[i].union(frequent_sets[j]))
            if len(s) == n + 1:
                # If the union has exactly n+1 items, count the frequent size-n sets that are its subsets; add it as a candidate only if all n+1 of them are frequent
                c = 0
                for k in range(len(frequent_sets)):
                    c += 1 if frequent_sets[k].issubset(s) else 0
                if c == n + 1:
                    candidates_set.add(s)
    candidates = [tuple(s) for s in candidates_set] # converting candidate sets to tuples (for reducing memory footprint and faster operations)
    
    # If there is at least one candidate set, count its occurrences
    if len(candidates) > 0:
        fs = baskets.flatMap(generate_function(candidates)).reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] > support)
        frequents.append(fs.collect())
        n += 1
        print("Number of sets with ", n, " items: ", len(frequents[-1]))
    else:
        frequents.append([])


Number of sets with  3  items:  79
Number of sets with  4  items:  3
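The pruning rule from step 3.2 can also be written by testing each n-item subset of a union directly with `itertools.combinations`, instead of scanning every frequent set and counting matches. `apriori_candidates` below is a hypothetical helper, a sketch rather than the notebook's code; it returns sorted tuples so candidates print in a stable order.

```python
from itertools import combinations

def apriori_candidates(frequent_sets, n):
    # Hypothetical helper: size-(n+1) candidates from frequent size-n sets
    frequent = {frozenset(s) for s in frequent_sets}
    candidates = set()
    for a in frequent:
        for b in frequent:
            union = a | b
            # Keep the union only if it grew by exactly one item and every n-subset is frequent
            if len(union) == n + 1 and all(
                frozenset(sub) in frequent for sub in combinations(union, n)
            ):
                candidates.add(tuple(sorted(union)))
    return candidates

print(apriori_candidates([(1, 2), (1, 3), (2, 3)], 2))  # {(1, 2, 3)}
print(sorted(apriori_candidates([(1, 2), (1, 3), (2, 3), (2, 4), (3, 4)], 2)))  # [(1, 2, 3), (2, 3, 4)]
```

The first call shows that three frequent pairs are already enough to produce a triple; in the second, {1, 2, 4} and {1, 3, 4} are pruned because (1, 4) is not frequent.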


## Finding Association Rules
1. Finding all association rules and keeping those whose confidence is at least the threshold (min_confidence)
2. Sorting rules by their interest
3. Replacing ItemIDs with item names
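The two metrics used below are confidence(I -> j) = support(I ∪ {j}) / support(I) and interest(I -> j) = |confidence(I -> j) - support(j) / number_of_baskets|, which is what the rule-extraction cell computes from `target[1] / base[1]` and `probability()`. A minimal sketch with hypothetical counts (the function name is also hypothetical):

```python
def rule_metrics(support_union, support_base, support_target, num_baskets):
    """Confidence and interest of a rule I -> j from raw support counts.

    support_union:  number of baskets containing all of I plus j
    support_base:   number of baskets containing all of I
    support_target: number of baskets containing j
    """
    confidence = support_union / support_base
    # Interest: how far confidence is from the plain probability of j (absolute value, as in the notebook)
    interest = abs(confidence - support_target / num_baskets)
    return confidence, interest

# Hypothetical counts: I in 100 baskets, I plus j in 90, j in 400 of 1000 baskets
conf, interest = rule_metrics(90, 100, 400, 1000)
print(round(conf, 5), round(interest, 5))  # 0.9 0.5
```

A high-confidence rule can still have low interest when j is common on its own, which is why the rules are ranked by interest rather than confidence.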

In [23]:
def is_subset_of(subset: tuple, superset: tuple):
    if type(subset) == int:
        subset = (subset,)
    subset = sorted(subset)
    superset = sorted(superset)
    
    difference = []
    subset_index = 0
    superset_index = 0
    subset_len = len(subset)
    superset_len = len(superset)
    while subset_index < subset_len and superset_index < superset_len:
        if subset[subset_index] == superset[superset_index]:
            subset_index += 1
            superset_index += 1
        elif subset[subset_index] < superset[superset_index]:
            return False, None
        else: # subset[subset_index] > superset[superset_index]
            while superset_index < superset_len and subset[subset_index] > superset[superset_index]:
                difference.append(superset[superset_index])
                superset_index += 1
    
    if subset_index >= subset_len:
        difference.extend(superset[superset_index:])
        return True, difference
    else:
        return False, None
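Because the item IDs inside a frequent set never repeat, the two-pointer merge above is equivalent to a plain set comparison. A sketch of that equivalent (hypothetical name `subset_difference`, same return convention) may be easier to read and check:

```python
def subset_difference(subset, superset):
    # Set-based equivalent of is_subset_of: (True, sorted leftover items) or (False, None)
    if isinstance(subset, int):
        subset = (subset,)
    sub, sup = set(subset), set(superset)
    if sub <= sup:
        return True, sorted(sup - sub)
    return False, None

print(subset_difference((560, 2641), (560, 2641, 2403)))  # (True, [2403])
print(subset_difference(3353, (3353, 3411)))              # (True, [3411])
print(subset_difference((1, 5), (1, 2, 3)))               # (False, None)
```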

In [24]:
def probability(items, itemID, number_of_baskets):
    item_found = False
    i = 0
    items_len = len(items)
    while i < items_len and not item_found:
        if itemID == items[i][0]:
            item_support = items[i][1]
            item_found = True
        i += 1
    
    if item_found:
        return item_support / number_of_baskets
    else:
        return 0

In [25]:
rules = [] # to save rules in this format ((set, item), confidence, interest)
for i in range(len(frequents) - 1):
    for base in frequents[i]:
        for target in frequents[i + 1]:
            is_subset, diff = is_subset_of(base[0], target[0])
            if is_subset:
                confidence = target[1] / base[1]
                if confidence >= min_confidence:
                    interest = abs(confidence - probability(frequents[0], diff[0], number_of_baskets))
                    rules.append(((base[0], diff[0]), confidence, interest))

rules.sort(key=lambda x: x[2], reverse=True)

In [26]:
print(len(rules), "Rules Extracted: (Rules sorted by interest)")
print()
index_format = "%" + str(len(str(len(rules)))) + "d" # pad rule numbers to the width of the largest one
for i in range(len(rules)):
    print("Rule", index_format%(i+1), ": ", rules[i][0][0], "->", rules[i][0][1], " - interest: ", "%1.5f"%rules[i][2], " - confidence: ", "%1.5f"%rules[i][1])

24 Rules Extracted: (Rules sorted by interest)

Rule  1 :  3353 -> 3411  - interest:  0.89496  - confidence:  0.91135
Rule  2 :  (762, 2321) -> 2602  - interest:  0.86887  - confidence:  0.89273
Rule  3 :  (560, 2641, 2403) -> 1934  - interest:  0.86114  - confidence:  0.90615
Rule  4 :  (2403, 2641) -> 1934  - interest:  0.85850  - confidence:  0.90351
Rule  5 :  (560, 2641) -> 1934  - interest:  0.82526  - confidence:  0.87027
Rule  6 :  (560, 2641, 1934) -> 2403  - interest:  0.82280  - confidence:  0.86957
Rule  7 :  (226, 1432) -> 1117  - interest:  0.81461  - confidence:  0.86207
Rule  8 :  3411 -> 3347  - interest:  0.81460  - confidence:  0.83380
Rule  9 :  (1432, 2028, 2334) -> 1117  - interest:  0.80969  - confidence:  0.85714
Rule 10 :  2548 -> 308  - interest:  0.80844  - confidence:  0.83129
Rule 11 :  (1934, 2641) -> 2403  - interest:  0.80448  - confidence:  0.85124
Rule 12 :  2321 -> 2602  - interest:  0.79042  - confidence:  0.81429
Rule 13 :  (560, 2641) -

In [27]:
def rule_id_to_name(id_name_map, rule):
    # rule has the form ((base, target), confidence, interest); base is a single ItemID or a tuple of ItemIDs
    base_ids, target_id = rule[0]
    if isinstance(base_ids, int):
        rule_base = id_name_map[base_ids]
    else:
        rule_base = tuple(id_name_map[item_id] for item_id in base_ids)
    
    rule_target = id_name_map[target_id]
    
    # Return this rule's own confidence and interest, in the same order as in `rules`
    return ((rule_base, rule_target), rule[1], rule[2])

In [28]:
# Replacing IDs in rules with item names
id_name = frequent_item_ids.collectAsMap()
rules_named = [rule_id_to_name(id_name, rule) for rule in rules]

In [29]:
print(len(rules_named), "Rules Extracted: (Rules sorted by interest)")
print()
index_format = "%" + str(len(str(len(rules_named)))) + "d" # pad rule numbers to the width of the largest one
for i in range(len(rules_named)):
    print("Rule", index_format%(i + 1), ": ")
    print(rules_named[i][0][0], "->", rules_named[i][0][1], 
          " - interest: ", "%1.5f"%rules_named[i][2], " - confidence: ", "%1.5f"%rules_named[i][1])
    print()

24 Rules Extracted: (Rules sorted by interest)

Rule  1 : 
REGENCY TEA PLATE PINK -> REGENCY TEA PLATE GREEN  - interest:  0.89496  - confidence:  0.91135

Rule  2 : 
('SET/20 RED RETROSPOT PAPER NAPKINS', 'SET/6 RED SPOTTY PAPER CUPS') -> SET/6 RED SPOTTY PAPER PLATES  - interest:  0.86887  - confidence:  0.89273

Rule  3 : 
('REGENCY CAKESTAND 3 TIER', 'PINK REGENCY TEACUP AND SAUCER', 'ROSES REGENCY TEACUP AND SAUCER') -> GREEN REGENCY TEACUP AND SAUCER  - interest:  0.86114  - confidence:  0.90615

Rule  4 : 
('ROSES REGENCY TEACUP AND SAUCER', 'PINK REGENCY TEACUP AND SAUCER') -> GREEN REGENCY TEACUP AND SAUCER  - interest:  0.85850  - confidence:  0.90351

Rule  5 : 
('REGENCY CAKESTAND 3 TIER', 'PINK REGENCY TEACUP AND SAUCER') -> GREEN REGENCY TEACUP AND SAUCER  - interest:  0.82526  - confidence:  0.87027

Rule  6 : 
('REGENCY CAKESTAND 3 TIER', 'PINK REGENCY TEACUP AND SAUCER', 'GREEN REGENCY TEACUP AND SAUCER') -> ROSES REGENCY TEACUP AND SAUCER  - interest:  0.82280  - confidence:  0.86957

## Approach 2 - Only Using Spark (For 1st and 2nd Passes)
Only the first and second passes are implemented.

This approach is slower and requires more memory, largely because it materializes the cartesian product of the frequent items (591 × 591 = 349,281 ordered pairs with this data) and shuffles candidate pairs through two joins.

1. Finding frequent items and frequent buckets (counting with map (flatMap) and reduce (reduceByKey), then filtering)
2. Creating all pairs of frequent items, filtering out pairs with the same item in both places, mapping each pair to (hash, pair), and joining with the frequent buckets to drop pairs that fall into an infrequent bucket
3. Creating all combinations of two items for each basket and joining them with the result of the previous step (to eliminate unnecessary pairs)
4. Counting the remaining pairs with MapReduce and keeping the pairs whose support is above the threshold
5. Next passes (not implemented here; they would work as in Approach 1)
    1. Creating candidate sets of size n+1 from the frequent sets of size n found in the previous pass
    2. Candidates are unions of two frequent sets; if a union has exactly n+1 items, we check that n+1 of its subsets are among the frequent sets of size n (every subset of a frequent set must also be frequent, so this keeps the number of candidates small)
    3. Map: if all elements of a candidate set are present in a basket, we map it to (set, 1)
    4. Counting using reduceByKey, then filtering
    5. Continuing to the next pass (repeating this cycle) as long as the last pass yields at least n+1 frequent sets
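On a single machine, the cartesian-product-and-join chain of steps 2 to 4 collapses into a set comprehension and a membership test. In this sketch the join with the frequent buckets becomes `in frequent_buckets` and the join in step 3 becomes `in candidates`; the function names and toy data are hypothetical, and the frequent items {0, 1, 2} and frequent buckets {1, 3} are what a first pass with support threshold 2 gives for these baskets under the bucket function (i + j) % 5.

```python
from collections import Counter
from itertools import combinations, product

def candidate_pairs(frequent_items, frequent_buckets, bucket_of):
    # Step 2: all (i, j) with i < j from the frequent items whose bucket is frequent
    items = sorted(frequent_items)
    return {
        (i, j)
        for i, j in product(items, items)
        if i < j and bucket_of((i, j)) in frequent_buckets
    }

def count_candidate_pairs(baskets, candidates, support):
    # Steps 3 and 4: keep basket pairs that are candidates, count them, apply the threshold
    counts = Counter(
        pair for basket in baskets for pair in combinations(basket, 2) if pair in candidates
    )
    return {pair: c for pair, c in counts.items() if c > support}

# Toy data (hypothetical): sorted baskets of item ids
toy_baskets = [[0, 1, 2], [0, 1], [0, 1, 3], [1, 2], [2, 3]]
candidates = candidate_pairs({0, 1, 2}, {1, 3}, lambda p: (p[0] + p[1]) % 5)
result = count_candidate_pairs(toy_baskets, candidates, support=2)
print(sorted(candidates))  # [(0, 1), (1, 2)]
print(result)              # {(0, 1): 3}
```

Keeping only i < j mirrors the order that `combinations()` produces from a sorted basket, so the reversed pairs from the cartesian product would never match anything in step 3.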

In [30]:
# Step 1
frequent_items = baskets.flatMap(
    lambda x: [(item, 1) for item in x[1]]
).reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] > support).keys()

frequent_buckets = baskets.flatMap(
    lambda x: [(hash_function(comb), 1) for comb in combinations(x[1], 2)]
).reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] > support).keys()

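# Step 2 + transforming the data format for the following operations; the result is the candidate pairs as (pair, None)
# Keeping only i < j is enough, because sorted baskets make combinations() in step 3 emit pairs in that order
s2 = frequent_items.cartesian(frequent_items).filter(lambda x: x[0] < x[1]).map(
    lambda x: (hash_function(x), x)
).join(frequent_buckets.map(lambda x: (x, None))).map(lambda x: x[1])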

# Step 3
s3 = baskets.flatMap(lambda x: [(comb, 1) for comb in combinations(x[1], 2)]).join(s2)

# Step 4
frequent_pairs = s3.map(lambda x: (x[0], x[1][0])).reduceByKey(lambda x, y: x + y).filter(lambda x: x[1] > support)
print("Number of Frequent Pairs: ", frequent_pairs.count())



Number of Frequent Pairs:  389

