<a href="https://colab.research.google.com/github/Feranie/Hierarchical-Classification-Project/blob/main/IRH_Sele%C3%A7%C3%A3o_de_atributos_com_heur%C3%ADstica_RandomRestart-Hill_Climbing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd  # Importa a biblioteca pandas para manipulação de dados
import numpy as np  # Importa numpy para operações matemáticas e arrays
import random  # Importa random para geração de números aleatórios
import re  # Importa regex para manipulação de strings
from collections import defaultdict  # Importa defaultdict para dicionários com valores padrão
from typing import List, Set, Tuple  # Importa tipos para anotações de tipo
import time  # Importa time para medição de tempo
from concurrent.futures import ProcessPoolExecutor  # Importa executor para processamento paralelo
import multiprocessing as mp  # Importa multiprocessing para processamento paralelo

# --- Leitura de arquivos ARFF ---
def read_arff_file(file_path):
    """Lê um arquivo ARFF e retorna um DataFrame pandas"""
    data = []  # Lista para armazenar os dados
    attributes = []  # Lista para armazenar os nomes dos atributos
    current_section = None  # Variável para rastrear a seção atual do arquivo

    with open(file_path, 'r', encoding='utf-8') as file:  # Abre o arquivo em modo leitura com codificação UTF-8
        for line in file:  # Itera sobre cada linha do arquivo
            line = line.strip()  # Remove espaços em branco no início e fim da linha
            if not line or line.startswith('%'):  # Ignora linhas vazias e comentários (que começam com %)
                continue

            if '@relation' in line.lower():  # Verifica se a linha contém a declaração de relação
                current_section = 'relation'  # Define a seção atual como relação
            elif '@attribute' in line.lower():  # Verifica se a linha contém uma declaração de atributo
                current_section = 'attribute'  # Define a seção atual como atributo
                match = re.match(r'@attribute\s+([^\s]+)\s+.*', line, re.IGNORECASE)  # Extrai o nome do atributo usando regex
                if match:  # Se encontrou uma correspondência
                    attributes.append(match.group(1))  # Adiciona o nome do atributo à lista
            elif '@data' in line.lower():  # Verifica se chegou na seção de dados
                current_section = 'data'  # Define a seção atual como dados
            elif current_section == 'data':  # Se estamos na seção de dados
                values = re.findall(r'[^,]+(?:,(?=[^,]*$))?', line)  # Divide a linha em valores usando regex
                values = [v.strip('" ') for v in values]  # Remove aspas e espaços dos valores
                if len(values) == len(attributes):  # Verifica se o número de valores corresponde ao número de atributos
                    data.append(values)  # Adiciona os valores à lista de dados

    return pd.DataFrame(data, columns=attributes)  # Retorna um DataFrame pandas com os dados e nomes das colunas

# --- Salvamento em arquivo ARFF ---
def save_to_arff(df, file_path, relation_name="filtered_data"):
    """Salva um DataFrame pandas em formato ARFF"""
    with open(file_path, 'w', encoding='utf-8') as f:  # Abre o arquivo em modo escrita com codificação UTF-8
        f.write(f"@relation {relation_name}\n\n")  # Escreve a declaração de relação
        for column in df.columns:  # Para cada coluna do DataFrame
            unique_values = df[column].unique()  # Obtém os valores únicos da coluna
            if all(isinstance(val, str) for val in unique_values):  # Se todos os valores são strings
                vals = ','.join(sorted(set(unique_values)))  # Cria uma string com valores únicos ordenados
                f.write(f"@attribute {column} {{{vals}}}\n")  # Escreve a declaração do atributo categórico
            else:  # Se os valores não são todos strings
                f.write(f"@attribute {column} numeric\n")  # Escreve a declaração do atributo numérico
        f.write("\n@data\n")  # Escreve a declaração de início dos dados
        for _, row in df.iterrows():  # Para cada linha do DataFrame
            f.write(",".join(map(str, row)) + "\n")  # Escreve a linha no arquivo separada por vírgulas

# --- Calculadora IRH Corrigida ---
class IRHCalculator:
    """Classe para calcular a Taxa de Inconsistência Hierárquica (IRH)"""
    def __init__(self, data: pd.DataFrame, weight_mode='equal'):
        """Inicializa a calculadora IRH com os dados e modo de pesos"""
        self.data = data  # Armazena os dados
        self.weight_mode = weight_mode  # Armazena o modo de cálculo dos pesos
        self.h = self._get_number_of_levels()  # Calcula o número de níveis hierárquicos
        self.weights = self._calculate_weights()  # Calcula os pesos para cada nível
        # Cache para os padrões já calculados para otimização
        self._pattern_cache = {}  # Dicionário para cache de padrões calculados
        # Pré-cálculo dos padrões de classe por nível
        self._precompute_class_patterns()  # Pré-computa os padrões de classe

    def _get_number_of_levels(self) -> int:
        """Calcula o número de níveis hierárquicos"""
        max_levels = 0  # Inicializa o número máximo de níveis
        for class_val in self.data['class']:  # Para cada valor de classe nos dados
            class_parts = str(class_val).split('.')  # Divide a classe por pontos
            if class_parts[0] == 'R':  # Se começa com 'R', ignora a raiz R
                levels = len(class_parts) - 1  # Conta os níveis menos 1 (para ignorar R)
                max_levels = max(max_levels, levels)  # Atualiza o máximo de níveis
            else:  # Se não tem R
                # Se não tem R, conta todos os segmentos
                max_levels = max(max_levels, len(class_parts))  # Atualiza o máximo de níveis
        return max_levels  # Retorna o número máximo de níveis encontrado

    def _precompute_class_patterns(self):
        """Pré-calcula os padrões de classe para cada nível, ignorando R."""
        self.class_patterns_by_level = {}  # Dicionário para armazenar padrões por nível

        for level in range(1, self.h + 1):  # Para cada nível de 1 até h
            class_patterns = []  # Lista para armazenar os padrões deste nível
            for class_val in self.data['class']:  # Para cada valor de classe
                class_parts = str(class_val).split('.')  # Divide a classe por pontos

                # Ignora a raiz R se ela existir
                if class_parts[0] == 'R' and len(class_parts) > 1:  # Se começa com R e tem mais partes
                    class_parts = class_parts[1:]  # Remove o R

                if len(class_parts) >= level:  # Se tem partes suficientes para este nível
                    # Pega os primeiros 'level' segmentos (sem R)
                    class_patterns.append('.'.join(class_parts[:level]))  # Adiciona o padrão truncado
                else:  # Se não tem partes suficientes
                    # Se não tem níveis suficientes, pega tudo o que tem
                    class_patterns.append('.'.join(class_parts))  # Adiciona todas as partes disponíveis

            self.class_patterns_by_level[level] = class_patterns  # Armazena os padrões para este nível

    def _calculate_weights(self):
        """Calcula os pesos uma única vez"""
        if self.weight_mode == 'equal':  # Se o modo é igual
            return [1/self.h for _ in range(self.h)]  # Retorna pesos iguais para todos os níveis
        return None  # Retorna None para outros modos (serão calculados dinamicamente)

    def _calculate_level_inconsistency_rate_optimized(self, attribute_subset: List[str], level: int) -> float:
        """Versão otimizada do cálculo de IRH para um nível"""
        cache_key = (tuple(sorted(attribute_subset)), level)  # Cria chave para o cache
        if cache_key in self._pattern_cache:  # Se já foi calculado antes
            return self._pattern_cache[cache_key]  # Retorna o valor do cache

        # Utilização de numpy para cálculos mais rápidos
        subset_data = self.data[attribute_subset].values  # Obtém os dados do subconjunto de atributos
        class_labels = self.class_patterns_by_level[level]  # Obtém os rótulos de classe para este nível

        pattern_counts = defaultdict(lambda: defaultdict(int))  # Dicionário aninhado para contar padrões

        for i, pattern in enumerate(subset_data):  # Para cada padrão nos dados
            pattern_tuple = tuple(pattern)  # Converte o padrão em tupla (hashável)
            pattern_counts[pattern_tuple][class_labels[i]] += 1  # Incrementa a contagem para este padrão e classe

        inconsistency_count = 0  # Inicializa o contador de inconsistências
        for class_counts in pattern_counts.values():  # Para cada padrão único
            total_pattern_count = sum(class_counts.values())  # Conta total de instâncias deste padrão
            max_class_count = max(class_counts.values())  # Conta da classe mais frequente para este padrão
            inconsistency_count += total_pattern_count - max_class_count  # Adiciona as inconsistências

        result = inconsistency_count / len(self.data) if len(self.data) > 0 else float('inf')  # Calcula a taxa de inconsistência
        self._pattern_cache[cache_key] = result  # Armazena no cache
        return result  # Retorna o resultado

    def calculate_irh(self, attribute_subset: List[str]) -> float:
        """Calcula o IRH para um subconjunto de atributos"""
        if not attribute_subset:  # Se o subconjunto está vazio
            return float('inf')  # Retorna infinito

        ir_levels = [  # Lista de taxas de inconsistência para cada nível
            self._calculate_level_inconsistency_rate_optimized(attribute_subset, level)  # Calcula para cada nível
            for level in range(1, self.h + 1)  # Para todos os níveis de 1 até h
        ]

        if self.weight_mode == 'equal':  # Se o modo é igual
            weights = self.weights  # Usa os pesos pré-calculados
        elif self.weight_mode == 'softmax':  # Se o modo é softmax
            weights = self._softmax(ir_levels, T=2.0)  # Calcula pesos usando softmax
        elif self.weight_mode == 'hybrid':  # Se o modo é híbrido
            equal = [1/self.h for _ in range(self.h)]  # Pesos iguais
            soft = self._softmax(ir_levels, T=2.0)  # Pesos softmax
            alpha = 0.5  # Parâmetro de mistura
            weights = [(1 - alpha) * e + alpha * s for e, s in zip(equal, soft)]  # Combina os dois tipos de pesos
        else:  # Se o modo é inválido
            raise ValueError("weight_mode deve ser um de 'equal', 'softmax', 'hybrid'")  # Lança erro

        return sum(w * ir for w, ir in zip(weights, ir_levels))  # Retorna a soma ponderada das taxas de inconsistência

    def _softmax(self, x: List[float], T: float = 2.0) -> List[float]:
        """Calcula a função softmax para uma lista de valores"""
        x = np.array(x)  # Converte para array numpy
        e_x = np.exp(-x / T)  # Calcula exponencial negativa dividida pela temperatura
        return (e_x / e_x.sum()).tolist()  # Normaliza e retorna como lista

    def debug_hierarchy(self):
        """Função de debug para verificar a hierarquia"""
        print(f"Número de níveis detectados: {self.h}")  # Imprime o número de níveis
        print("\nExemplos de classes e suas projeções:")  # Imprime cabeçalho

        sample_classes = self.data['class'].head(5).tolist()  # Obtém 5 exemplos de classes
        for class_val in sample_classes:  # Para cada classe de exemplo
            print(f"\nClasse original: {class_val}")  # Imprime a classe original
            class_parts = str(class_val).split('.')  # Divide a classe por pontos

            if class_parts[0] == 'R' and len(class_parts) > 1:  # Se começa com R e tem mais partes
                class_parts = class_parts[1:]  # Remove o R
                print(f"Após remoção de R: {class_parts}")  # Imprime após remoção

            for level in range(1, min(self.h + 1, len(class_parts) + 1)):  # Para cada nível possível
                projection = '.'.join(class_parts[:level])  # Cria a projeção para este nível
                print(f"  Nível {level}: {projection}")  # Imprime a projeção

# --- Random Restart Hill Climbing (inalterado) ---
class RandomRestartHillClimbingFeatureSelector:
    """Classe para seleção de características usando Random Restart Hill Climbing"""
    def __init__(self, data, weight_mode='equal', max_iterations=100, restarts=10,
                 early_stopping_patience=20, min_improvement=1e-6):
        """Inicializa o seletor de características"""
        self.data = data  # Armazena os dados
        self.features = [col for col in data.columns if col != 'class']  # Lista de características (exceto 'class')
        self.irh_calculator = IRHCalculator(data, weight_mode)  # Cria a calculadora IRH
        self.max_iterations = max_iterations  # Número máximo de iterações por restart
        self.restarts = restarts  # Número de restarts
        self.early_stopping_patience = early_stopping_patience  # Paciência para parada antecipada
        self.min_improvement = min_improvement  # Melhoria mínima necessária
        random.seed(42)  # Define semente aleatória para reprodutibilidade

    def fitness(self, feature_subset):
        """Calcula a adequação (fitness) de um subconjunto de características"""
        return self.irh_calculator.calculate_irh(feature_subset)  # Retorna o IRH calculado

    def get_neighbors_incremental(self, current_subset: List[str]) -> List[Tuple[List[str], str]]:
        """Gera os vizinhos com informação sobre o tipo de modificação"""
        neighbors = []  # Lista para armazenar os vizinhos
        current_set = set(current_subset)  # Converte para conjunto para busca rápida

        # Adicionar um atributo (prioridade aos acréscimos pois são frequentemente mais promissores)
        for f in self.features:  # Para cada característica disponível
            if f not in current_set:  # Se não está no subconjunto atual
                neighbors.append((current_subset + [f], 'add'))  # Adiciona vizinho que inclui esta característica

        # Remover um atributo
        if len(current_subset) > 1:  # Se há mais de uma característica (não pode ficar vazio)
            for f in current_subset:  # Para cada característica no subconjunto atual
                neighbors.append(([x for x in current_subset if x != f], 'remove'))  # Adiciona vizinho que remove esta característica

        return neighbors  # Retorna a lista de vizinhos

    def hill_climb_optimized(self, initial_subset):
        """Versão otimizada do hill climbing"""
        current_subset = initial_subset  # Define o subconjunto atual
        current_score = self.fitness(current_subset)  # Calcula o score inicial
        no_improvement_count = 0  # Contador de iterações sem melhoria

        print(f"  Início do hill climb com {len(initial_subset)} atributos, IRH inicial: {current_score:.6f}")  # Log do início

        for iteration in range(self.max_iterations):  # Para cada iteração até o máximo
            neighbors = self.get_neighbors_incremental(current_subset)  # Obtém os vizinhos
            best_neighbor = current_subset  # Inicializa o melhor vizinho
            best_score = current_score  # Inicializa o melhor score
            best_action = None  # Inicializa a melhor ação

            # Avaliação otimizada dos vizinhos
            improvements = []  # Lista para armazenar melhorias
            for neighbor, action in neighbors:  # Para cada vizinho e sua ação
                score = self.fitness(neighbor)  # Calcula o score do vizinho
                if score < current_score - self.min_improvement:  # Se há melhoria significativa
                    improvements.append((neighbor, score, action))  # Adiciona à lista de melhorias

            if improvements:  # Se há melhorias disponíveis
                # Pega a melhor melhoria
                best_neighbor, best_score, best_action = min(improvements, key=lambda x: x[1])  # Seleciona a melhor
                current_subset = best_neighbor  # Atualiza o subconjunto atual
                current_score = best_score  # Atualiza o score atual
                no_improvement_count = 0  # Reseta o contador de não-melhoria

                if iteration % 10 == 0 or best_action:  # A cada 10 iterações ou quando há ação
                    print(f"    Iteração {iteration}: {best_action} -> IRH={current_score:.6f}, tamanho={len(current_subset)}")  # Log do progresso
            else:  # Se não há melhorias
                no_improvement_count += 1  # Incrementa contador de não-melhoria
                if no_improvement_count >= self.early_stopping_patience:  # Se excedeu a paciência
                    print(f"    Parada antecipada após {iteration} iterações (sem melhoria)")  # Log da parada
                    break  # Sai do loop

        return current_subset, current_score  # Retorna o melhor subconjunto e seu score

    def run(self):
        """Versão sequencial otimizada"""
        print(f"Iniciando RRHC otimizado com {self.restarts} restarts...")  # Log do início

        best_global_subset = None  # Melhor subconjunto global
        best_global_score = float('inf')  # Melhor score global

        for restart in range(self.restarts):  # Para cada restart
            print(f"\n=== Restart {restart + 1}/{self.restarts} ===")  # Log do restart
            start_time = time.time()  # Marca o tempo de início

            # Geração de um ponto de partida mais inteligente
            initial_size = random.randint(max(1, len(self.features) // 10),  # Tamanho mínimo
                                        min(len(self.features), len(self.features) // 4))  # Tamanho máximo
            initial_feature = random.sample(self.features, initial_size)  # Seleciona características aleatórias

            subset, score = self.hill_climb_optimized(initial_feature)  # Executa hill climbing
            elapsed = time.time() - start_time  # Calcula tempo decorrido

            print(f"Restart {restart + 1} finalizado em {elapsed:.2f}s — IRH: {score:.6f}, tamanho: {len(subset)}")  # Log do resultado

            if score < best_global_score:  # Se é o melhor score até agora
                best_global_subset = subset  # Atualiza o melhor subconjunto
                best_global_score = score  # Atualiza o melhor score
                print(f"*** NOVO MELHOR RESULTADO: {best_global_score:.6f} ***")  # Log da melhoria

        return best_global_subset, best_global_score  # Retorna o melhor resultado global

# --- Função principal corrigida ---
def main():
    """Função principal do programa"""
    input_file_path = '/content/GPCR-PfamTRA0.arff'  # Caminho do arquivo de entrada (adaptar conforme necessário)
    output_file_path = '/content/GPCR-PfamTRA0OptimiserRRHC.arff'  # Caminho do arquivo de saída

    print("Carregando dados...")  # Log de carregamento
    start_time = time.time()  # Marca o tempo de início
    data = read_arff_file(input_file_path)  # Carrega os dados do arquivo ARFF
    load_time = time.time() - start_time  # Calcula tempo de carregamento

    print(f"Carregados {len(data)} instâncias com {len(data.columns)-1} atributos em {load_time:.2f}s")  # Log do carregamento

    # Criação da instância e debug da hierarquia
    print("\n=== ANÁLISE DA HIERARQUIA ===")  # Cabeçalho da análise
    irh_calc = IRHCalculator(data)  # Cria calculadora IRH
    irh_calc.debug_hierarchy()  # Executa debug da hierarquia

    print(f"\nNúmero de níveis hierárquicos (sem R): {irh_calc.h}")  # Imprime número de níveis
    print("Alguns exemplos de classes:")  # Cabeçalho dos exemplos
    for i, class_val in enumerate(data['class'].head(3)):  # Para os primeiros 3 valores de classe
        print(f"  {i+1}. {class_val}")  # Imprime cada exemplo

    # Teste rápido de IRH em todos os atributos
    all_features = [col for col in data.columns if col != 'class']  # Lista de todas as características
    print(f"\nTeste IRH em todos os {len(all_features)} atributos...")  # Log do teste
    irh_all = irh_calc.calculate_irh(all_features)  # Calcula IRH para todas as características
    print(f"IRH (todos atributos): {irh_all:.6f}")  # Imprime o resultado

    # Seleção de características otimizada
    print("\n=== SELEÇÃO DE ATRIBUTOS ===")  # Cabeçalho da seleção
    selector = RandomRestartHillClimbingFeatureSelector(  # Cria o seletor
        data,  # Dados
        weight_mode='hybrid',        # Modo de pesos: 'equal', 'softmax', ou 'hybrid'
        max_iterations=100,          # Máximo de iterações por restart
        restarts=10,                 # Número de restarts
        early_stopping_patience=15,  # Paciência para parada antecipada
        min_improvement=1e-6         # Melhoria mínima necessária
    )

    start_time = time.time()  # Marca o tempo de início
    best_subset, best_irh = selector.run()  # Executa a seleção
    total_time = time.time() - start_time  # Calcula tempo total

    print(f"\n=== RESULTADOS FINAIS ===")  # Cabeçalho dos resultados
    print(f"Tempo total: {total_time:.2f}s")  # Imprime tempo total
    print(f"Melhores atributos ({len(best_subset)}): {sorted(best_subset)}")  # Imprime os melhores atributos
    print(f"IRH obtido: {best_irh:.6f}")  # Imprime o IRH obtido
    print(f"Redução de IRH: {((irh_all - best_irh) / irh_all * 100):.2f}%")  # Imprime a redução percentual

    # Salvamento
    selected_data = data[best_subset + ['class']]  # Seleciona os dados com os melhores atributos
    save_to_arff(selected_data, output_file_path, relation_name="Corrected_IRH_Data")  # Salva no arquivo ARFF
    print(f"Resultados salvos em: {output_file_path}")  # Log do salvamento

if __name__ == "__main__":  # Se este arquivo está sendo executado diretamente
    main()  # Executa a função principal

Carregando dados...
Carregados 5871 instâncias com 75 atributos em 0.17s

=== ANÁLISE DA HIERARQUIA ===
Número de níveis detectados: 4

Exemplos de classes e suas projeções:

Classe original: R.001.001.001.003
Após remoção de R: ['001', '001', '001', '003']
  Nível 1: 001
  Nível 2: 001.001
  Nível 3: 001.001.001
  Nível 4: 001.001.001.003

Classe original: R.001.001.001.003
Após remoção de R: ['001', '001', '001', '003']
  Nível 1: 001
  Nível 2: 001.001
  Nível 3: 001.001.001
  Nível 4: 001.001.001.003

Classe original: R.001.001.001.003
Após remoção de R: ['001', '001', '001', '003']
  Nível 1: 001
  Nível 2: 001.001
  Nível 3: 001.001.001
  Nível 4: 001.001.001.003

Classe original: R.001.001.001.003
Após remoção de R: ['001', '001', '001', '003']
  Nível 1: 001
  Nível 2: 001.001
  Nível 3: 001.001.001
  Nível 4: 001.001.001.003

Classe original: R.001.001.001.003
Após remoção de R: ['001', '001', '001', '003']
  Nível 1: 001
  Nível 2: 001.001
  Nível 3: 001.001.001
  Nível 4: 00