## Cristiano Nicolau - 108536

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_set, size, explode
from itertools import combinations
from collections import defaultdict

In [2]:
sp = SparkSession.builder.appName("AprioriConditions").getOrCreate()
df = sp.read.csv("./_input/conditions.csv.gz", header=True, inferSchema=True)
df.show(3)

25/04/30 16:27:07 WARN Utils: Your hostname, cristianonicolau.local resolves to a loopback address: 127.0.0.1; using 192.168.0.3 instead (on interface en0)
25/04/30 16:27:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/30 16:27:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 1:>                                                          (0 + 1) / 1]

+----------+----------+--------------------+--------------------+---------+--------------------+
|     START|      STOP|             PATIENT|           ENCOUNTER|     CODE|         DESCRIPTION|
+----------+----------+--------------------+--------------------+---------+--------------------+
|2017-01-14|2017-03-30|09e4e8cb-29c2-4ef...|88e540ab-a7d7-47d...| 65363002|        Otitis media|
|2012-09-15|2012-09-16|b0a03e8c-8d0f-424...|e89414dc-d0c6-478...|241929008|Acute allergic re...|
|2018-06-17|2018-06-24|09e4e8cb-29c2-4ef...|c14325b0-f7ec-431...|444814009|Viral sinusitis (...|
+----------+----------+--------------------+--------------------+---------+--------------------+
only showing top 3 rows



                                                                                

In [3]:
patient_condition_lists = df.groupBy("PATIENT").agg(collect_set("CODE").alias("conditions")).select("conditions").rdd.map(lambda row: row.conditions)
first_row = patient_condition_lists.first()
print("Primeiro elemento:")
print(first_row)


[Stage 3:>                                                          (0 + 1) / 1]

Primeiro elemento:
[703151001, 128613002, 70704007, 444814009, 65363002, 232353008, 192127007, 195662009]


                                                                                

In [4]:
transactions_list = patient_condition_lists.collect()
transaction_count = len(transactions_list)

print("Total de condições:")
print(transaction_count)

min_support = 1000
min_support_ratio = min_support / transaction_count

min_standardized_lift = 0.2


                                                                                

Total de condições:
1157578


In [5]:
item_counts = {}
for transaction in transactions_list:
    for item in transaction:
        item_counts[item] = item_counts.get(item, 0) + 1

frequent_1_itemsets = []
support_data = {}

# Compute support and filter by min_support
for item, count in item_counts.items():
    support = count / transaction_count
    if support >= min_support_ratio:
        itemset = (item,)
        frequent_1_itemsets.append(itemset)
        support_data[itemset] = support

frequent_items = frequent_1_itemsets
all_frequent_itemsets = frequent_1_itemsets[:]

print("Top 5 frequent 1-itemsets, support:")
for itemset in frequent_items[:5]:
    print(itemset, support_data[itemset])

print("Total frequent 1-itemsets:")
print(len(frequent_items))

Top 5 frequent 1-itemsets, support:
(703151001,) 0.03688131598907374
(128613002,) 0.03688131598907374
(70704007,) 0.04510020059123446
(444814009,) 0.6495804170431712
(65363002,) 0.11577448776669909
Total frequent 1-itemsets:
131


In [6]:
def apriori_gen(frequent_prev, k):
    candidates = set()
    for a, b in combinations(frequent_prev, 2):
        union = tuple(sorted(set(a).union(set(b))))
        if len(union) == k:
            candidates.add(union)
    return list(candidates)

In [7]:
top_k2, top_k3 = [], []

for k in [2, 3]:
    candidates = apriori_gen(frequent_items, k)
    candidate_rdd = sp.sparkContext.parallelize(candidates)

    def count_support(candidate):
        count = 0
        for t in transactions_list:
            if all(item in t for item in candidate):
                count += 1
        return (candidate, count)

    counted = candidate_rdd.map(count_support).filter(lambda x: x[1] >= min_support).collect()

    support_data.update({itemset: count / transaction_count for itemset, count in counted})
    frequent_k = [itemset for itemset, _ in counted]
    all_frequent_itemsets.extend(frequent_k)
    frequent_items = frequent_k

    if k == 2:
        top_k2 = sorted(counted, key=lambda x: x[1], reverse=True)[:10]
    elif k == 3:
        top_k3 = sorted(counted, key=lambda x: x[1], reverse=True)[:10]

                                                                                

In [14]:
print("Top 10 frequent itemsets for k=2:")
for itemset, count in top_k2:
    print(set(itemset), "support:", count)

Top 10 frequent itemsets for k=2:
{195662009, 444814009} support: 343651
{444814009, 10509002} support: 302516
{15777000, 271737000} support: 289176
{444814009, 162864005} support: 243812
{271737000, 444814009} support: 236847
{15777000, 444814009} support: 236320
{195662009, 10509002} support: 211065
{59621000, 444814009} support: 203450
{195662009, 162864005} support: 167438
{40055000, 444814009} support: 165530


In [15]:
print("\nTop 10 frequent itemsets for k=3:")
for itemset, count in top_k3:
    print(set(itemset), "support:", count)


Top 10 frequent itemsets for k=3:
{15777000, 444814009, 271737000} support: 192819
{195662009, 10509002, 444814009} support: 139174
{15777000, 195662009, 271737000} support: 132583
{15777000, 10509002, 271737000} support: 115510
{195662009, 444814009, 162864005} support: 111860
{271737000, 195662009, 444814009} support: 108560
{15777000, 195662009, 444814009} support: 108083
{15777000, 59621000, 271737000} support: 99818
{444814009, 10509002, 162864005} support: 97384
{271737000, 444814009, 10509002} support: 94793


In [10]:
def generate_rules(frequent_itemsets, support_data):
    rules = []
    for itemset in frequent_itemsets:
        if len(itemset) >= 2:
            for i in range(1, len(itemset)):
                for antecedent in combinations(itemset, i):
                    consequent = tuple(sorted(set(itemset) - set(antecedent)))
                    antecedent = tuple(sorted(antecedent))
                    if not consequent:
                        continue
                    support_itemset = support_data.get(tuple(sorted(itemset)))
                    support_antecedent = support_data.get(tuple(sorted(antecedent)))
                    support_consequent = support_data.get(tuple(sorted(consequent)))
                    if not (support_itemset and support_antecedent and support_consequent):
                        continue

                    confidence = support_itemset / support_antecedent
                    lift = confidence / support_consequent
                    interest = support_itemset - (support_antecedent * support_consequent)

                    if lift >= min_standardized_lift:
                        rules.append((antecedent, consequent, lift, confidence, interest))
    return sorted(rules, key=lambda x: -x[2])

rules = generate_rules(all_frequent_itemsets, support_data)

In [11]:
print("\nTop 5 regras com lift >= 0.2:")
for antecedent, consequent, lift, confidence, interest in rules[:5]:
    print(f"Antecedent: {set(antecedent)}, Consequent: {set(consequent)}, Lift: {lift:.4f}, Confidence: {confidence:.4f}, Interest: {interest:.4f}")
print("\nTotal de regras geradas:", len(rules))


Top 5 regras com lift >= 0.2:
Antecedent: {67811000119102}, Consequent: {254632001, 162864005}, Lift: 514.7079, Confidence: 0.5265, Interest: 0.0010
Antecedent: {67811000119102}, Consequent: {254632001, 195662009}, Lift: 514.7079, Confidence: 0.4638, Interest: 0.0009
Antecedent: {254632001, 162864005}, Consequent: {67811000119102}, Lift: 514.7079, Confidence: 1.0000, Interest: 0.0010
Antecedent: {254632001, 195662009}, Consequent: {67811000119102}, Lift: 514.7079, Confidence: 1.0000, Interest: 0.0009
Antecedent: {254632001}, Consequent: {67811000119102}, Lift: 514.4791, Confidence: 0.9996, Interest: 0.0019

Total de regras geradas: 86230


In [None]:
# save the frequent itemsets to a file , k2 and k3
output_path = "./_output/"
frequent_itemsets_k2 = [(set(itemset), count) for itemset, count in top_k2]
frequent_itemsets_k3 = [(set(itemset), count) for itemset, count in top_k3]
with open(f"{output_path}/frequent_itemsets_k2.txt", "w") as f:
    for itemset, count in frequent_itemsets_k2:
        f.write(f"{itemset}, support: {count}\n")
with open(f"{output_path}/frequent_itemsets_k3.txt", "w") as f:
    for itemset, count in frequent_itemsets_k3:
        f.write(f"{itemset}, support: {count}\n")

#save the rules to a file
with open(f"{output_path}/association_rules.txt", "w") as f:
    for ant, cons, lift, conf, interest in rules:
        f.write(f"Rule: {set(ant)} -> {set(cons)}, "
                f"Lift: {lift:.4f}, Confidence: {conf:.4f}, Interest: {interest:.4f}\n")


In [None]:
sp.stop()


# Explicação do Trabalho
O trabalho consiste na implementação de um algoritmo Apriori para a identificação de padrões frequentes e regras de associação em um conjunto de dados médicos. O objetivo é analisar as condições médicas associadas a pacientes e gerar regras que possam indicar relações entre essas condições.


## 1. **Leitura dos Dados**
Comecei por carregar os dados a partir do arquivo CSV (`conditions.csv`) utilizando o Spark. O DataFrame resultante contém informações como:
- `PATIENT`: Identificador do paciente.
- `CODE`: Código da condição médica.
- `DESCRIPTION`: Descrição da condição.

## 2. **Agrupamento de Condições por Paciente**
A seguir comecei por agrupar as condições médicas utilizando a função `collect_set`, que cria uma lista de transações (`patient_condition_lists`), onde cada transação representa as condições associadas a um paciente.

## 3. **Definição de Parâmetros**
Apos isso defeni os parâmetros:
- `min_support`: Suporte mínimo absoluto (1000).
- `min_support_ratio`: Suporte mínimo relativo, calculado como `min_support / transaction_count`.
- `min_standardized_lift`: Lift padrao mínimo para regras de associação (0.2).

## 4. **Cálculo de Itens Frequentes**
Os itens frequentes foram identificados com base no suporte mínimo:
- Contagem de ocorrências de cada item em todas as transações.
- Filtragem de itens cujo suporte relativo é maior ou igual ao `min_support_ratio`.
- Os itens frequentes de 1 elemento foram armazenados em `frequent_1_itemsets`.

## 5. **Geração de Candidatos (Apriori)**
Criei uma função `apriori_gen` para gerar candidatos de tamanho `k` a partir dos itens frequentes de tamanho `k-1`. A função combina pares de conjuntos frequentes para formar novos candidatos.


## 6. **Identificação de Itens Frequentes para k=2 e k=3**
Para `k=2` e `k=3`, os candidatos foram avaliados:
- Para cada candidato, foi calculado o suporte relativo.
- Apenas candidatos com suporte maior ou igual ao `min_support` foram considerados frequentes.
- Os itens frequentes foram armazenados em `all_frequent_itemsets`.

## 7. **Geração de Regras de Associação**
Criei uma função `generate_rules` para gerar as regras de associação a partir dos itens frequentes:
- Para cada itemset frequente com tamanho ≥ 2, foram geradas combinações de antecedentes e consequentes.
- Foram calculadas métricas como confiança, lift, interesse e lift padrao.
- Apenas regras com `standardized_lift` ≥ `min_standardized_lift` foram mantidas.

## 8. **Save dos Resultados**
Os resultados foram guardos em arquivos:
- Itens frequentes para `k=2` e `k=3` foram salvos em arquivos de texto.
- Regras de associação foram salvas em um arquivo separado, incluindo todas as métricas calculadas.