Trabalho realizado por: Bárbara Freixo, PG49169

Este código implementa o algoritmo Apriori para mineração de regras de associação. O objetivo é encontrar padrões frequentes num conjunto de transações, sendo que cada transação é uma lista de itens. O processo começa por gerar conjuntos de itens candidatos a partir dos itens únicos nas transações e, em seguida, conta-se a ocorrência de cada conjunto de itens nas transações. Conjuntos com ocorrência abaixo do suporte mínimo são descartados. Com base nos conjuntos frequentes, são gerados novos conjuntos candidatos, repetindo-se o processo até não haver mais conjuntos candidatos. Finalmente, as regras de associação são geradas com base na confiança mínima.

A classe TransactionDataset representa um conjunto de transações, sendo que cada transação é uma lista de itens. O método get_unique_items é usado para obter uma lista dos itens únicos presentes em todas as transações. O método get_frequent_items é usado para obter itens frequentes em ordem inversa de frequência.

A classe Apriori implementa o algoritmo Apriori para mineração de regras de associação. O método count_itemsets é usado para contar a frequência de cada conjunto de itens candidato nas transações. O método prune_itemsets descarta os conjuntos de itens que têm suporte abaixo do suporte mínimo. O método generate_candidate_itemsets gera novos conjuntos de itens candidatos a partir dos conjuntos frequentes. O método apriori_algorithm é o método principal que executa o algoritmo Apriori.

A função association_rules gera regras de associação a partir dos conjuntos frequentes, baseadas na confiança mínima.

In [4]:
import numpy as np
from collections import defaultdict, OrderedDict
from itertools import combinations
import unittest

# Classe para representar um conjunto de transações (cada transação é uma lista de itens)
class TransactionDataset:
    def __init__(self, transactions):
        self.transactions = transactions  # Lista de transações
        self.items = self.get_unique_items()  # Lista de itens únicos
        self.frequent_items = self.get_frequent_items()  # Frequent items in reverse frequency order

    # Método para obter itens únicos em todas as transações
    def get_unique_items(self):
        unique_items = set()
        for transaction in self.transactions:
            for item in transaction:
                unique_items.add(item)
        return sorted(list(unique_items))

    # Método para obter itens frequentes em ordem inversa de frequência
    def get_frequent_items(self):
        item_counts = defaultdict(int)
        for transaction in self.transactions:
            for item in transaction:
                item_counts[item] += 1
        return OrderedDict(sorted(item_counts.items(), key=lambda x: x[1], reverse=True))

# Classe para implementar o algoritmo Apriori
class Apriori:
    def __init__(self, transaction_dataset, min_support):
        self.transaction_dataset = transaction_dataset  # Objeto TransactionDataset
        self.min_support = min_support  # Suporte mínimo
        self.itemsets = self.apriori_algorithm()  # Conjuntos de itens frequentes

    # Método para contar a ocorrência de cada conjunto de itens candidato nas transações
    def count_itemsets(self, candidate_itemsets):
        counts = defaultdict(int)
        for transaction in self.transaction_dataset.transactions:
            for itemset in candidate_itemsets:
                if set(itemset).issubset(transaction):
                    counts[itemset] += 1
        return counts

    # Método para podar conjuntos de itens com suporte abaixo do mínimo
    def prune_itemsets(self, counts):
        pruned_itemsets = []
        for itemset, count in counts.items():
            if count / len(self.transaction_dataset.transactions) >= self.min_support:
                pruned_itemsets.append(itemset)
        return pruned_itemsets

    # Método para gerar novos conjuntos de itens candidatos a partir dos conjuntos frequentes
    def generate_candidate_itemsets(self, frequent_itemsets):
        candidates = set()
        for itemset1 in frequent_itemsets:
            for itemset2 in frequent_itemsets:
                union = tuple(sorted(set(itemset1).union(itemset2)))
                if len(union) == len(itemset1) + 1:
                    candidates.add(union)
        return candidates

    # Método principal para executar o algoritmo Apriori
    def apriori_algorithm(self):
        itemsets = []
        candidate_itemsets = [tuple([item]) for item in self.transaction_dataset.items]
        while candidate_itemsets:
            counts = self.count_itemsets(candidate_itemsets)
            frequent_itemsets = self.prune_itemsets(counts)
            itemsets.extend(frequent_itemsets)
            candidate_itemsets = self.generate_candidate_itemsets(frequent_itemsets)
        return itemsets

# Função para gerar regras de associação a partir dos conjuntos frequentes
def association_rules(apriori, min_confidence):
    rules = []
    for itemset in apriori.itemsets:
        if len(itemset) < 2:
            continue
        for i in range(1, len(itemset)):
            for c in combinations(itemset, i):
                antecedent = tuple(sorted(c))
                consequent = tuple(sorted(set(itemset).difference(antecedent)))
                antecedent_support = apriori.count_itemsets([antecedent])[antecedent] / len(apriori.transaction_dataset.transactions)
                itemset_support = apriori.count_itemsets([itemset])[itemset] / len(apriori.transaction_dataset.transactions)
                confidence = itemset_support / antecedent_support
                if confidence >= min_confidence:
                    rules.append((antecedent, consequent, confidence))
    return rules

# Testes e exemplos para o algoritmo implementado

## Exemplo de uso do código

Vamos supor que temos uma lista de transações que representam compras numa loja, onde cada transação é uma lista de itens comprados por um cliente.

In [5]:
transactions = [    ['leite', 'pão', 'manteiga'],
    ['leite', 'pão'],
    ['leite', 'pão', 'ovos'],
    ['leite', 'pão', 'manteiga', 'ovos'],
    ['leite', 'ovos'],
    ['pão', 'manteiga', 'ovos'],
    ['pão', 'manteiga']
]

Para executar o algoritmo Apriori, primeiro precisamos de criar um objeto TransactionDataset:

In [6]:
transaction_dataset = TransactionDataset(transactions)
print("Unique items:", transaction_dataset.items)
print("Frequent items in reverse frequency order:", transaction_dataset.frequent_items)

Unique items: ['leite', 'manteiga', 'ovos', 'pão']
Frequent items in reverse frequency order: OrderedDict([('pão', 6), ('leite', 5), ('manteiga', 4), ('ovos', 4)])


Em seguida, podemos criar um objeto Apriori com um suporte mínimo de 0,3:

In [7]:
apriori = Apriori(transaction_dataset, min_support=0.3)
print("Frequent itemsets:", apriori.itemsets)

Frequent itemsets: [('leite',), ('manteiga',), ('pão',), ('ovos',), ('manteiga', 'pão'), ('leite', 'pão'), ('leite', 'ovos'), ('ovos', 'pão')]


Finalmente, podemos gerar as regras de associação com uma confiança mínima de 0,5:

In [8]:
rules = association_rules(apriori, 0.5)
for antecedent, consequent, confidence in rules:
    print("Antecedente: ", antecedent)
    print("Consequente: ", consequent)
    print("Confiança: ", round(confidence,2))
    print("\n")

Antecedente:  ('manteiga',)
Consequente:  ('pão',)
Confiança:  1.0


Antecedente:  ('pão',)
Consequente:  ('manteiga',)
Confiança:  0.67


Antecedente:  ('leite',)
Consequente:  ('pão',)
Confiança:  0.8


Antecedente:  ('pão',)
Consequente:  ('leite',)
Confiança:  0.67


Antecedente:  ('leite',)
Consequente:  ('ovos',)
Confiança:  0.6


Antecedente:  ('ovos',)
Consequente:  ('leite',)
Confiança:  0.75


Antecedente:  ('ovos',)
Consequente:  ('pão',)
Confiança:  0.75


Antecedente:  ('pão',)
Consequente:  ('ovos',)
Confiança:  0.5




As regras de associação geradas representam padrões frequentes nas compras dos clientes, como por exemplo: se um cliente compra leite, ele também compra pão com uma confiança de 0.8.

## Testes "Unittest"

Foram realizados dois testes utilizando o unittest para testar a implementação do algoritmo Apriori. Temos então o primeiro teste:

In [24]:
import unittest

class TestTransactionDataset(unittest.TestCase):
    def test_get_unique_items(self):
        """
        Testa a função get_unique_items, que deve retornar uma lista de itens únicos encontrados nas transações.
        """
        transactions = [['a', 'b'], ['b', 'c'], ['a', 'c']]
        dataset = TransactionDataset(transactions)
        self.assertEqual(dataset.get_unique_items(), ['a', 'b', 'c'])
    def test_get_frequent_items(self):
        """
        Testa a função get_frequent_items, que deve retornar itens frequentes na ordem inversa de frequência.
        """
        transactions = [['a', 'b'], ['b', 'c'], ['a', 'c']]
        dataset = TransactionDataset(transactions)
        frequent_items = dataset.get_frequent_items()
        self.assertEqual(frequent_items, OrderedDict([('a', 2), ('b', 2), ('c', 2)]))

class TestApriori(unittest.TestCase):
    def setUp(self):
        transactions = [['a', 'b'], ['b', 'c'], ['a', 'c']]
        dataset = TransactionDataset(transactions)
        self.apriori = Apriori(dataset, 0.5)

    def test_count_itemsets(self):
        """
        Testa a função count_itemsets, que deve contar corretamente a ocorrência de cada itemset candidato nas transações.
        """
        candidates = [('a',), ('b',)]
        counts = self.apriori.count_itemsets(candidates)
        self.assertEqual(counts, {('a',): 2, ('b',): 2})

    def test_prune_itemsets(self):
        """
        Testa a função prune_itemsets, que deve remover itemsets infrequentes com base no suporte mínimo.
        """
        counts = {('a',): 2, ('b',): 2, ('c',): 1}
        pruned_itemsets = self.apriori.prune_itemsets(counts)
        self.assertEqual(pruned_itemsets, [('a',), ('b',)])

    def test_generate_candidate_itemsets(self):
        """
        Testa a função generate_candidate_itemsets, que deve gerar itemsets candidatos a partir de itemsets frequentes.
        """
        frequent_itemsets = [('a',), ('b',)]
        candidates = self.apriori.generate_candidate_itemsets(frequent_itemsets)
        self.assertEqual(candidates, {('a', 'b')})

    def test_apriori_algorithm(self):
        """
        Testa a função apriori_algorithm, que deve retornar todos os itemsets frequentes.
        """
        itemsets = self.apriori.apriori_algorithm()
        self.assertEqual(itemsets, [('a',), ('b',), ('c',)])

class TestAssociationRules(unittest.TestCase):
    def setUp(self):
        transactions = [['a', 'b'], ['b', 'c'], ['a', 'c']]
        dataset = TransactionDataset(transactions)
        self.apriori = Apriori(dataset, 0.5)

    def test_association_rules(self):
        """
        Testa a função association_rules, que deve gerar regras de associação com base nos itemsets frequentes e confiança mínima.
        """
        rules = association_rules(self.apriori, 0.6)
        self.assertEqual(rules, [])

In [25]:
def run_tests():
    """
    Executa todos os testes das classes de teste TestTransactionDataset, TestApriori e TestAssociationRules.
    Esta função cria uma suíte de teste para cada classe de teste, adiciona-as a uma lista e, em seguida, cria
    uma suíte maior que combina todas as suítes. Por fim, um corredor de teste (TextTestRunner) é usado para
    executar a suíte maior com uma verbosidade de 2 (resultados detalhados).
    """
    # Lista das classes de teste
    test_classes = [TestTransactionDataset, TestApriori, TestAssociationRules]

    # Carregador de teste do unittest
    loader = unittest.TestLoader()

    # Lista para armazenar as suítes de teste de cada classe de teste
    suites_list = []
    for test_class in test_classes:
        suite = loader.loadTestsFromTestCase(test_class)
        suites_list.append(suite)

    # Cria uma suíte de teste maior combinando todas as suítes na lista
    big_suite = unittest.TestSuite(suites_list)

    # Executa a suíte de teste maior usando um corredor de teste (TextTestRunner)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(big_suite)

# Executa a função run_tests para iniciar os testes
run_tests()

test_get_frequent_items (__main__.TestTransactionDataset.test_get_frequent_items)
Testa a função get_frequent_items, que deve retornar itens frequentes na ordem inversa de frequência. ... ok
test_get_unique_items (__main__.TestTransactionDataset.test_get_unique_items)
Testa a função get_unique_items, que deve retornar uma lista de itens únicos encontrados nas transações. ... ok
test_apriori_algorithm (__main__.TestApriori.test_apriori_algorithm)
Testa a função apriori_algorithm, que deve retornar todos os itemsets frequentes. ... ok
test_count_itemsets (__main__.TestApriori.test_count_itemsets)
Testa a função count_itemsets, que deve contar corretamente a ocorrência de cada itemset candidato nas transações. ... ok
test_generate_candidate_itemsets (__main__.TestApriori.test_generate_candidate_itemsets)
Testa a função generate_candidate_itemsets, que deve gerar itemsets candidatos a partir de itemsets frequentes. ... ok
test_prune_itemsets (__main__.TestApriori.test_prune_itemsets)
Testa

Temos agora o segundo teste:

In [26]:
import random
import string

class TestAprioriRandom(unittest.TestCase):
    @staticmethod
    def generate_random_transactions(num_transactions, num_items, item_pool_size):
        """
        Gera uma lista de transações aleatórias, onde cada transação contém um conjunto de itens selecionados
        aleatoriamente a partir de um conjunto de itens gerados.
        
        :param num_transactions: número de transações a serem geradas
        :param num_items: número de itens em cada transação
        :param item_pool_size: número de itens únicos disponíveis no conjunto de itens
        :return: lista de transações
        """
        item_pool = [''.join(random.choices(string.ascii_lowercase, k=3)) for _ in range(item_pool_size)]
        transactions = []
        for _ in range(num_transactions):
            items = random.sample(item_pool, num_items)
            transactions.append(items)
        return transactions

    def test_random_transactions(self):
        """
        Testa o algoritmo Apriori num conjunto de transações gerados aleatoriamente. Este teste verifica se
        todos os itemsets frequentes encontrados têm suporte maior ou igual ao suporte mínimo e se todas as regras
        de associação geradas têm confiança maior ou igual à confiança mínima.
        """
        random.seed(42)
        transactions = self.generate_random_transactions(num_transactions=50, num_items=5, item_pool_size=20)
        dataset = TransactionDataset(transactions)
        apriori = Apriori(dataset, 0.2)
        itemsets = apriori.apriori_algorithm()

        for itemset in itemsets:
            support = apriori.count_itemsets([itemset])[itemset] / len(apriori.transaction_dataset.transactions)
            self.assertGreaterEqual(support, 0.2)

        rules = association_rules(apriori, 0.5)
        for antecedent, consequent, confidence in rules:
            self.assertGreaterEqual(confidence, 0.5)

In [27]:
def run_tests2():
    """
    Executa todos os testes das classes de teste TestTransactionDataset, TestApriori, TestAssociationRules
    e TestAprioriRandom. Esta função cria uma suíte de teste para cada classe de teste, adiciona-as a uma lista
    e, em seguida, cria uma suíte maior que combina todas as suítes. Por fim, um corredor de teste
    (TextTestRunner) é usado para executar a suíte maior com uma verbosidade de 2 (resultados detalhados).
    """
    # Lista das classes de teste
    test_classes = [
        TestTransactionDataset,
        TestApriori,
        TestAssociationRules,
        TestAprioriRandom,
    ]

    # Carregador de teste do unittest
    loader = unittest.TestLoader()

    # Lista para armazenar as suítes de teste de cada classe de teste
    suites_list = []
    for test_class in test_classes:
        suite = loader.loadTestsFromTestCase(test_class)
        suites_list.append(suite)

    # Cria uma suíte de teste maior combinando todas as suítes na lista
    big_suite = unittest.TestSuite(suites_list)

    # Executa a suíte de teste maior usando um corredor de teste (TextTestRunner)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(big_suite)

# Executa a função run_tests2 para iniciar os testes
run_tests2()

test_get_frequent_items (__main__.TestTransactionDataset.test_get_frequent_items)
Testa a função get_frequent_items, que deve retornar itens frequentes na ordem inversa de frequência. ... ok
test_get_unique_items (__main__.TestTransactionDataset.test_get_unique_items)
Testa a função get_unique_items, que deve retornar uma lista de itens únicos encontrados nas transações. ... ok
test_apriori_algorithm (__main__.TestApriori.test_apriori_algorithm)
Testa a função apriori_algorithm, que deve retornar todos os itemsets frequentes. ... ok
test_count_itemsets (__main__.TestApriori.test_count_itemsets)
Testa a função count_itemsets, que deve contar corretamente a ocorrência de cada itemset candidato nas transações. ... ok
test_generate_candidate_itemsets (__main__.TestApriori.test_generate_candidate_itemsets)
Testa a função generate_candidate_itemsets, que deve gerar itemsets candidatos a partir de itemsets frequentes. ... ok
test_prune_itemsets (__main__.TestApriori.test_prune_itemsets)
Testa