In [None]:
import random
from SUMOSimulation import SUMO

# Ubicación de los archivos de simulación de la red vial:
infoSUMO = {
    'projectFolder': r"SUMO_Simulation\RedVialZMG",
    'netFile': "\osm.net.xml", # Nombre del archivo que contiene la red
    'sumoCfgFile': "\osm.sumocfg" # Nombre del archivo principal de ejecución de SUMO
    }

#Dicionario de clusters de intersecciones: {id_custer: [genotipo del tiempo en verde de cada semáforo], [archivos de detectores de vehículos]}
ampelDict = {"cluster_1395059763_1658290098": [[], ["\Torres.xml", "\Belizario.xml", "\Torres2Belizario.xml"]]} 

# Parámetros de inicio del algoritmo genético:
SIMULATION_TIME = 300 # Tiempo de ejecución de la simulación en segundos
POPULATION_SIZE = 100
GENERATIONS = 50
CROSSOVER_ODDS = .8
MUTATION_ODDS = .2
MIN_TIME = 20 # Tiempo mínimo en verde
MAX_TIME = 120 # Tiempo máximo en verde
RESOLUTION = 8 # bits: resolución del valor de tiempo del semáforo

#Número de semáforos:
nAmpels = len(ampelDict["cluster_1395059763_1658290098"][1]) # Cada archivo corresponde a un semáforo o cruce

#Número máximo posible que puede tomar el gen en base a la resolución  de bits:
maxGenVal = 2**RESOLUTION # 2^n


def generatePopulation():
    """Se genera la población inicial de manera aleatoria"""
    return [''.join(f"{random.randint(0, maxGenVal):0{RESOLUTION}b}" for _ in range(nAmpels)) for _ in range(POPULATION_SIZE)]

def calculateFitness(chromosome, vehicleRates):
    """Se calcula el tiempo total y penaliza los semáforos con flujo bajo y tiempos en VERDE altos"""
    ampelTimes = gen2fen(chromosome)
    totalFlow = 0 # Suma de tasas de flujos
    penalization = 0 # Penalización de tiempo para el cromosoma

    for i in range(len(ampelTimes)-1):
        time = ampelTimes[i] # Tiempo del alelo actual (semáforo)
        cVehicleRate = vehicleRates[i] # Tasa de vehiculos correspondiente al alelo actual (semáforo)
        flow = time * cVehicleRate # Factor de flujo = tiempo en verde * la tasa de vehiculos del semáforo actual
        totalFlow += flow

        # Se penalizan largos tiempos en verde para tasas bajas de vehículos:
        if cVehicleRate < 0.2:
            penalization += (time - MIN_TIME) ** 0.5

    # Fitness bruto: flujo total menos penalización
    rawFitness = totalFlow - penalization

    # Normalización entre 0 - 1:
    maxPossibleFlow = MAX_TIME * sum(vehicleRates)  # Flujo máximo posible
    normalizedFitness = max(0, rawFitness) / maxPossibleFlow  # Normalizar y evitar valores negativos
    
    return normalizedFitness

def selection(population, fitness):
    """Se selecciona de manera elitista el mejor individuo de la generación"""
    fittedPopulation = list(zip(population, fitness)) # Se crean pares (población, fitness)
    bestFit = max(fittedPopulation, key=lambda x: x[1]) # Se selecciona el individuo con mejor fitness
    bestChromosome = bestFit[0] #Se guarda el mejor individuo

    return bestChromosome

def crossover(parent1, parent2):
    """Se ejecuta la cruza con el método de 'Cruza de un Punto' entre los padres"""
    if random.random() < CROSSOVER_ODDS:
        crossoverPoint = random.randint(1, len(parent1)-1) #Se selecciona el punto de cruza a partir del tamaño estándar del individuo
        child1 = parent1[:crossoverPoint] + parent2[crossoverPoint:] # De la cruza se genera el hijo 1
        child2 = parent2[:crossoverPoint] + parent1[crossoverPoint:] # De la cruza se genera el hijo 2

    else:
        child1 = parent1
        child2 = parent2

    return child1, child2

def mutation(chromosome):
    """Se ejecuta la mutación mediante 'Bit Fip' de un alelo"""
    if random.random() < MUTATION_ODDS:
        pos = random.randint(0, len(chromosome)-1) # Posición del alelo a mutar (aleatoria)
        mutatedChromosome = chromosome[:pos] + ('1' if chromosome[pos] == '0' else '0') + chromosome[pos+1:] # 1 -> 0 | 0 -> 1
        return mutatedChromosome
    else:
        return chromosome

def gen2fen(chromosome):
    """Transforma una representación de genotipo a su fenotipo correspondiente"""
    def mapGenotype(gen):
        """ Realiza el mapeo del valor binario al decimal entre los rangos MIN_TIME y MAX_TIME"""
        gen_i = int(gen,2) # Valor del gen en decimal
        return MIN_TIME + ((gen_i/maxGenVal) * (MAX_TIME-MIN_TIME))

    allGens = [chromosome[i:i+RESOLUTION] for i in range(0, len(chromosome), RESOLUTION)]
    ampelTimes = [mapGenotype(gen) for gen in allGens]

    return ampelTimes


In [None]:
def ampelOptimizationGA(showSimulation=False):
    """Obtiene los mejores tiempos en un sistema de semáforos para optimizar los tiempos de espera en intersecciones"""
    population = generatePopulation()
    vehicleRates = [random.random() for _ in range(nAmpels)]
    
    for generation in range(GENERATIONS):
        next_generation = []
        #Obtener la aptitud de cada individuo:
        cFinesess = [calculateFitness(chromosome, vehicleRates) for chromosome in population]

        while len(next_generation) < POPULATION_SIZE:
            #Se aplica selección:
            parent1 = selection(population, cFinesess)
            parent2 = selection(population, cFinesess)

            #Se aplica la cruza:
            child1, child2 = crossover(parent1, parent2)

            #Se aplica la mutación:
            child1 = mutation(child1)
            child2 = mutation(child2)

            next_generation.append(child1) # Se pasa el primer hijo a la siguiente generación

            if len(next_generation) < POPULATION_SIZE: # Sí no se ha revasado el tamaño de población
                next_generation.append(child2) # Se agrega también el segundo hijo

        # Se reemplaza la población con la nueva generación:
        population = next_generation

        #Mejores parámetros de la generación:
        bestFitness = max(cFinesess)
        bestChromosome = population[cFinesess.index(bestFitness)]
        bestAmpelTimes = gen2fen(bestChromosome)
        print(f"Generación No. {generation+1}: Mejor aptitud = {bestFitness} | Mejor genotipo = {bestChromosome} | Mejor Fenotipo: {bestAmpelTimes}")

        #Simular con el mejor cromosoma de la generación:
        ampelDict['cluster_1395059763_1658290098'][0] = bestAmpelTimes
        projectFolder = infoSUMO['projectFolder']
        netFile = infoSUMO['netFile']
        sumoCfgFile = infoSUMO['sumoCfgFile']

        #print(f"Comenzando simulación con el mejor individuo: ")
        simulation = SUMO(ampelDict, projectFolder, netFile, sumoCfgFile, SIMULATION_TIME, showSimulation)
        vehicleRates = simulation.simulateConditions(200)['cluster_1395059763_1658290098']
        #print(f"Tasa de vehículos: {vehicleRates}")

        # Se reemplaza la población con la nueva generación:

    return bestChromosome

def simulateBest(bestChromosome):
    bestAmpelTimes = gen2fen(bestChromosome)
    """Simular con las mejores condiciones del algoritmo en la GUI de SUMO"""
    projectFolder = infoSUMO['projectFolder']
    netFile = infoSUMO['netFile']
    sumoCfgFile = infoSUMO['sumoCfgFile']

    ampelDict['cluster_1395059763_1658290098'][0] = bestAmpelTimes
    #print("\n*************** Iniciando simulación con la mejor solución del algoritmo genético ***************")
    simulation = SUMO(ampelDict, projectFolder, netFile, sumoCfgFile, SIMULATION_TIME, showSimulation=True)
    simulation.simulateConditions()

In [None]:
bestChromosome = ampelOptimizationGA() # Pasar True como argumento para ver cada simulación (cada generación)
print(f"Mejor solución {bestChromosome}")
simulateBest(bestChromosome)