In [1]:
import pyspark
from pyspark import SparkContext, StorageLevel
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkFiles
from datetime import datetime
from time import time

path = "../data/conditions_truncated.csv"

conf = SparkConf()
conf.getAll()

sc = SparkContext(appName="test")
    
spark = SparkSession(sc)
sc.setLogLevel("ERROR")

textfile = sc.textFile(path)

sc.addFile(path)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/17 21:37:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# build bigrams method
def build_pairs(basket, filtered_diseases):
    """
    Returns a list of bigrams that only contain diseases from the filtered list.
    
    Parameters:
    - basket: a tuple representing a basket of items
    - filtered_diseases: a list of diseases above the support treshold to include in the bigrams
    
    Returns:
    - a list of bigrams, where each bigram is a tuple of two diseases
    """
    basket_diseases = basket[1] # list of diseases in the current basket
    bigrams = []

    for i, disease_1 in enumerate(basket_diseases):
        # if the current disease is not in the list of filtered diseases, skip
        if disease_1 not in filtered_diseases:
            continue

        for disease_2 in basket_diseases[i+1:]:
        # if the current disease is not in the list of filtered diseases, skip
            if disease_2 not in filtered_diseases:
                continue

            # create a bigram from the two diseases and append it to the list of bigrams
            bigram = f"{disease_1},{disease_2}"
            bigrams.append((bigram, 1))
    
    return bigrams

In [3]:
# association rules methods
def get_support_value(diseases, diseases_support):
    disease = diseases.find(",")
    key = diseases[:disease]

    return diseases_support.filter(lambda x: x[0] == key).collect()

def get_prob_value(diseases, baskets):
    disease = diseases.split(",")[1]
    return len(baskets.filter(lambda x: disease in x[1]).collect())/baskets.count()

def get_std_lift(support_x, prob_y, lift, total_baskets):
    max_value = max(support_x + prob_y - 1, 1/total_baskets)
    constant = (support_x * prob_y)
    numerator = lift - max_value / constant
    denominator = 1 / constant - max_value / constant
    return numerator/denominator

In [6]:
SUPPORT = 1000
K = 2

before_baskets =  textfile.map(lambda line: line.split(",")) \
                            .map(lambda pair: (pair[2],[pair[4]])) \
                                .reduceByKey(lambda a,b: a + b)

baskets = before_baskets.mapValues(lambda x: sorted(set(x)))

first_support = baskets.flatMap(lambda basket: 
                                [(disease, 1) for disease in basket[1]]
                                ).reduceByKey(lambda a,b: a + b) \
                                    .filter(lambda line: line[1] > SUPPORT)

filtered_diseases = first_support.map(lambda line: line[0]).collect()

bigrams_support = baskets.flatMap(lambda basket: 
                                  build_pairs(
                                    basket, 
                                    filtered_diseases)
                                ).reduceByKey(lambda a,b: a + b) \
                                    .filter(lambda line: line[1] > SUPPORT)

#all_bigrams = bigrams_support.sortBy(lambda line: line[1], False).collect()

# top 10 bigrams
top10_bigrams = bigrams_support.sortBy(lambda line: line[1], False).take(10)

print("Top 10 bigrams:\n")
print(top10_bigrams)

rules = {key:[value] for key, value in bigrams_support.collect()}
list_rules = []

# get association rules (X) -> (Y)
for key in rules:
    # get support of X
    support_x = get_support_value(key, first_support)[0][1]

    # confidence: support(X U Y)/support(X)
    confidence = rules[key][0] /  support_x
    rules[key][0] = confidence

    # get probability of Y
    prob_y = get_prob_value(key, baskets)

    # interest: confidence - prob(Y)
    interest = confidence - prob_y
    rules[key].append(interest)

    # lift: confidence / prob(Y)
    lift = confidence / prob_y
    rules[key].append(lift)

    # standard lift
    std_lift = get_std_lift(support_x, prob_y, lift, baskets.count())
    rules[key].append(std_lift)

    list_rules.append([key] + rules[key])

association_rules = sc.parallelize(list_rules)
association_rules = association_rules.filter(lambda line: line[4] > 0.2).sortBy(lambda line: line[4])

#format_time = datetime.now().strftime("%Y-%m-%d-T%H:%M:%S")
#association_rules.saveAsTextFile("{0}/Association Rules {1}".format("../results", format_time))

printable5 = association_rules.collect()

print("\nAssociation rules:\n")

print(printable5)

                                                                                


Top 10 bigrams:

[('195662009,444814009', 6453), ('10509002,444814009', 5576), ('15777000,271737000', 5301), ('162864005,444814009', 4537), ('15777000,444814009', 4427), ('271737000,444814009', 4353), ('10509002,195662009', 3937), ('444814009,59621000', 3735), ('162864005,195662009', 3178), ('15777000,195662009', 3062)]

Association rules:

[['15777000,444814009', 0.6784674329501915, 0.02976992669265932, 1.0458918469787375, 0.3215453183903218], ['19169002,444814009', 0.6774885967265898, 0.02879109046905759, 1.044382921456195, 0.32253408602656874], ['271737000,444814009', 0.6687663235520049, 0.02006881729447274, 1.0309370964137874, 0.3312484735588609], ['162864005,444814009', 0.6637893196781273, 0.015091813420595068, 1.0232647933358998, 0.3362257556998383], ['40055000,444814009', 0.6573834196891192, 0.008685913431586978, 1.0133897746604543, 0.3426406970497924], ['44465007,444814009', 0.6567363272565585, 0.008038820999026308, 1.0123922489627004, 0.34331374754533195], ['195662009,4448140

In [4]:
# build trigrams method
def build_trios(basket, filtered_diseases, filtered_diseases2):
    """
    Returns a list of trigrams that contain diseases from the filtered list.
    
    Parameters:
    - basket: a tuple representing a basket of items
    - filtered_diseases: a list of diseases above the support treshold to include in the trigrams
    - filtered_diseases2: a list of bigrams to include in the trigrams
    
    Returns:
    - a list of trigrams, where each trigram is a tuple of three diseases
    """
    diseases = basket[1] # list of diseases in the current basket
    trigrams = []
    
    for i, desease1 in enumerate(diseases):
        if desease1 not in filtered_diseases:
            continue
        for j, desease2 in enumerate(diseases[i+1:], i+1):
            # if the current disease is not in the list of filtered diseases, skip
            if desease2 not in filtered_diseases:
                continue

            # create a bigram from the two diseases and check if it is in the 
            # list of filtered bigrams
            if f"{desease1},{desease2}" not in filtered_diseases2:
                continue

            for desease3 in diseases[j+1:]:
                # if the current disease is not in the list of filtered diseases, 
                # skip to the next iteration
                if desease3 not in filtered_diseases:
                    continue
                
                # check if the two bigrams that can be created from the trigram 
                # are in the list of filtered bigrams
                if f"{desease1},{desease3}" not in filtered_diseases2 \
                    or f"{desease2},{desease3}" not in filtered_diseases2:
                    continue

                # create a trigram from the three diseases and append it to the 
                # list of trigrams
                trigrams.append((f"{desease1},{desease2},{desease3}", 1))
    return trigrams

In [5]:
# association rules' methods
def get_support_value2(diseases, diseases_support):
    disease = diseases.rfind(",")
    key = diseases[:disease]

    return diseases_support.filter(lambda x: x[0] == key).collect()

def get_prob_value2(diseases, baskets):
    disease = diseases.split(",")[2]

    return len(baskets.filter(lambda x: disease in x[1]).collect()) / baskets.count()

def get_std_lift2(support_xy, prob_z, lift, total_baskets):
    max_value = max(support_xy + prob_z - 1, 1 / total_baskets)
    constant = (support_xy * prob_z)

    numerator = lift - (max_value / constant)
    denominator = (1 / constant) - (max_value / constant)

    return numerator / denominator

In [7]:
filtered_diseases2 = bigrams_support.map(lambda line: line[0]).collect()

trigrams_support = baskets.flatMap(lambda basket: 
                                   build_trios(
                                        basket, 
                                        filtered_diseases, 
                                        filtered_diseases2)
                                        ).reduceByKey(lambda a,b: a + b) \
                                            .filter(lambda line: line[1] > SUPPORT)

#all_trigrams = trigrams_support.sortBy(lambda line: line[1], False).collect()

top10_trigrams = trigrams_support.sortBy(lambda line: line[1], False).take(10)

print("Top 10 trigrams:\n")
print(top10_trigrams)

association_rules = {key:[value] for key, value in trigrams_support.collect()}
rules_list = []

# get association rules (X,Y) -> (Z)
for key in association_rules:
    # get support of X U Y
    support_xy = get_support_value2(key, bigrams_support)[0][1]

    # confidence: support(X U Y U Z)/support(X U Y)
    confidence = association_rules[key][0] /  support_xy
    association_rules[key][0] = confidence

    # get probability of Z
    prob_z = get_prob_value2(key, baskets)

    # interest: confidence - prob(Z)
    interest = confidence - prob_z
    association_rules[key].append(interest)

    # lift: confidence / prob(Z)
    lift = confidence / prob_z
    association_rules[key].append(lift)

    # standard lift
    std_lift = get_std_lift2(support_xy, prob_z, lift, baskets.count())
    association_rules[key].append(std_lift)

    rules_list.append([key] + association_rules[key])

association_rules = sc.parallelize(rules_list)
association_rules = association_rules.filter(lambda line: line[4] > 0.2).sortBy(lambda line: line[4])

#format_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
#association_rules.saveAsTextFile("{0}/Association Rules {1}".format("../results", format_time))

printable6 = association_rules.collect()

print("\nAssociation rules:\n")

print(printable6)

                                                                                

Top 10 bigrams:

[('15777000,271737000,444814009', 3572), ('10509002,195662009,444814009', 2564), ('15777000,195662009,271737000', 2479), ('162864005,195662009,444814009', 2110), ('10509002,15777000,271737000', 2072), ('15777000,195662009,444814009', 2071), ('195662009,271737000,444814009', 2024), ('15777000,271737000,59621000', 1837), ('10509002,162864005,444814009', 1741), ('10509002,15777000,444814009', 1734)]

Association rules:

[['15777000,162864005,271737000', 0.7661795407098121, 0.46447378378017457, 2.5394926119639707, 0.2336631182074108], ['15777000,19169002,444814009', 0.6866666666666666, 0.03796916040913445, 1.0585313802548528, 0.3133770695283595], ['15777000,40055000,444814009', 0.6828868234007655, 0.034189317143233255, 1.0527045607751422, 0.3171554239592988], ['10509002,15777000,444814009', 0.6810683424980362, 0.032370836240503964, 1.0499012805325212, 0.3189629665979704], ['15777000,195662009,444814009', 0.6763553233180928, 0.027657817060560586, 1.0426359232057545, 0.32367

In [None]:
sc.stop()