# MDLE: Assignment 1
## 1. A-Priori algorithm

As explained in the first notebook [(ex01_preprocessing.ipynb)](ex01_preprocessing.ipynb), students were provided with the file `conditions.csv.gz` which lists conditions for a large set of patients. Our purpose is to find associations between conditions.

To accomplish this, we implement the A-Priori algorithm, a classic approach to [frequent itemset mining (exercises 1.1 and 1.2)](ex01_algorithm.ipynb#1.1), followed by [association rule learning (exercise 1.3)](./#1.3).


In [1]:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

In [2]:
INPUT_FILE_PATH = "data/conditions.parquet"
RESULTS_DIRECTORY_PATH = "data/"

MIN_SUPPORT_THRESHOLD = 1000

`SparkSession` offers a simple way to read compressed files into a DataFrame. Since the algorithm itself is implemented with RDDs, we also keep a reference to the `SparkContext`, which is accessible through the session.

In [3]:
spark = SparkSession.builder.appName("A-Priori").getOrCreate()
sc = spark.sparkContext



In [4]:
# Parquet stores its compression codec in the file metadata, so no read option is needed
data = spark.read.parquet(INPUT_FILE_PATH)

baskets = data.rdd

data.show()

                                                                                

+--------------------+--------------------+
|             PATIENT|          CONDITIONS|
+--------------------+--------------------+
|0000055d-e9a9-4f6...|[65966004, 10509002]|
|0000e9ce-2e20-4c2...|[65966004, 161140...|
|0000fc30-1096-40b...|[271737000, 59621...|
|0001b288-1320-470...|[162864005, 72892...|
|000246a4-c6f5-480...|[65363002, 284549...|
|0003a636-b172-48c...|[196416002, 62106...|
|0006d39d-364a-46a...|[428251008, 59621...|
|0007a215-694b-428...|[162864005, 72892...|
|00085029-7bdd-467...|[271737000, 53741...|
|0008dd63-85c3-47b...|[58150001, 271737...|
|0008ed08-1899-444...|[162864005, 10509...|
|00091bb6-7352-43b...|[53741008, 105090...|
|000a949e-82d6-441...|[271737000, 44481...|
|000b05e4-c63c-40c...|[162864005, 27173...|
|000c905e-46d1-4d4...|[43878008, 368581...|
|000da8dd-2917-4bd...|[271737000, 59621...|
|000e6ebf-8ad3-430...|[15777000, 558220...|
|000eb281-9fa1-446...|[162864005, 72892...|
|00106d6a-f7b9-455...|[162864005, 36971...|
|0011b210-c80b-4ed...|[241929008

### 1.1.
In each iteration of frequent itemset mining, the algorithm scans the dataset to count the baskets that contain each candidate itemset. We start with single items (k = 1) in the first iteration and proceed to 2-itemsets (k = 2), 3-itemsets (k = 3), and so on.

The search space is pruned by removing itemsets that do not meet a minimum support threshold. Only the frequent itemsets are carried forward from one iteration to the next, ideally until no more frequent itemsets can be found.
>For this assignment, we stop at k=2 and k=3.
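
The pruning step relies on monotonicity: an itemset can only be frequent if all of its subsets are frequent. The following is a minimal pure-Python sketch of that idea on toy data. The item names, the `frequent_pairs` set, and the helper `candidate_triples` are hypothetical and are not part of the notebook.

```python
from itertools import combinations

# Hypothetical frequent 2-itemsets (sorted tuples), for illustration only
frequent_pairs = {("a", "b"), ("a", "c"), ("b", "c"), ("b", "d")}

def candidate_triples(basket, frequent_pairs):
    """Return the 3-itemsets of a basket whose 2-subsets are all frequent."""
    return [
        triple
        for triple in combinations(sorted(set(basket)), 3)
        if all(pair in frequent_pairs for pair in combinations(triple, 2))
    ]

candidates = candidate_triples(["d", "b", "a", "c"], frequent_pairs)
print(candidates)
```

Only `("a", "b", "c")` survives here, because every other triple contains at least one pair that is missing from `frequent_pairs`.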

We collect the frequent items (l1), 2-itemsets (l2), and 3-itemsets (l3), generically referred to as `lk`, as RDDs, so that we can later extract the 10 most frequent itemsets for K=2 and K=3 [(exercise 1.2)](#1.2), or for any user-defined K passed as `max_k`. Even when the function is called with `max_k=3`, we still need the frequent items and 2-itemsets to generate association rules, so we store them as dictionaries (`l1.collectAsMap()` and `l2.collectAsMap()`) in the list `freq`.

Since the whole process is meant to run in parallel, each dictionary in `freq` is **broadcast to all nodes**: it is sent to each executor once and cached there, instead of being shipped again with every task. The dictionaries cannot be partitioned, because every node needs all entries to perform lookups.

**Main methods**:
- `flatMap` generates new entries (itemset, 1) for each k-itemset in a given basket (the number of rows in the RDD is increased).
- `reduceByKey` counts the number of baskets that contain each itemset.
- `filter` removes itemsets that do not meet the minimum support threshold.
- `collectAsMap` stores the RDD entries in the form of a Python dictionary.
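
The four steps listed above can be mimicked without Spark to see what each one contributes. This is only a sketch on hypothetical toy baskets with a hypothetical threshold; the actual implementation runs these steps on an RDD.

```python
from collections import Counter

# Hypothetical toy baskets and threshold, for illustration only
toy_baskets = [["a", "b"], ["a", "c"], ["a", "b", "c"], ["b"]]
toy_min_support = 3

# flatMap: emit one (item, 1) pair per item of every basket
pairs = [(item, 1) for basket in toy_baskets for item in basket]

# reduceByKey: add up the 1s of each item
counts = Counter()
for item, one in pairs:
    counts[item] += one

# filter + collectAsMap: keep the frequent items as a dictionary
frequent_items = {item: n for item, n in counts.items() if n >= toy_min_support}
print(frequent_items)
```

The 4 baskets expand to 8 pairs after the `flatMap` step, and item `c` (count 2) is dropped by the filter.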

In [5]:
from itertools import combinations

def a_priori(baskets_rdd, min_support_threshold, max_k):
    """
        Compute frequent itemsets using the A-Priori algorithm.

        args:
            baskets_rdd: RDD of baskets (rows whose CONDITIONS field lists the items).
            min_support_threshold: minimum count of an itemset to be considered frequent.
            max_k: maximum size of the itemsets to compute.

        returns:
            lk: RDD of (itemset, count) entries with the frequent itemsets of size max_k.
            freq: list of broadcast dictionaries with the frequent itemsets of sizes 1 to max_k - 1.
    """

    # First pass: compute frequent itemsets of size 1
    # lk denotes the frequent itemsets of size k
    lk = baskets_rdd.flatMap(
        lambda basket: [(item, 1) for item in basket.CONDITIONS]
    ) \
    .reduceByKey(lambda a, b: a + b) \
    .filter(lambda entry: entry[1] >= min_support_threshold)

    # freq[i] is the broadcast dictionary of frequent (i + 1)-itemsets
    freq = [sc.broadcast(lk.collectAsMap())]

    # k is a plain driver-side counter, so it does not need to be broadcast
    k = 2

    while k <= max_k:
        print(f"Computing frequent itemsets of size {k}...")
        if k == 2:
            # candidate pairs: both items must be frequent
            build_candidates = lambda basket: [
                ((item1, item2), 1)
                for item1 in basket.CONDITIONS
                for item2 in basket.CONDITIONS
                if item1 < item2 and item1 in freq[-1].value and item2 in freq[-1].value
            ]
        else: # k > 2
            # candidate k-itemsets: every (k - 1)-subset must be frequent
            build_candidates = lambda basket: [
                (itemset, 1) for itemset in combinations(sorted(basket.CONDITIONS), k)
                if all(subset in freq[-1].value for subset in combinations(itemset, k - 1))
            ]

        lk = baskets_rdd.flatMap(build_candidates) \
        .reduceByKey(lambda a, b: a + b) \
        .filter(lambda entry: entry[1] >= min_support_threshold)

        freq.append(sc.broadcast(lk.collectAsMap()))

        k += 1

    return lk, freq[:-1]

### 1.2.
#### Most frequent 2-itemsets (K=2)

In [7]:
l2, _ = a_priori(baskets, MIN_SUPPORT_THRESHOLD, 2)
l2.takeOrdered(10, key=lambda entry: -entry[1])

                                                                                

Computing frequent itemsets of size 2...


                                                                                

[(('195662009', '444814009'), 343651),
 (('10509002', '444814009'), 302516),
 (('15777000', '271737000'), 289176),
 (('162864005', '444814009'), 243812),
 (('271737000', '444814009'), 236847),
 (('15777000', '444814009'), 236320),
 (('10509002', '195662009'), 211065),
 (('444814009', '59621000'), 203450),
 (('162864005', '195662009'), 167438),
 (('40055000', '444814009'), 165530)]

#### Most frequent 3-itemsets (K=3)

In [8]:
l3, freq = a_priori(baskets, MIN_SUPPORT_THRESHOLD, 3)
l3.takeOrdered(10, key=lambda entry: -entry[1])

                                                                                

Computing frequent itemsets of size 2...


                                                                                

Computing frequent itemsets of size 3...


                                                                                

[(('15777000', '271737000', '444814009'), 192819),
 (('10509002', '195662009', '444814009'), 139174),
 (('15777000', '195662009', '271737000'), 132583),
 (('10509002', '15777000', '271737000'), 115510),
 (('162864005', '195662009', '444814009'), 111860),
 (('195662009', '271737000', '444814009'), 108560),
 (('15777000', '195662009', '444814009'), 108083),
 (('15777000', '271737000', '59621000'), 99818),
 (('10509002', '162864005', '444814009'), 97384),
 (('10509002', '271737000', '444814009'), 94793)]

### 1.3.
Using the broadcast dictionaries in `freq` and the method `flatMap` once again, we can parallelise the generation of association rules. We are interested in rules of the forms X → Y and (X, Y) → Z with a minimum standardised lift of 0.2.

For the former, we take each frequent pair (A, B) and check the standardised lift of A → B and B → A. For the latter, we take each frequent triple (A, B, C) and check the standardised lift of (A, B) → C, (A, C) → B, and (B, C) → A, i.e. I \ {j} → j for each item j of the itemset I. We also compute three other metrics: lift, confidence, and interest.
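
For reference, the metrics of a rule $I \to j$ can be written as follows, where $n$ is the total number of baskets and $\operatorname{supp}(\cdot)$ is the number of baskets containing an itemset. This summary is read off the `metrics` function defined further down in this notebook; the symbols $\lambda$ and $\upsilon$ are notation introduced here for the lower and upper bounds of the lift.

```latex
\begin{align*}
P(I) &= \frac{\operatorname{supp}(I)}{n}, \qquad P(j) = \frac{\operatorname{supp}(j)}{n} \\
\operatorname{confidence}(I \to j) &= \frac{\operatorname{supp}(I \cup \{j\})}{\operatorname{supp}(I)} \\
\operatorname{interest}(I \to j) &= \operatorname{confidence}(I \to j) - P(j) \\
\operatorname{lift}(I \to j) &= \frac{\operatorname{confidence}(I \to j)}{P(j)} \\
\lambda &= \frac{\max\{P(I) + P(j) - 1,\ 1/n\}}{P(I)\,P(j)}, \qquad
\upsilon = \frac{1}{\max\{P(I),\ P(j)\}} \\
\operatorname{standardised\ lift}(I \to j) &= \frac{\operatorname{lift}(I \to j) - \lambda}{\upsilon - \lambda}
\end{align*}
```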

Some metrics require the total number of baskets.

In [9]:
n_total = baskets.count()
n_total

                                                                                

1157578

In [129]:
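def metrics(support1, support2, support_union):
    """
        Compute the metrics of a rule I -> j.

        args:
            support1: number of baskets that contain the antecedent I.
            support2: number of baskets that contain the consequent j.
            support_union: number of baskets that contain both I and j.
    """
    p1 = support1 / n_total
    p2 = support2 / n_total

    confidence = support_union / support1
    interest = confidence - p2
    lift = confidence / p2

    # x is the lower bound of the lift and 1 / max(p1, p2) is its upper bound
    x = max(p1 + p2 - 1, 1/n_total) / (p1 * p2)
    standardised_lift = (lift - x) / ((1/max(p1, p2)) - x)

    return (standardised_lift, lift, interest, confidence)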
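
A small worked example may help to check the formulas by hand. The sketch below repeats the computations of `metrics` in a standalone function; the name `rule_metrics`, the explicit `n_total` parameter, and the counts are assumptions made for illustration and do not come from the dataset.

```python
def rule_metrics(support1, support2, support_union, n_total):
    """Same formulas as `metrics`, with n_total passed explicitly."""
    p1 = support1 / n_total
    p2 = support2 / n_total

    confidence = support_union / support1
    interest = confidence - p2
    lift = confidence / p2

    lower = max(p1 + p2 - 1, 1 / n_total) / (p1 * p2)  # lower bound of the lift
    upper = 1 / max(p1, p2)                            # upper bound of the lift
    standardised_lift = (lift - lower) / (upper - lower)

    return (standardised_lift, lift, interest, confidence)

# Hypothetical counts: 100 baskets, X in 40 of them, Y in 50, both in 30
std_lift, lift, interest, confidence = rule_metrics(40, 50, 30, 100)
print(std_lift, lift, interest, confidence)
```

With these counts the confidence is 30/40 = 0.75, the interest is 0.75 - 0.5 = 0.25, and the lift is 0.75/0.5 = 1.5. The lift is bounded by 0.05 and 2, so the standardised lift is (1.5 - 0.05) / (2 - 0.05), roughly 0.74.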

#### Rules X → Y (standardised lift >= 0.2)

In [130]:
def build_1_to_1_rule(entry):
    (item1, item2), support_union = entry

    rules = []

    support1 = freq[0].value[item1]
    support2 = freq[0].value[item2]

    # evaluate item1 -> item2 (I = {item1}, j = item2)
    metrics1 = metrics(support1, support2, support_union)

    if metrics1[0] >= 0.2: # standardised lift >= 0.2
        rules.append((item1, item2, *metrics1))

    # evaluate item2 -> item1 (I = {item2}, j = item1)
    metrics2 = metrics(support2, support1, support_union)

    if metrics2[0] >= 0.2: # standardised lift >= 0.2
        rules.append((item2, item1, *metrics2))

    return rules

In [131]:
all_rules = l2.flatMap(build_1_to_1_rule).collect()

#### Rules (X, Y) → Z (standardised lift >= 0.2)

In [132]:
# Rules (X, Y) -> Z with a standardised lift >= 0.2
def build_2_to_1_rule(entry):
    itemset, support_union = entry
    rules = []

    for j in itemset:
        I = tuple(item for item in itemset if item != j)

        support1 = freq[1].value[I]
        support2 = freq[0].value[j]

        metrics_ = metrics(support1, support2, support_union)

        if metrics_[0] >= 0.2: # standardised lift >= 0.2
            rules.append((I, j, *metrics_))

    return rules
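
The lookup `freq[1].value[I]` works because the 3-itemsets are sorted tuples and removing one item keeps the remaining items in order, so the antecedent matches the sorted keys of the 2-itemset dictionary. The sketch below isolates that enumeration; the helper name `leave_one_out_rules` and the item names are hypothetical.

```python
def leave_one_out_rules(itemset):
    """Return every (antecedent, consequent) split with a single-item consequent."""
    return [
        (tuple(item for item in itemset if item != j), j)
        for j in itemset
    ]

# Hypothetical sorted 3-itemset, for illustration only
rules = leave_one_out_rules(("a", "b", "c"))
print(rules)
```

Each antecedent, such as `("a", "c")`, is itself a sorted tuple and can therefore be used directly as a dictionary key.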

In [133]:
all_rules += l3.flatMap(build_2_to_1_rule).collect()

In [134]:
len(all_rules)

25665

### Exporting results with Pandas

Spark provides its own Pandas-style DataFrame, which can be imported as `pyspark.pandas.frame.DataFrame`. As it requires PyArrow >= 4.0.0, we used the original Pandas library to export the results instead.

In [142]:
import pandas as pd
# import pyspark.pandas.frame as pd

rules_df = pd.DataFrame(all_rules, columns=["Antecedent", "Consequent", "Standardised Lift", "Lift", "Interest", "Confidence"])
rules_df.sort_values("Standardised Lift", ascending=False, inplace=True)

rules_df.to_string(RESULTS_DIRECTORY_PATH + "association_rules.txt", index=False, float_format=lambda x: f"{x:.15f}")