# MLSD: Assignment 1

### By: Alexandra de Carvalho, nmec 93346

# Exercise 1

PySpark, the Python interface to Apache Spark, is imported. A `SparkContext` is then created. It is the connection to a Spark cluster and is used to create and manipulate RDDs and broadcast variables on that cluster.

In [1]:
from pyspark import SparkContext
from itertools import combinations

In [2]:
sc = SparkContext(appName="Assignment1")

22/04/12 14:50:38 WARN Utils: Your hostname, jarvis resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlp4s0)
22/04/12 14:50:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/12 14:50:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Ex 1.1

### Import the Dataset 

In this step, the dataset file is read and an RDD is created whose rows have the form (patient, {all conditions registered for that patient}).

In [3]:
data = sc.textFile('dataset/conditions.csv')                # reading the dataset
header = data.first()                                       # first line is the header
data = data.filter(lambda line: line != header)             # filter out the header 
data = data.map(lambda line: line.split(","))               # [date that marks the opening of that patient's process, date that marks the closing of that patient's process, patient code, encounter code, reported condition's number, reported condition's name]
data = data.map(lambda line: (line[2], {line[-2]}))         # (patient code, {the reported condition's number})
data = data.reduceByKey(lambda code1, code2: code1 | code2) # aggregating different entries for the same patient: the set union joins all of that patient's conditions into one set
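A minimal pure-Python sketch of the same import step (no Spark), assuming a tiny made-up sample in the column layout described in the comments above; the header names, patient ids and codes are hypothetical. It uses the standard `csv` module which, unlike `line.split(",")`, keeps a quoted field that contains a comma in one piece, so the condition code stays at a fixed index.

```python
import csv
import io
from collections import defaultdict

# Hypothetical sample: made-up rows in the layout described above
# (opening date, closing date, patient, encounter, condition code, condition name).
SAMPLE = """START,STOP,PATIENT,ENCOUNTER,CODE,DESCRIPTION
2010-01-01,,p1,e1,111,Condition A
2011-05-03,2011-06-01,p1,e2,222,"Condition B, acute"
2012-02-02,,p2,e3,111,Condition A
2013-03-03,,p1,e4,111,Condition A
"""

def build_baskets(csv_text):
    """Return {patient: set of condition codes}, like the map + reduceByKey steps above."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)                          # skip the header row
    baskets = defaultdict(set)
    for row in reader:
        patient, code = row[2], row[4]    # quoted commas stay inside a single field
        baskets[patient] |= {code}        # set union, as in reduceByKey(lambda a, b: a | b)
    return dict(baskets)

baskets = build_baskets(SAMPLE)
print(sorted(baskets["p1"]))  # ['111', '222']
```

With `line.split(",")`, the row whose description is `"Condition B, acute"` would yield `'"Condition B'` at index `-2` instead of the code `222`.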


In the first pass:

- The frequency of every condition (the number of patients for whom it is registered) is counted, and the conditions whose frequency reaches the support threshold (1000) are collected. They are used to filter the infrequent conditions out of the RDD created above, giving a version of it whose rows, still in the form (patient, {conditions set}), contain only conditions with at least 1000 appearances.


In further passes:

- From each basket, all k-length combinations of its frequent conditions are generated and counted. As before, combinations with support below the threshold are filtered out.

In [4]:
max_k = 3
support_threshold = 1000

for k in range(1, max_k + 1):

    # first pass: single conditions
    if k == 1:
        counts = data.flatMap(lambda patient_conditions: patient_conditions[1])                             # every condition code, once per patient
        counts = counts.map(lambda condition: (condition, 1))                                                # (condition, 1)
        counts = counts.reduceByKey(lambda count1, count2: count1 + count2)                                  # (condition, number of patients with that condition), by adding the 1s

        frequent_items = counts.filter(lambda condition_count: condition_count[1] >= support_threshold)      # keeping conditions with support >= threshold (1000)
        f_items = set(frequent_items.map(lambda condition_count: condition_count[0]).collect())              # codes of the frequent conditions (a set, for fast membership tests)
        frequent_data = data.map(lambda patient_conditions: (patient_conditions[0], {code for code in patient_conditions[1] if code in f_items}))   # (patient, {frequent condition codes only})

    # second, third (and further) passes
    else:
        candidate_combinations = frequent_data.map(lambda patient_conditions: (patient_conditions[0], set(combinations(patient_conditions[1], k))))   # (patient, {k-combinations of frequent conditions: pairs when k=2, trios when k=3, ...})
        frequents = candidate_combinations.flatMap(lambda patient_combinations: patient_combinations[1])      # every candidate combination
        frequents = frequents.map(lambda combination: (combination, 1))                                      # (combination, 1)
        frequents = frequents.reduceByKey(lambda count1, count2: count1 + count2)                            # (combination, support), by adding the 1s
        frequents = frequents.filter(lambda combination_count: combination_count[1] >= support_threshold)    # keeping combinations with support >= threshold (1000)

        # storing the results under interpretable names, used in Ex 1.2
        if k == 2:
            frequent_pairs = frequents
        elif k == 3:
            frequent_trios = frequents

        # printing the 10 most frequent combinations (pairs when k=2, trios when k=3), in descending order of support
        print(frequents.sortBy(lambda combination_count: combination_count[1], ascending=False).map(lambda combination_count: combination_count[0]).take(10))


[('15777000', '271737000'), ('444814009', '195662009'), ('444814009', '162864005'), ('10509002', '444814009'), ('15777000', '444814009'), ('271737000', '444814009'), ('59621000', '444814009'), ('10509002', '195662009'), ('40055000', '444814009'), ('271737000', '195662009')]




[('15777000', '271737000', '444814009'), ('15777000', '271737000', '195662009'), ('10509002', '444814009', '195662009'), ('15777000', '444814009', '195662009'), ('271737000', '444814009', '195662009'), ('444814009', '162864005', '195662009'), ('15777000', '10509002', '271737000'), ('10509002', '444814009', '162864005'), ('15777000', '10509002', '444814009'), ('59621000', '444814009', '195662009')]



## Ex 1.2

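The following function looks up the support of an item (a condition code) or of a pair of items and divides it by the total number of baskets (patients), giving the probability of that item or pair. It relies on `individual_counts` and `pair_counts`, which are collected in a later cell, so it can only be called after that cell has run.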

In [5]:
total = data.count()
def get_probability(code):
    if type(code) is str:
        return [code_support[1] for code_support in individual_counts if code_support[0] == code][0]/total
    elif type(code) is tuple and len(code) == 2:
        return [code_support[1] for code_support in pair_counts if code_support[0] == code][0]/total
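Because `get_probability` is called inside RDD transformations for every rule, each call above scans a whole list. A hedged alternative sketch builds one dictionary up front so that each lookup takes constant time. `make_probability` is a hypothetical helper and the counts are made up; the input lists have the same (key, support) shape as `individual_counts` and `pair_counts`.

```python
def make_probability(individual_counts, pair_counts, total):
    """Build a get_probability-like function backed by a single dict.

    individual_counts: [(code, support), ...]
    pair_counts: [((code1, code2), support), ...]
    """
    support = dict(individual_counts)   # {code: support}
    support.update(pair_counts)         # {(code1, code2): support}

    def probability(itemset):
        return support[itemset] / total  # KeyError if the itemset was not frequent
    return probability

# Hypothetical counts over 100 baskets
prob = make_probability([("A", 50), ("B", 40)], [(("A", "B"), 20)], 100)
print(prob("A"), prob(("A", "B")))  # 0.5 0.2
```

In Spark, such a dictionary could also be shipped to the executors once as a broadcast variable instead of being captured in every closure.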


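The following function computes the lower bound of the lift, which is used to standardize it:

$$\lambda = \frac{\max\{P(I) + P(j) - 1,\ 1/n\}}{P(I)\,P(j)}$$

where $n$ is the total number of baskets. Since the lift is bounded above by $1/\max\{P(I), P(j)\}$, the standardized lift is

$$\text{std\_lift} = \frac{\text{lift} - \lambda}{1/\max\{P(I), P(j)\} - \lambda}$$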

In [6]:
def get_std_coef(i, j):
    return max(get_probability(i)+get_probability(j)-1, 1/total)/(get_probability(i)*get_probability(j))
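A worked numeric sketch of the standardized lift, taking the lift as P(I and j) / (P(I) P(j)), the lower bound λ as computed by `get_std_coef`, and the upper bound 1/max{P(I), P(j)}. The function takes plain probabilities instead of looking them up, and the values are hypothetical.

```python
def standardized_lift(p_i, p_j, p_ij, n):
    """Standardized lift of the rule I -> j.

    p_i = P(I), p_j = P(j), p_ij = P(I and j), n = number of baskets.
    """
    lift = p_ij / (p_i * p_j)
    lower = max(p_i + p_j - 1, 1 / n) / (p_i * p_j)   # the value computed by get_std_coef
    upper = 1 / max(p_i, p_j)                          # largest lift possible for these marginals
    return (lift - lower) / (upper - lower)

# Hypothetical probabilities: P(I) = 0.5, P(j) = 0.4, n = 100 baskets
print(round(standardized_lift(0.5, 0.4, 0.2, 100), 4))  # 0.4872 (independence, lift = 1)
print(standardized_lift(0.5, 0.4, 0.4, 100))            # 1.0 (maximal overlap)
```

Here P(I) + P(j) - 1 is negative, so λ = (1/n) / (P(I) P(j)) = 0.05, and a lift of 1 maps to (1 - 0.05) / (2 - 0.05) ≈ 0.487. The standardized value therefore lies in [0, 1] whatever the marginals, which is what makes a fixed cut-off such as 0.2 meaningful.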

From each frequent trio, all possible pairs are generated. They are used in two ways: as the antecedent (X, Y) of the rule (X, Y) -> Z (the `pair_subsets` RDD), and to create the single-item rules X -> Y and Y -> X (the `subsets` RDD). Each metric (confidence, interest, lift and standardized lift) is then computed and appended to every rule. Finally, the file is read again to build a {condition code: condition name} dictionary, which replaces the codes in each rule with readable names before the rules are filtered (std lift >= 0.2), sorted and written to a file.

In [9]:
pair_subsets = frequent_trios.flatMap(lambda trio_support: [(comb,set(trio_support[0]) - set(comb),trio_support[1]) for comb in combinations(trio_support[0],2)])   # (pair I, remaining element j, support of the trio)

# collecting the (code, support) list and the (pair, support) list
pair_counts = frequent_pairs.collect()
individual_counts = frequent_items.collect()

# to the pair-based rules (pair I, remaining element j, support of the trio), also adding the single-item rules X -> Y and Y -> X as ((X,), {Y}, support(X and Y)) and ((Y,), {X}, support(X and Y)), so that all the required rules are present
subsets = pair_subsets.flatMap(lambda pair_j_tsupport: [pair_j_tsupport, ((pair_j_tsupport[0][0],), {pair_j_tsupport[0][1]}, [support[1] for support in pair_counts if support[0] == pair_j_tsupport[0]][0]), ((pair_j_tsupport[0][1],), {pair_j_tsupport[0][0]}, [support[1] for support in pair_counts if support[0] == pair_j_tsupport[0]][0])])

# replacing the last element of each tuple, the support of I and j together, with the confidence (that support divided by the support of I), and converting the second element from the set {j} to the tuple (j,); distinct() removes rules generated by more than one trio
confidence = subsets.map(lambda pair_set_count : (pair_set_count[0], tuple(pair_set_count[1]), pair_set_count[2] / [pair_support[1] for pair_support in pair_counts if pair_support[0] == pair_set_count[0]][0]) if len(pair_set_count[0]) == 2 else (pair_set_count[0], tuple(pair_set_count[1]), pair_set_count[2] / [pair_support[1] for pair_support in individual_counts if pair_support[0] == pair_set_count[0][0]][0])).distinct()

# adding an element to the tuple: the interest, calculated by subtracting the probability of j from the confidence 
interest = confidence.map(lambda i_j_confidence: i_j_confidence + (i_j_confidence[2] - get_probability(i_j_confidence[1][0]),))

# adding an element to the tuple: the lift, calculated by dividing the confidence by the probability of j
lift = interest.map(lambda i_j_conf_int: i_j_conf_int + (i_j_conf_int[2] / get_probability(i_j_conf_int[1][0]),))

# adding the last element to the tuple: the standardized lift, (lift - lambda) / (1/max(P(I), P(j)) - lambda), where lambda is computed by get_std_coef
std_lift = lift.map(lambda ijcil: ijcil + ((ijcil[4] - get_std_coef(ijcil[0], ijcil[1][0])) / ((1/max(get_probability(ijcil[0]),get_probability(ijcil[1][0]))) - get_std_coef(ijcil[0], ijcil[1][0])),) if len(ijcil[0]) == 2 else ijcil + ((ijcil[4] - get_std_coef(ijcil[0][0], ijcil[1][0])) / ((1/max(get_probability(ijcil[0][0]),get_probability(ijcil[1][0]))) - get_std_coef(ijcil[0][0], ijcil[1][0])),))

# reading the file again to collect a list of {condition code: condition name}
cond_names_list = sc.textFile('dataset/conditions.csv').filter(lambda line: line != header).map(lambda line: line.split(",")).map(lambda line: {line[-2]: line[-1]}).collect()
cond_names_dict = {list(item.keys())[0]:list(item.values())[0] for item in cond_names_list} # joining everything into the same dictionary

# replacing the first two elements of each tuple (antecedent and consequent codes) with a single readable rule string, 'name -> name' or 'name and name -> name'
conf_int_lift_stdlift = std_lift.map(lambda name_metrics: (cond_names_dict[name_metrics[0][0]] + ' -> ' + cond_names_dict[name_metrics[1][0]],) + name_metrics[2:] if len(name_metrics[0]) == 1 else (cond_names_dict[name_metrics[0][0]] + ' and ' + cond_names_dict[name_metrics[0][1]] + ' -> ' + cond_names_dict[name_metrics[1][0]],) + name_metrics[2:])
conf_int_lift_stdlift.filter(lambda metrics: metrics[-1] >= 0.2).sortBy(lambda metrics: metrics[-1]).saveAsTextFile('results/') # keeping rules with std_lift >= 0.2, sorting them by std_lift (ascending) and storing the results
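The rule enumeration and the confidence, interest and lift computations in the cell above can be sketched in plain Python, assuming (hypothetically) that all supports are stored in one dictionary keyed by sorted tuples rather than in the notebook's lists of (key, support). `rules_from_trio`, `rule_metrics` and the numbers are illustrative only.

```python
from itertools import combinations

def rules_from_trio(trio):
    """Return the rules built from one frequent trio as (antecedent, consequent) tuples:
    {X, Y} -> Z for each pair of the trio, plus X -> Y and Y -> X for that pair."""
    rules = set()
    for pair in combinations(sorted(trio), 2):
        (rest,) = set(trio) - set(pair)
        rules.add((pair, (rest,)))            # {X, Y} -> Z
        rules.add(((pair[0],), (pair[1],)))   # X -> Y
        rules.add(((pair[1],), (pair[0],)))   # Y -> X
    return rules

def rule_metrics(antecedent, consequent, support, total):
    """Confidence, interest and lift of antecedent -> consequent (one consequent item)."""
    confidence = support[tuple(sorted(antecedent + consequent))] / support[antecedent]
    p_j = support[consequent] / total
    return confidence, confidence - p_j, confidence / p_j

# Hypothetical supports over 100 baskets, keyed by sorted tuples
support = {("a",): 50, ("b",): 40, ("c",): 30,
           ("a", "b"): 20, ("a", "c"): 15, ("b", "c"): 12,
           ("a", "b", "c"): 10}
rules = rules_from_trio(("a", "b", "c"))
print(len(rules))                                      # 9
print(rule_metrics(("a", "b"), ("c",), support, 100))  # confidence 0.5, interest about 0.2, lift about 1.667
```

Keying supports by sorted tuples makes each lookup independent of the order in which a rule's items were generated, and collecting rules in a set plays the role of `distinct()`.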
