In [12]:
import numpy as np
import random
import pandas as pd
import psutil
import os
import time
import glob
from typing import List, Tuple, Union, Optional
import csv

In [14]:
def calcular_aptidao_qap(
    solucao: Union[List[int], np.ndarray, List[List[int]]],
    matriz_fluxo: np.ndarray,
    matriz_distancia: np.ndarray
) -> int:
    solucao_flat: List[int] = []
    for x in solucao:
        if isinstance(x, (list, np.ndarray)):
            solucao_flat.extend(int(v) for v in x)
        else:
            solucao_flat.append(int(x))

    n: int = len(solucao_flat)
    custo: int = 0
    for i in range(n):
        for j in range(n):
            custo += matriz_fluxo[i][j] * matriz_distancia[solucao_flat[i]][solucao_flat[j]]
    return custo

In [15]:
def inicializar_feromonio(n: int, valor_inicial: float = 0.1) -> np.ndarray:
    return np.full((n, n), valor_inicial)

In [16]:
def construir_solucao_qap(
    feromonio: np.ndarray,
    alpha: float,
    beta: float,
    heuristica: np.ndarray
) -> List[int]:
    """
    feromonio -> memória coletiva das soluções anteriores 
    heuristica -> informação local que guia a escolha 
    alpha -> peso do feromônio
    beta -> peso da heurística
    """
    n: int = feromonio.shape[0]
    nao_visitados: List[int] = list(range(n))
    solucao: List[int] = []

    while nao_visitados:
        if not solucao:
            atual: int = random.choice(nao_visitados)
        else:
            ultima: int = solucao[-1]
            pesos: List[float] = []
            for prox in nao_visitados:
                tau: float = feromonio[ultima][prox] ** alpha
                eta: float = heuristica[ultima][prox] ** beta
                pesos.append(tau * eta)
            pesos_np: np.ndarray = np.array(pesos, dtype=float)
            if not np.isfinite(pesos_np).all() or pesos_np.sum() <= 0:
                pesos_np = np.ones_like(pesos_np) / len(pesos_np)
            else:
                pesos_np /= pesos_np.sum()
            atual = random.choices(nao_visitados, weights=pesos_np)[0]


        solucao.append(atual)
        nao_visitados.remove(atual)

    return solucao

In [17]:
def busca_local(
    solucao: List[int],
    fluxo: np.ndarray,
    distancia: np.ndarray,
    tempo_inicio_global: float = None,  
    tempo_global_max: float = None     
) -> List[int]:
    melhor: List[int] = solucao.copy()
    melhor_custo: int = calcular_aptidao_qap(melhor, fluxo, distancia)
    melhorou: bool = True
    
    while melhorou:
        if tempo_inicio_global and tempo_global_max:
            if (time.time() - tempo_inicio_global) > tempo_global_max:
                return melhor 
        
        melhorou = False
        for i in range(len(solucao) - 1):
            if tempo_inicio_global and tempo_global_max:
                if (time.time() - tempo_inicio_global) > tempo_global_max:
                    return melhor
            
            for j in range(i + 1, len(solucao)):
                nova: List[int] = melhor.copy()
                nova[i], nova[j] = nova[j], nova[i]
                custo_novo: int = calcular_aptidao_qap(nova, fluxo, distancia)
                if custo_novo < melhor_custo:
                    melhor, melhor_custo = nova, custo_novo
                    melhorou = True
    return melhor

In [18]:
def atualizar_feromonio(
    feromonio: np.ndarray,
    solucoes: List[List[int]],
    custos: List[int],
    rho: float,
    Q: float
) -> np.ndarray:
    feromonio *= (1 - rho)  
    for solucao, custo in zip(solucoes, custos):
        if custo <= 0 or not np.isfinite(custo):
            custo = 1e-6 
        deposito: float = Q / custo

        for i in range(len(solucao) - 1):
            feromonio[solucao[i]][solucao[i + 1]] += deposito
        feromonio[solucao[-1]][solucao[0]] += deposito
    return feromonio

In [19]:
def save_aco_iteration_results(
    file_name: str,
    instancia: str,
    iteration: int,
    ant_index: int,
    best_cost: float,
    current_cost: float,
    tempo_inicio_global: float
) -> None:
    """
    Salva resultados parciais da execução do ACO em um arquivo CSV no mesmo diretório.
    """
    path = file_name  

    header = [
        "instancia",
        "iteracao",
        "formiga",
        "melhor_custo",
        "custo_atual",
        "tempo_decorrido"
    ]
    write_header = not os.path.exists(path)

    with open(path, mode="a", newline="") as f:
        writer = csv.writer(f)
        if write_header:
            writer.writerow(header)
        writer.writerow([
            instancia,
            iteration,
            ant_index,
            best_cost,
            current_cost,
            round(time.time() - tempo_inicio_global, 3)
        ])

In [20]:
def aco_qap(
    fluxo: np.ndarray,
    distancia: np.ndarray,
    n_formigas: int = 10,
    alpha: float = 1,
    beta: float = 2,
    rho: float = 0.1,
    Q: float = 1,
    iter_max: int = 100,
    instancia: str = "desconhecida",
    tempo_inicio_global: float = None,
    tempo_global_max: float = None
) -> Tuple[List[int], int]:
    """
    Algoritmo de Otimização por Colônia de Formigas (ACO) puro com salvamento de resultados.
    """

    n: int = len(fluxo)
    heuristica: np.ndarray = 1 / (np.array(distancia) + 1e-10)
    feromonio: np.ndarray = inicializar_feromonio(n)

    melhor_solucao: Optional[List[int]] = None
    melhor_custo: float = float("inf")
    file_name = f"iteracoes_{instancia}_ACO.csv"

    for i in range(iter_max):
        if tempo_inicio_global and tempo_global_max:
            if (time.time() - tempo_inicio_global) > tempo_global_max:
                return melhor_solucao if melhor_solucao else [], int(melhor_custo)

        solucoes: List[List[int]] = []
        custos: List[int] = []

        for j in range(n_formigas):
            if tempo_inicio_global and tempo_global_max:
                if (time.time() - tempo_inicio_global) > tempo_global_max:
                    return melhor_solucao if melhor_solucao else [], int(melhor_custo)

            s: List[int] = construir_solucao_qap(feromonio, alpha, beta, heuristica)
            custo: int = calcular_aptidao_qap(s, fluxo, distancia)

            solucoes.append(s)
            custos.append(custo)

            if custo < melhor_custo:
                melhor_solucao, melhor_custo = s, custo

            save_aco_iteration_results(
                file_name=file_name,
                instancia=instancia,
                iteration=i,
                ant_index=j,
                best_cost=melhor_custo,
                current_cost=custo,
                tempo_inicio_global=tempo_inicio_global or time.time()
            )

        feromonio = atualizar_feromonio(feromonio, solucoes, custos, rho, Q)

        save_aco_iteration_results(
            file_name=file_name,
            instancia=instancia,
            iteration=i,
            ant_index=-1,  
            best_cost=melhor_custo,
            current_cost=min(custos),
            tempo_inicio_global=tempo_inicio_global or time.time()
        )

    return melhor_solucao if melhor_solucao else [], int(melhor_custo)


In [21]:
def ler_qap_com_n(caminho: str) -> Tuple[int, pd.DataFrame, pd.DataFrame]:
    with open(caminho, "r") as f:
        dados: List[int] = list(map(int, f.read().split()))

    n: int = dados[0]
    valores: List[int] = dados[1:]

    total_esperado: int = 2 * n * n
    if len(valores) != total_esperado:
        raise ValueError(f"Esperado {total_esperado} valores, mas encontrado {len(valores)}.")

    flow_flat: List[int] = valores[: n * n]
    dist_flat: List[int] = valores[n * n :]

    flow_df: pd.DataFrame = pd.DataFrame([flow_flat[i * n : (i + 1) * n] for i in range(n)])
    dist_df: pd.DataFrame = pd.DataFrame([dist_flat[i * n : (i + 1) * n] for i in range(n)])

    return n, flow_df, dist_df

In [None]:
if __name__ == "__main__":
    arquivos = glob.glob("*.txt")

    tempo_global_max = 10 * 60  # 10 minutos por instância

    for arquivo in arquivos:
        nome_instancia = os.path.splitext(os.path.basename(arquivo))[0]

        n, flow_df, dist_df = ler_qap_com_n(arquivo)
        matriz_fluxo = np.array(flow_df.values.tolist())
        matriz_distancia = np.array(dist_df.values.tolist())

        resultados = []
        tempo_inicio_instancia = time.time()

        for seed in range(42, 52):
            tempo_decorrido_global = time.time() - tempo_inicio_instancia
            if tempo_decorrido_global >= tempo_global_max:
                print(f"\n Tempo limite global atingido para {nome_instancia}.")
                break

            random.seed(seed)
            np.random.seed(seed)

            process = psutil.Process(os.getpid())
            tempo_inicio_seed = time.time()

            # n_formigas = numero de formigas
            # alpha = importancia do feromonio
            # beta = importancia da heuristica
            # rho = taxa de evaporacao do feromonio
            # Q = quantidade de feromonio depositada
            # iter_max = numero maximo de iteracoes
            """parametros = {
                "n_formigas": random.randint(5, 20),
                "alpha": random.uniform(0.5, 2.0),
                "beta": random.uniform(1.0, 3.0),
                "rho": random.uniform(0.05, 0.2),
                "Q": 1,
                "iter_max": random.randint(50, 150) 
            }"""
            parametros = {
                "n_formigas": 10,
                "alpha": 1.0,
                "beta": 2.0,
                "rho": random.choice([0.02, 0.05, 0.2]),
                "Q": 1,
                "iter_max": 10
            }

            melhor_solucao, melhor_custo = aco_qap(
                fluxo=matriz_fluxo,
                distancia=matriz_distancia,
                instancia=nome_instancia,
                tempo_inicio_global=tempo_inicio_instancia,
                tempo_global_max=tempo_global_max,
                **parametros
            )

            tempo_fim_seed = time.time()

            tempo_restante_global = max(0, tempo_global_max - (tempo_inicio_seed - tempo_inicio_instancia))
            tempo_execucao = min(tempo_fim_seed - tempo_inicio_seed, tempo_restante_global)

            try:
                memoria_usada = process.memory_info().peak_wset / (1024 * 1024)
            except AttributeError:
                memoria_usada = process.memory_info().rss / (1024 * 1024)

            resultados.append({
                "instancia": nome_instancia,
                "seed": seed,
                "melhor_solucao": melhor_solucao,
                "custo": melhor_custo,
                "tempo_execucao_segundos": round(tempo_execucao, 2),
                "memoria_usada_MB": round(memoria_usada, 2),
                **parametros
            })

            print(f"[{nome_instancia}] Seed {seed} finalizada. "
                  f"Custo: {melhor_custo} | Memória usada: {round(memoria_usada, 2)} MB")

        df_resultados = pd.DataFrame(resultados)
        df_resultados.to_csv(f"resultados_{nome_instancia}_ACO_10min.csv", index=False)

        print(f"\nResultados da instância {nome_instancia} salvos em resultados_{nome_instancia}_ACO.csv")

[Tai80b] Seed 42 finalizada. Custo: 1166488020 | Memória usada: 108.34 MB
[Tai80b] Seed 43 finalizada. Custo: 1154308712 | Memória usada: 108.34 MB
[Tai80b] Seed 44 finalizada. Custo: 1153980554 | Memória usada: 108.34 MB
[Tai80b] Seed 45 finalizada. Custo: 1170522385 | Memória usada: 108.34 MB
[Tai80b] Seed 46 finalizada. Custo: 1130733124 | Memória usada: 108.34 MB
[Tai80b] Seed 47 finalizada. Custo: 1152344930 | Memória usada: 108.34 MB
[Tai80b] Seed 48 finalizada. Custo: 1154890692 | Memória usada: 108.34 MB
[Tai80b] Seed 49 finalizada. Custo: 1168492625 | Memória usada: 108.34 MB
[Tai80b] Seed 50 finalizada. Custo: 1151307097 | Memória usada: 108.34 MB
[Tai80b] Seed 51 finalizada. Custo: 1168781455 | Memória usada: 108.34 MB

Resultados da instância Tai80b salvos em resultados_Tai80b_ACO.csv
