#### IMPORTS

In [643]:
#Importa os módulos usados

import numpy as np # type: ignore
import matplotlib.pyplot as plt # type: ignore
from typing import Dict, List
import copy
import random


# Define um tipo de dado similar ao Pascal "record" or C "struct"

class Struct:
    pass

INVALID: int = -1

#### Utils de leitura do CSV

In [644]:
class Cliente:
    def __init__(self, x, y, consumo_banda):
        self.x = x
        self.y = y
        self.consumo_banda = consumo_banda
        self.connected_PA_id = INVALID
        self.id = INVALID

    def __repr__(self):
        return f"Cliente(x={self.x}, y={self.y}, consumo_banda={self.consumo_banda})"

# Função para ler os clientes do arquivo
def ler_clientes(nome_arquivo):
    clientes = []
    with open(nome_arquivo, 'r') as arquivo:
        linhas = arquivo.readlines()
        for linha in linhas:
            x, y, consumo_banda = map(float, linha.strip().split(','))
            cliente = Cliente(x, y, consumo_banda)
            clientes.append(cliente)

        index = 0
        for cliente in clientes:
            cliente.id = index
            index += 1

    return clientes

In [645]:
def clientProcessing(clientList, nCL=495, nPA=81**2):

    cons = np.zeros(nCL)
    dist = np.zeros((nCL, nPA))
    exp = np.zeros((nCL, nPA))

    cl_index = 0
    for cliente in clientList:
        cons[cl_index] = cliente.consumo_banda

        for PA_index in range(nPA):
            PA_x = (PA_index % 81) * 5
            PA_y = (PA_index // 81) * 5
            
            dist[cl_index, PA_index] = np.sqrt((cliente.x - PA_x)**2 + (cliente.y - PA_y)**2)
            exp[cl_index, PA_index] = 1/dist[cl_index, PA_index]

        cl_index += 1

    return cons, dist, exp

#### Classe Solução

In [646]:
class Solucao:
    id: int = 0
    sol_PAs_ativos: Dict[int, int] = {}
    sol_PA_id_por_cliente: Dict[int, int] = {}
    fitness_PA_min: float = 0
    fitness_PA_min_penalizado: float = 0
    penalidade: float = 0
    fitness_dist_min: float = 0
    fitness_dist_min_penalizado: float = 0
    porcentagem_CL_antendidos = 0
    nCL_por_PA: Dict[int, int] = {}

    def __init__(self) -> None:
        self.id: int = 0
        self.sol_PAs_ativos: Dict[int, int] = {}
        self.sol_PA_id_por_cliente: Dict[int, int] = {}
        self.fitness_PA_min: float = 0
        self.fitness_PA_min_penalizado: float = 0
        self.fitness_dist_min: float = 0
        self.fitness_dist_min_penalizado: float = 0
        self.porcentagem_CL_antendidos = 0
        self.nCL_por_PA: Dict[int, int] = {}

#### Dados do problema

In [647]:
class AccessPoint:
    id: int = 0
    x: int = 0
    y: int = 0
    nCL_in_range: int = 0

    def __init__(self, x: int, y: int, index: int) -> None:
        self.x = x
        self.y = y
        self.id = index
        self.nCL_in_range: int = 0
        pass

In [648]:
class ProblemData:
    PA_list: List[AccessPoint] = []
    CL_list: List[Cliente] = []
    dist_PA_CL: Dict[int, Dict[int, float]] = np.empty((81**2, 495))
    min_CL_rate: float = 0.05
    nCL: int = 495
    nPA: int = 81**2

In [649]:
def probdef(nPA=81**2, nCL=495):
    
    probdata = ProblemData()

    # Uso da função para ler os clientes do arquivo
    probdata.CL_list = ler_clientes('data/clientes.csv')

    for id in range(0, nPA):
        PA_x = (id % 81) * 5
        PA_y = (id // 81) * 5
        probdata.PA_list.append(AccessPoint(PA_x, PA_y, id))

    for PA in probdata.PA_list:
        for CL in probdata.CL_list:
            probdata.dist_PA_CL[PA.id][CL.id] = ((PA.x - CL.x)**2 + (PA.y - CL.y)**2) ** 0.5

            if(probdata.dist_PA_CL[PA.id][CL.id] <= 84):
                PA.nCL_in_range += 1
    
    probdata.PA_cap = 54 # em Mbps
    probdata.PA_raio = 84 # em metros
    probdata.CL_min_p = 0.05 # porcentagem minima de clientes
    probdata.nPA_max = 30

    return probdata

#### Solução inicial

In [650]:
# heuristica inserção mais proxima
# Solução inicial
def sol_inicial(probdata):
    sol = Solucao()
    
     # Calcular distância euclidiana entre clientes e pontos de acesso
    distancias = np.zeros((probdata.nCL, probdata.nPA))
    for i, CL in enumerate(probdata.CL_list):
        for j, PA in enumerate(probdata.PA_list):
            distancias[i, j] = np.sqrt((CL.x - PA.x) ** 2 + (CL.y - PA.y) ** 2)

    # Atribuir cada cliente ao ponto de acesso mais próximo
    for j in range(len(probdata.CL_list)):
        distancias_cliente = distancias[j]
        pa_mais_proximo = np.argmin(distancias_cliente)
        sol.sol_PA_id_por_cliente[j] = pa_mais_proximo  # Dá o PA mais proximo como ativo para o cliente j
        sol.sol_PAs_ativos[pa_mais_proximo] = 1  # Ativa o PA mais próximo

    return sol

#### Função Penalidades Generalizadas

In [651]:
# Função de penalidades para todas as restrições
def penalties(sol: Solucao, probdata: ProblemData):

    # percentual mínimo de clientes
    pen_CLmin = probdata.nCL * probdata.CL_min_p - np.sum(list(sol.sol_PAs_ativos.values()))
    pen_CLmin = np.sum(np.where(pen_CLmin <= 0, 0, pen_CLmin)**2)
    print("Penalidades executadas: 1", end="")

    # limite de consumo dos PAs
    pen_PAcap = np.zeros(probdata.nPA)
    for i in range(probdata.nPA):
        #Acha a lista de id de clientes que possuem conexão ativa com o PA i
        CLs_ids = [k for k, v in sol.sol_PA_id_por_cliente.items() if v == i]
        consumo_total = 0
        index = 0
        for CL in (probdata.CL_list):
            if index in CLs_ids:
                consumo_total += CL.consumo_banda
            index += 1
        
        current_PA_ativo = 0
        if i in sol.sol_PAs_ativos:
            current_PA_ativo = 1

        pen_PAcap[i] = consumo_total - current_PA_ativo * probdata.PA_cap
        pen_PAcap[i] = np.sum(np.where(pen_PAcap[i] <= 0, 0, pen_PAcap[i])**2)
    
    print("-2", end="")

    # limite de distância entre PAs e clientes
    pen_dist = np.zeros((probdata.nCL, probdata.nPA))
    for i, PA in enumerate(probdata.PA_list):
        #Acha a lista de id de clientes que possuem conexão ativa com o PA i
        CLs_ids = [k for k, v in sol.sol_PA_id_por_cliente.items() if v == i]

        current_PA_ativo = 0
        if PA.id in sol.sol_PAs_ativos:
            current_PA_ativo = 1

        for CL in probdata.CL_list:

            areTheyConnected = 0
            if CL.id in CLs_ids:
                areTheyConnected = 1

            pen_dist[CL.id, i] = probdata.dist_PA_CL[PA.id, CL.id] * areTheyConnected - current_PA_ativo * probdata.PA_raio
            pen_dist[CL.id, i] = np.sum(np.where(pen_dist[CL.id, i] <= 0, 0, pen_dist[CL.id, i])**2)
    print("-3", end="")

    # pelo menos 5% de exposição à rede
    pen_Expmin = np.zeros(probdata.nCL)
    for i, CL in enumerate(probdata.CL_list):
        totalExposure: float = 0
        for j in sol.sol_PAs_ativos.keys():
            totalExposure += 1/probdata.dist_PA_CL[j, CL.id]

        pen_Expmin[i] = 0.05 * 1 - totalExposure
        pen_Expmin[i] = np.sum(np.where(pen_Expmin[i] <= 0, 0, pen_Expmin[i])**2)
    print("-4", end="")

    # número máximo de PAs
    pen_PAmax = sum(sol.sol_PAs_ativos.values()) - probdata.nPA_max
    print("-5")

    # return all multiplied by U
    return 1000 * (pen_CLmin  + sum(pen_dist) + sum(pen_Expmin)  + pen_PAmax + sum(pen_PAcap))

#### Funçao Objetivo 1: Minimizar número de PAs ativos

In [652]:
# Função objetivo 1: Minimizar número de PAs ativos
def fobj_minPA(sol: Solucao, probdata):

    nCL_por_PA_local: Dict[int, int] = {}
    nCL_atendidos: int = 0
    for CL in probdata.CL_list:
        if sol.sol_PA_id_por_cliente[CL.id] != INVALID:
            nCL_atendidos += 1
            if sol.sol_PA_id_por_cliente[CL.id] in nCL_por_PA_local:
                nCL_por_PA_local[sol.sol_PA_id_por_cliente[CL.id]] += 1
            else:
                nCL_por_PA_local[sol.sol_PA_id_por_cliente[CL.id]] = 1
    
    sol.nCL_por_PA = nCL_por_PA_local


    sol.fitness_PA_min = len(sol.sol_PAs_ativos)
    sol.penalidade = penalties(sol, probdata)
    sol.fitness_PA_min_penalizado = sol.fitness_PA_min + sol.fitness_PA_min_penalizado
    sol.porcentagem_CL_antendidos = nCL_atendidos / 495

    return sol

#### Função Objetivo 2: Minimizar distâncias entre clientes e PAs

In [653]:
# Função objetivo 2: Minimizar distância cumulativa de clientes e PAs
def fobj_minDist(sol: Solucao, probdata):

    nCL_por_PA_local: Dict[int, int] = {}
    nCL_atendidos: int = 0
    for CL in probdata.CL_list:
        if sol.sol_PA_id_por_cliente[CL.id] != INVALID:
            nCL_atendidos += 1
            if sol.sol_PA_id_por_cliente[CL.id] in nCL_por_PA_local:
                nCL_por_PA_local[sol.sol_PA_id_por_cliente[CL.id]] += 1
            else:
                nCL_por_PA_local[sol.sol_PA_id_por_cliente[CL.id]] = 1
    
    sol.nCL_por_PA = nCL_por_PA_local

    dists: float = 0

    for CL_id, PA_id in sol.sol_PA_id_por_cliente.items():
        dists += probdata.dist_PA_CL[PA_id, CL_id]

    sol.fitness_dist_min = dists
    sol.penalidade = penalties(sol, probdata)
    sol.fitness_dist_min_penalizado = sol.fitness_dist_min + sol.penalidade
    sol.porcentagem_CL_antendidos = nCL_atendidos / 495

    return sol

#### Shake Função Objetivo 1

In [654]:
def shake_minPA(x: Solucao, k: int, probdata):
    y = copy.deepcopy(x)
    tres_menores_valores = sorted(y.nCL_por_PA.values())[:k]
    chaves_correspondentes = []
    for chave, valor in y.nCL_por_PA.items():
        if valor in tres_menores_valores:
            chaves_correspondentes.append(chave)
    index = 0
    keys = list(x.sol_PAs_ativos.keys())
    generated_keys: Dict[int, int] = {}
    while index < k:
        
        #chave_para_substituir = chaves_correspondentes[index]
        key = random.choice(keys)
        
        failsafeIndex = 0
        while (key in generated_keys) and (failsafeIndex < 10 or len(keys) > 3):
            key = random.choice(keys)
            failsafeIndex += 1
        generated_keys[key] = 0
    
        y.sol_PAs_ativos.pop(key)
        new_key = int(81**2 * random.random())
        y.sol_PAs_ativos[new_key] = new_key
        index += 1
        
    
    if len(y.sol_PAs_ativos) <= 9:
        while len(y.sol_PAs_ativos) < 30:
            key = int(81**2 * random.random())
            if not (key in y.sol_PAs_ativos):
                y.sol_PAs_ativos[key] = key
    
    return y

#### Shake Função Objetivo 2

In [655]:
def shake_minDist(x: Solucao, k: int, probdata):
    y = copy.deepcopy(x)
    
    index = 0
    keys = list(x.sol_PAs_ativos.keys())
    generated_keys: Dict[int, int] = {}
    while index < k:
        key = random.choice(keys)
        while (key in generated_keys):
            key = random.choice(keys)
        
        generated_keys[key] = 0
    
        y.sol_PAs_ativos.pop(key)
        new_key = int(81**2 * random.random())
        y.sol_PAs_ativos[new_key] = new_key
        index += 1
    
    if len(y.sol_PAs_ativos) <= 18: ## VERIFICAR 18
        while len(y.sol_PAs_ativos) < 30:
            key = int(81**2 * random.random())
            if not (key in y.sol_PAs_ativos):
                y.sol_PAs_ativos[key] = key
    
    return y

#### Neighborhood Change Função Objetivo 1

In [656]:
def neighborhoodChange_minPA(x: Solucao, y: Solucao, k: int):
    
    if  (y.fitness_PA_min_penalizado < x.fitness_PA_min_penalizado or y.fitness_PA_min_penalizado == x.fitness_PA_min_penalizado and y.porcentagem_CL_antendidos > x.porcentagem_CL_antendidos):
        x = copy.deepcopy(y)
        k  = 1
    else:
        k += 1
        
    return x, k 

#### Neighborhood Change Função Objetivo 2

In [657]:
def neighborhoodChange(x: Solucao, y: Solucao, k: int):
    
    if  (y.fitness_dist_min_penalizado < x.fitness_dist_min_penalizado or y.fitness_dist_min_penalizado == x.fitness_dist_min_penalizado and y.porcentagem_CL_antendidos > x.porcentagem_CL_antendidos):
        x = copy.deepcopy(y)
        k  = 1
    else:
        k += 1
        
    return x, k 

#### RVNS

In [658]:
def RVNS_minPA(solucao_inicial: Solucao, dados: ProblemData, historico: Struct) -> Solucao:
    # Contador do número de soluções candidatas avaliadas
    num_sol_avaliadas = 1

    # Máximo número de soluções candidatas avaliadas
    max_num_sol_avaliadas = 2000

    # Número de estruturas de vizinhanças definidas
    kmax = 3

    # Gera solução inicial
    x = solucao_inicial

    # Armazena dados para plot
    historico.fit.append(x.fitness_PA_min)
    historico.sol.append(x.sol_PAs_ativos)
    historico.pen.append(x.penalidade)
    historico.fit_pen.append(x.fitness_PA_min_penalizado)


    # Ciclo iterativo do método
    while num_sol_avaliadas < max_num_sol_avaliadas:
        print(f"Solução nº {num_sol_avaliadas}\n")

        loopIndex = 0
        k = 1
        while k <= kmax:
            
            # Gera uma solução candidata na k-ésima vizinhança de x    
            loopIndex += 1
      
            y = shake_minPA(x, k, dados)

            y = fobj_minPA(y, dados)
            num_sol_avaliadas += 1
            y.id = num_sol_avaliadas
            
            # Atualiza solução corrente e estrutura de vizinhança (se necessário)
            x, k = neighborhoodChange(x, y, k)
            
            # Armazena dados para plot
            historico.fit.append(x.fitness_PA_min)
            historico.sol.append(x.sol_PAs_ativos)
            historico.pen.append(x.penalidade)
            historico.fit_pen.append(x.fitness_PA_min_penalizado)
    
    return x

#### BVNS

In [659]:
def BVNS_minPA(solucao_inicial: Solucao, dados: ProblemData):
    print("Starting BVNS process\n")

    x = fobj_minPA(solucao_inicial, dados)
    y = copy.deepcopy(x)

    historico = Struct()
    historico.fit = []
    historico.sol = []
    historico.pen = []
    historico.fit_pen = []

    numero_de_tentativas = 0
    while numero_de_tentativas < 6:
        numero_de_iteracoes_RVNS = 0
        while x.fitness_PA_min_penalizado >= y.fitness_PA_min_penalizado and numero_de_iteracoes_RVNS < 5:
            numero_de_iteracoes_RVNS += 1
            print(f"RVNS iteração nº {numero_de_iteracoes_RVNS}\n")
            x = RVNS_minPA(y, dados, historico)
            
        y = copy.deepcopy(x)
        numero_de_tentativas += 1

    
    return x, historico

#### Execução

In [660]:
dados = probdef()
solucao_inicial = sol_inicial(dados)
print(solucao_inicial.sol_PAs_ativos)
numero_de_execucoes = 0
melhores_solucoes: List[Solucao] = []
historicos_de_solucoes: List[Struct] = []

while numero_de_execucoes < 5:
    print("Starting\n")
    melhor_solucao, historico = BVNS_minPA(solucao_inicial, dados)
    print(f'execução = {numero_de_execucoes} PAs = {melhor_solucao.fitness_PA_min} Porcentagem de Clientes Atendidos = {melhor_solucao.porcentagem_CL_antendidos} penalização = {melhor_solucao.fitness_PA_min_penalizado}')
    historicos_de_solucoes.append(historico)
    melhores_solucoes.append(melhor_solucao)
    numero_de_execucoes += 1

{2275: 1, 905: 1, 1053: 1, 1168: 1, 1881: 1, 2110: 1, 1232: 1, 1055: 1, 2047: 1, 1877: 1, 1717: 1, 3989: 1, 1321: 1, 1547: 1, 1216: 1, 1797: 1, 899: 1, 2126: 1, 2759: 1, 1402: 1, 1234: 1, 815: 1, 904: 1, 186: 1, 1255: 1, 579: 1, 2379: 1, 1465: 1, 1395: 1, 2444: 1, 343: 1, 1550: 1, 407: 1, 823: 1, 1559: 1, 2036: 1, 662: 1, 1802: 1, 2532: 1, 576: 1, 499: 1, 1801: 1, 1299: 1, 679: 1, 1387: 1, 1467: 1, 974: 1, 420: 1, 1635: 1, 1309: 1, 980: 1, 811: 1, 937: 1, 598: 1, 498: 1, 2364: 1, 1154: 1, 259: 1, 901: 1, 2212: 1, 896: 1, 2790: 1, 651: 1, 661: 1, 511: 1, 2285: 1, 1645: 1, 99: 1, 818: 1, 1883: 1, 267: 1, 1415: 1, 1716: 1, 2767: 1, 1793: 1, 2032: 1, 993: 1, 819: 1, 1390: 1, 580: 1, 2211: 1, 1967: 1, 2131: 1, 2360: 1, 1218: 1, 826: 1, 1719: 1, 2280: 1, 2026: 1, 2053: 1, 2529: 1, 1548: 1, 2296: 1, 1316: 1, 1947: 1, 184: 1, 2515: 1, 248: 1, 1476: 1, 1073: 1, 341: 1, 1546: 1, 1563: 1, 2459: 1, 1405: 1, 1792: 1, 1470: 1, 2517: 1, 2206: 1, 1315: 1, 1385: 1, 2779: 1, 1159: 1, 2048: 1, 575: 1, 18

KeyboardInterrupt: 

#### Plot Evolução das soluções

In [None]:
fig, (ax1, ax2) = plt.subplots(2, 1)
cores_historico = [f'#{random.randint(0, 0xFFFFFF):06x}' for _ in range(numero_de_execucoes)]
index_historico = 0
for historico in historicos_de_solucoes:
    s = len(historico.fit_pen)
    ax1.plot(np.linspace(0,s-1,s),historico.fit_pen,'k-', color=cores_historico[index_historico])
    ax2.plot(np.linspace(0,s-1,s),historico.pen,'b:', color=cores_historico[index_historico])
    index_historico += 1
    
fig.suptitle('Evolução da qualidade da solução candidata')
ax1.set_ylabel('fitness(x) penalizado')
ax2.set_ylabel('penalidade(x)')
ax2.set_xlabel('Número de avaliações')
plt.subplots_adjust(left=0.1,
                        bottom=0.1, 
                        right=0.9, 
                        top=0.9, 
                        wspace=0.4, 
                        hspace=0.4)
plt.show()

#### Plot Resultados

In [None]:
def plot_triangulo(x, y, size, color):
    plt.plot(x, y, marker='^', markersize=size, color=color, linestyle='None')

# Função para plotar um círculo
def plot_circulo(x, y, size, color):
    plt.plot(x, y, marker='o', markersize=size, color=color, linestyle='None')
    

for x in melhores_solucoes:
    # Gerar uma lista de cores aleatórias para os pontos de acesso
    cores_pontos_acesso = [f'#{random.randint(0, 0xFFFFFF):06x}' for _ in range(len(x.sol_PAs_ativos))]

    cores: Dict[int, str] = {}

    index = 0
    for ponto_de_acesso_id in x.nCL_por_PA:
        cores[ponto_de_acesso_id] = cores_pontos_acesso[index]
        index += 1

    pontos_de_acessos_ativos: Dict[int, float] = {}
        
    for cliente_id, ponto_de_acesso_id in x.sol_PA_id_por_cliente.items():
        if ponto_de_acesso_id != INVALID:
            plot_triangulo(dados.PA_list[ponto_de_acesso_id].x, dados.PA_list[ponto_de_acesso_id].y, 10, color = cores[ponto_de_acesso_id])
            plot_circulo(dados.CL_list[cliente_id].x, dados.CL_list[cliente_id].y, 5, color = cores[ponto_de_acesso_id])

    # Configurar o gráfico
    plt.xlabel('Coordenada X')
    plt.ylabel('Coordenada Y')
    label = f'Número de PAs {x.fitness_PA_min} Clientes Atendidos {x.porcentagem_CL_antendidos}'
    plt.title(f'Pontos de Acesso e Clientes Atendidos \n{label}')
    plt.grid(True)

    # Mostrar o gráfico
    plt.show()
