# Imports and types

In [64]:
import random
import time
import numpy as np
import pandas as pd
import psutil
import os 
import glob
from typing import List, Tuple, Callable
import csv

In [65]:
def switch_mutation(solucao: List[int]) -> List[int]:
    nova = solucao[:]
    a, b = random.sample(range(len(nova)), 2)
    nova[a], nova[b] = nova[b], nova[a]
    return nova

In [66]:
def permutation_mutation(solucao: List[int]) -> List[int]:
    nova = solucao[:]
    a, b = sorted(random.sample(range(len(nova)), 2))
    nova[a:b+1] = reversed(nova[a:b+1])
    return nova

In [67]:
def save_es_vnd_iteration_results(file_name: str,
                                  instancia: str,
                                  iteration: int,
                                  melhor_aptidao: float,
                                  melhor_solucao: List[int],
                                  tempo_decorrido: float,
                                  memoria_usada: float,
                                  parametros: dict):
    """
    Salva os resultados parciais de uma execução do ES_VND em um arquivo CSV.
    Cria o arquivo com cabeçalho na primeira execução e adiciona novas linhas a cada iteração.

    Parâmetros
    ----------
    file_name : str
        Nome do arquivo CSV a ser salvo.
    instancia : str
        Nome da instância sendo executada.
    iteration : int
        Iteração atual da ES.
    melhor_aptidao : float
        Valor da melhor aptidão encontrada até o momento.
    melhor_solucao : List[int]
        Solução correspondente à melhor aptidão.
    tempo_decorrido : float
        Tempo decorrido desde o início da execução (em segundos).
    memoria_usada : float
        Memória utilizada (em MB).
    parametros : dict
        Dicionário com parâmetros da execução (mu, lambda, taxas, n, seed etc.).
    """
    file_exists = os.path.isfile(file_name)

    with open(file_name, mode="a", newline="") as f:
        writer = csv.writer(f)
        if not file_exists:
            writer.writerow([
                "instancia",
                "iteration",
                "melhor_custo",
                "tempo_decorrido_s",
                "memoria_usada_MB",
                "mu",
                "lambd",
                "taxa_mutacao",
                "taxa_busca_local",
                "iter_sem_melhora_max",
                "n",
                "seed",
                "melhor_solucao"
            ])

        writer.writerow([
            instancia,
            iteration,
            -melhor_aptidao,  # custo positivo
            round(tempo_decorrido, 2),
            round(memoria_usada, 2),
            parametros.get("mu"),
            parametros.get("lambd"),
            round(parametros.get("taxa_mutacao", 0), 4),
            round(parametros.get("taxa_busca_local", 0), 4),
            parametros.get("iter_sem_melhora_max"),
            parametros.get("n"),
            parametros.get("seed"),
            melhor_solucao
        ])

In [68]:
# M1: Troca de elementos adjacentes
def gerar_vizinhanca_1(solucao: List[int]) -> List[List[int]]:
    vizinhanca = []
    for i in range(len(solucao) - 1):
        vizinho = solucao[:]
        vizinho[i], vizinho[i+1] = vizinho[i+1], vizinho[i]
        vizinhanca.append(vizinho)
    return vizinhanca

In [69]:
# M2: Troca de elementos com distância 2
def gerar_vizinhanca_2(solucao: List[int]) -> List[List[int]]:
    vizinhanca = []
    for i in range(len(solucao) - 2):
        vizinho = solucao[:]
        vizinho[i], vizinho[i+2] = vizinho[i+2], vizinho[i]
        vizinhanca.append(vizinho)
    return vizinhanca

In [70]:
# M3: Todas as trocas possíveis entre pares de posições
def gerar_vizinhanca_3(solucao: List[int]) -> List[List[int]]:
    vizinhanca = []
    n = len(solucao)
    for i in range(n):
        for j in range(i + 1, n):
            vizinho = solucao[:]
            vizinho[i], vizinho[j] = vizinho[j], vizinho[i]
            vizinhanca.append(vizinho)
    return vizinhanca

In [71]:
# M4: Inserção de um elemento em posição anterior
def gerar_vizinhanca_4(solucao: List[int]) -> List[List[int]]:
    vizinhanca = []
    n = len(solucao)
    for i in range(n):
        for j in range(i):
            vizinho = solucao[:]
            elem = vizinho.pop(i)
            vizinho.insert(j, elem)
            vizinhanca.append(vizinho)
    return vizinhanca

In [72]:
# M5: Inserção de um elemento em posição posterior
def gerar_vizinhanca_5(solucao: List[int]) -> List[List[int]]:
    vizinhanca = []
    n = len(solucao)
    for i in range(n):
        for j in range(i+1, n):
            vizinho = solucao[:]
            elem = vizinho.pop(i)
            vizinho.insert(j, elem)
            vizinhanca.append(vizinho)
    return vizinhanca

In [73]:
def gerar_vizinhanca_6(solucao: List[int]) -> List[List[int]]:
    vizinhanca = []
    n = len(solucao)
    # percorre pares consecutivos (u,x) e (v,y)
    for i in range(n-1):
        for j in range(i+2, n-1):  # começa em i+2 para evitar sobreposição
            vizinho = solucao[:]
            # troca os pares
            vizinho[i], vizinho[i+1], vizinho[j], vizinho[j+1] = (
                vizinho[j], vizinho[j+1], vizinho[i], vizinho[i+1]
            )
            vizinhanca.append(vizinho)
    return vizinhanca

In [74]:
def gerar_vizinhanca_7(solucao: List[int]) -> List[List[int]]:
    vizinhanca = []
    n = len(solucao)
    for i in range(n-1):
        for j in range(i+2, n-1):  # garante que não sejam adjacentes
            vizinho = solucao[:]
            u, x, v, y = vizinho[i], vizinho[i+1], vizinho[j], vizinho[j+1]
            # substitui (u,x) e (v,y) por (u,v) e (x,y)
            vizinho[i], vizinho[i+1], vizinho[j], vizinho[j+1] = u, v, x, y
            vizinhanca.append(vizinho)
    return vizinhanca

In [75]:
# M8: 2-opt (reversão de qualquer sublista)
def gerar_vizinhanca_8(solucao: List[int]) -> List[List[int]]:
    vizinhanca = []
    n = len(solucao)
    for i in range(n):
        for j in range(i+2, n):  # precisa pelo menos 2 de diferença
            vizinho = solucao[:]
            vizinho[i:j] = reversed(vizinho[i:j])
            vizinhanca.append(vizinho)
    return vizinhanca

In [76]:
# M9: Swap de blocos de tamanho 2
def gerar_vizinhanca_9(solucao: List[int]) -> List[List[int]]:
    vizinhanca = []
    n = len(solucao)
    for i in range(n-1):
        for j in range(i+2, n-1):
            vizinho = solucao[:]
            vizinho[i:i+2], vizinho[j:j+2] = vizinho[j:j+2], vizinho[i:i+2]
            vizinhanca.append(vizinho)
    return vizinhanca

In [77]:
def gerar_array_replicavel(seed: int, tamanho: int) -> list[int]:
    random.seed(seed)
    vetor = list(range(tamanho))  
    random.shuffle(vetor)
    return vetor

In [78]:
def calcular_aptidao_qap(solucao: List[int],
                         matriz_fluxo: pd.DataFrame,
                         matriz_distancia: pd.DataFrame) -> int:
    n = len(solucao)
    custo = 0
    for i in range(n):
        for j in range(n):
            custo += matriz_fluxo[i][j] * matriz_distancia[solucao[i]][solucao[j]]
    return -custo   # agora retorna custo negativo

In [79]:
def calcular_melhor_vizinho(vizinhanca: List[List[int]],
                            matriz_fluxo: pd.DataFrame,
                            matriz_distancia: pd.DataFrame) -> List[int]:
    melhor_vizinho = vizinhanca[0]
    for i in range(1, len(vizinhanca)):
        if calcular_aptidao_qap(vizinhanca[i], matriz_fluxo, matriz_distancia) > calcular_aptidao_qap(melhor_vizinho, matriz_fluxo, matriz_distancia):
            melhor_vizinho = vizinhanca[i]
    return melhor_vizinho

In [80]:
def VND(solucao: List[int],
        matriz_fluxo: pd.DataFrame,
        matriz_distancia: pd.DataFrame,
        tempo_inicio_global: float = None,  
        tempo_global_max: float = None
        ) -> List[int]:
    """
    Variable Neighborhood Descent (VND) puro, sem Simulated Annealing.

    Parâmetros
    ----------
    solucao : list[int]
        Solução inicial.
    matriz_fluxo, matriz_distancia : pd.DataFrame
        Matrizes do problema QAP.
    tempo_inicio_global : float, opcional
        Momento de início do algoritmo, usado para controle de tempo.
    tempo_global_max : float, opcional
        Tempo máximo permitido de execução.

    Retorna
    -------
    solucao_atual : list[int]
        Melhor solução encontrada.
    """
    solucao_atual = solucao[:]
    k = 0
    funcoes_vizinhanca: List[Callable[[List[int]], List[List[int]]]] = [
        gerar_vizinhanca_1, gerar_vizinhanca_2, gerar_vizinhanca_3,
        gerar_vizinhanca_4, gerar_vizinhanca_5, gerar_vizinhanca_6,
        gerar_vizinhanca_7, gerar_vizinhanca_8, gerar_vizinhanca_9,
    ]

    while k < len(funcoes_vizinhanca):

        if tempo_inicio_global and tempo_global_max:
            if (time.time() - tempo_inicio_global) > tempo_global_max:
                return solucao_atual
        
        vizinhos = funcoes_vizinhanca[k](solucao_atual)
        aptidao_atual = calcular_aptidao_qap(solucao_atual, matriz_fluxo, matriz_distancia)

        melhorou = False
        for vizinho in vizinhos:
            if tempo_inicio_global and tempo_global_max:
                if (time.time() - tempo_inicio_global) > tempo_global_max:
                    return solucao_atual
            
            aptidao_vizinho = calcular_aptidao_qap(vizinho, matriz_fluxo, matriz_distancia)
            if aptidao_vizinho > aptidao_atual:
                solucao_atual = vizinho
                k = 0
                melhorou = True
                break
        
        if not melhorou:
            k += 1

    return solucao_atual


In [81]:
def inicializar_populacao(lambd: int, tamanho: int, seed_base: int = 42) -> List[List[int]]:
    populacao = []
    for i in range(lambd):
        seed = seed_base + i  
        individuo = gerar_array_replicavel(seed=seed, tamanho=tamanho)
        populacao.append(individuo)
    return populacao


In [82]:
def ES_VND(instancia: str,
           mu: int,
           lambd: int,
           tempo_max: float,
           taxa_mutacao: float,
           taxa_busca_local: float,
           iter_sem_melhora_max: int,
           n: int,
           matriz_fluxo: pd.DataFrame,
           matriz_distancia: pd.DataFrame,
           seed: int = 42,
           solucao_inicial: List[int] = None,
           tempo_inicio_global: float = None,
           tempo_global_max: float = None,
           parametros: dict = None
           ) -> Tuple[List[int], float]:
    """
    ES com VND, salvando CSV usando o nome da instância passado como argumento.
    """
    if parametros is None:
        parametros = {}

    P = inicializar_populacao(mu + lambd, tamanho=n, seed_base=seed)

    melhor: List[int] | None = None
    melhor_aptidao = -float('inf')
    sem_melhora = 0
    inicio = time.time()
    iteration = 0

    while (time.time() - inicio) < tempo_max and sem_melhora < iter_sem_melhora_max:
        iteration += 1

        # Parada global
        if tempo_inicio_global and tempo_global_max:
            if (time.time() - tempo_inicio_global) > tempo_global_max:
                return (
                    melhor if melhor is not None else solucao_inicial,
                    melhor_aptidao if melhor is not None else calcular_aptidao_qap(solucao_inicial, matriz_fluxo, matriz_distancia)
                )

        aptidoes = [calcular_aptidao_qap(ind, matriz_fluxo, matriz_distancia) for ind in P]

        melhorou = False
        for ind, apt in zip(P, aptidoes):
            if apt > melhor_aptidao:
                melhor_aptidao = apt
                melhor = ind[:]
                sem_melhora = 0
                melhorou = True
        if not melhorou:
            sem_melhora += 1

        # Recursos e tempo
        process = psutil.Process(os.getpid())
        try:
            memoria_usada = process.memory_info().peak_wset / (1024 * 1024)
        except AttributeError:
            memoria_usada = process.memory_info().rss / (1024 * 1024)

        tempo_decorrido = time.time() - inicio

        # Salvamento CSV
        save_es_vnd_iteration_results(
            file_name=f"iteracoes_{instancia}_ES_VND.csv",
            instancia=instancia,
            iteration=iteration,
            melhor_aptidao=melhor_aptidao,
            melhor_solucao=melhor,
            tempo_decorrido=tempo_decorrido,
            memoria_usada=memoria_usada,
            parametros=parametros
        )

        # Seleção e reprodução
        melhores_indices = sorted(range(len(P)), key=lambda i: aptidoes[i], reverse=True)[:mu]
        Q = [P[i][:] for i in melhores_indices]
        nova_geracao: List[List[int]] = Q[:]

        for q in Q:
            if tempo_inicio_global and tempo_global_max:
                if (time.time() - tempo_inicio_global) > tempo_global_max:
                    return (
                        melhor if melhor is not None else solucao_inicial,
                        melhor_aptidao if melhor is not None else calcular_aptidao_qap(solucao_inicial, matriz_fluxo, matriz_distancia)
                    )

            for _ in range(lambd // mu):
                individuo = q[:]
                if random.random() < taxa_mutacao:
                    individuo = permutation_mutation(individuo)
                if random.random() < taxa_busca_local:
                    individuo = VND(individuo, matriz_fluxo, matriz_distancia,
                                    tempo_inicio_global, tempo_global_max)
                nova_geracao.append(individuo)

                # Salvamento secundário
                save_es_vnd_iteration_results(
                    file_name=f"iteracoes_{instancia}_ES_VND.csv",
                    instancia=instancia,
                    iteration=iteration,
                    melhor_aptidao=melhor_aptidao,
                    melhor_solucao=melhor,
                    tempo_decorrido=time.time() - inicio,
                    memoria_usada=memoria_usada,
                    parametros=parametros
                )

        P = nova_geracao

    return (
        melhor if melhor is not None else solucao_inicial,
        melhor_aptidao if melhor is not None else calcular_aptidao_qap(solucao_inicial, matriz_fluxo, matriz_distancia)
    )

In [83]:
def ler_qap_com_n(caminho: str) -> Tuple[int, pd.DataFrame, pd.DataFrame]:
    with open(caminho, "r") as f:
        dados = list(map(int, f.read().split()))

    n = dados[0]
    valores = dados[1:]

    total_esperado = 2 * n * n
    if len(valores) != total_esperado:
        raise ValueError(f"Esperado {total_esperado} valores, mas encontrado {len(valores)}.")

    flow_flat = valores[:n * n]
    dist_flat = valores[n * n:]

    flow_df = pd.DataFrame([flow_flat[i * n:(i + 1) * n] for i in range(n)])
    dist_df = pd.DataFrame([dist_flat[i * n:(i + 1) * n] for i in range(n)])

    return n, flow_df, dist_df

In [84]:
if __name__ == "__main__":
    arquivos = glob.glob("*.txt")

    # define limite global de tempo por instância (em segundos)
    tempo_global_max = 10 * 60  # ex: 5 minutos por instância

    for arquivo in arquivos:
        nome_instancia = os.path.splitext(os.path.basename(arquivo))[0]

        n, flow_df, dist_df = ler_qap_com_n(arquivo)
        flow = flow_df.values.tolist()
        dist = dist_df.values.tolist()
        matriz_fluxo = np.array(flow)
        matriz_distancia = np.array(dist)

        resultados = []  
        tempo_inicio_instancia = time.time()

        for seed in range(42, 52):
            if (time.time() - tempo_inicio_instancia) > tempo_global_max:
                print(f"\n Tempo limite global atingido para {nome_instancia}.")
                break

            random.seed(seed)
            process = psutil.Process(os.getpid())
            tempo_inicio = time.time()

            solucao_inicial = gerar_array_replicavel(seed=seed, tamanho=n)

            
            parametros = {
                "instancia": nome_instancia,
                "mu": random.randint(5, 10),
                "lambd": random.randint(30, 100),
                "tempo_max": random.randint(3, 10) * 60,
                "taxa_mutacao": random.uniform(0.4, 0.7),
                "taxa_busca_local": random.uniform(0.4, 0.7),
                "iter_sem_melhora_max": random.randint(5, 10),
                "n": n,
                "seed": seed,
            }

            """
                parametros = {
                    "mu": random.randint(3, 5),
                    "lambd": random.randint(15, 50),
                    "tempo_max": random.randint(3, 10) * 60,
                    "taxa_mutacao": random.uniform(0.6, 0.9),
                    "taxa_busca_local": random.uniform(0.4, 0.7),
                    "iter_sem_melhora_max": random.randint(3, 6),
                    "matriz_fluxo": matriz_fluxo,
                    "matriz_distancia": matriz_distancia,
                    "solucao_inicial": solucao_inicial,
                    "n": n,
                }
            """

            melhor_solucao, melhor_valor = ES_VND(
                instancia=nome_instancia,
                mu=parametros["mu"],
                lambd=parametros["lambd"],
                tempo_max=parametros["tempo_max"],
                taxa_mutacao=parametros["taxa_mutacao"],
                taxa_busca_local=parametros["taxa_busca_local"],
                iter_sem_melhora_max=parametros["iter_sem_melhora_max"],
                n=parametros["n"],
                matriz_fluxo=matriz_fluxo,
                matriz_distancia=matriz_distancia,
                seed=parametros["seed"],
                solucao_inicial=solucao_inicial,
                tempo_inicio_global=tempo_inicio_instancia,
                tempo_global_max=tempo_global_max,
                parametros=parametros
            )


            tempo_fim = time.time()
            tempo_decorrido = min(
                tempo_fim - tempo_inicio,
                max(0, tempo_global_max - (tempo_inicio - tempo_inicio_instancia))
            )

            try:
                memoria_usada = process.memory_info().peak_wset / (1024 * 1024)  
            except AttributeError:
                memoria_usada = process.memory_info().rss / (1024 * 1024)  

            resultados.append({
                "instancia": nome_instancia,
                "seed": seed,
                "melhor_solucao": melhor_solucao,
                "custo": -melhor_valor,
                "tempo_execucao_segundos": round(tempo_decorrido, 2),
                "memoria_usada_MB": round(memoria_usada, 2)
            })

            print(f"[{nome_instancia}] Seed {seed} finalizada. Custo: {(-melhor_valor)} | Memória usada: {round(memoria_usada, 2)} MB")

        df_resultados = pd.DataFrame(resultados)
        df_resultados.to_csv(f"resultados_{nome_instancia}_ES_VND_10min.csv", index=False)

        print(f"\nResultados da instância {nome_instancia} salvos em resultados_{nome_instancia}.csv")

[Tai100a] Seed 42 finalizada. Custo: 23814662 | Memória usada: 120.46 MB

 Tempo limite global atingido para Tai100a.

Resultados da instância Tai100a salvos em resultados_Tai100a.csv
[Tai100b] Seed 42 finalizada. Custo: 1668818663 | Memória usada: 120.46 MB

 Tempo limite global atingido para Tai100b.

Resultados da instância Tai100b salvos em resultados_Tai100b.csv
[Tai150b] Seed 42 finalizada. Custo: 636204120 | Memória usada: 120.46 MB

 Tempo limite global atingido para Tai150b.

Resultados da instância Tai150b salvos em resultados_Tai150b.csv
[Tai256c] Seed 42 finalizada. Custo: 51213226 | Memória usada: 148.51 MB

 Tempo limite global atingido para Tai256c.

Resultados da instância Tai256c salvos em resultados_Tai256c.csv
[Tai80a] Seed 42 finalizada. Custo: 15301482 | Memória usada: 148.51 MB

 Tempo limite global atingido para Tai80a.

Resultados da instância Tai80a salvos em resultados_Tai80a.csv
[Tho150] Seed 42 finalizada. Custo: 9681914 | Memória usada: 148.51 MB

 Tempo li