In [1]:
import pandas as pd
import random
import functions

#### Functions

In [None]:
def quality_of_service(vehicles, antennas, alfa, beta, chromossomo):
    """
    Score an RSU deployment: the number of active RSUs if the QoS target is met,
    otherwise a fixed penalty.

    Parameters:
    - vehicles (DataFrame): Vehicle observations. Must contain the columns "ID" and
      "Permanence (s)" plus the column(s) it shares with 'antennas', which are used
      as the merge key.
    - antennas (DataFrame): One row per RSU, in the same order as 'chromossomo'.
      It is not modified.
    - alfa (float): Minimum fraction of a vehicle's total permanence that must be
      spent under active RSUs for the vehicle to count as served.
    - beta (float): Minimum fraction of vehicles that must be served.
    - chromossomo (list): Binary list; chromossomo[i] == 1 means RSU i is turned on.

    Returns:
    - fitness (int): sum(chromossomo), i.e. the number of RSUs turned on, when the
      served fraction is at least 'beta'. Otherwise the penalty 5 * len(chromossomo),
      which is larger than any feasible value. Lower is better.

    Note that the QoS ratio itself is never returned; it is only compared with 'beta'.
    """

    # One gene per RSU, so the penalty needs no module-level state.
    penalty = len(chromossomo) * 5

    # assign() returns a new frame, so the caller's 'antennas' never gains an "ON" column.
    deployed = antennas.assign(ON=chromossomo)

    # Without an 'on' argument, merge joins on every column the two frames share.
    coverage = vehicles.merge(deployed, how="left")
    coverage["Connection"] = coverage["Permanence (s)"] * coverage["ON"]

    # Total permanence and connected permanence per vehicle
    per_vehicle = (
        coverage[["ID", "Permanence (s)", "Connection"]]
        .groupby("ID")
        .sum()
        .reset_index()
    )
    per_vehicle["Delta"] = per_vehicle["Connection"] / per_vehicle["Permanence (s)"]

    # A vehicle is served when its connected share reaches 'alfa'
    served = per_vehicle["Delta"] >= alfa

    # No vehicles at all: nothing can be served, and 0/0 must be avoided
    if len(served) == 0:
        return penalty

    qos = served.sum() / len(served)
    if qos >= beta:
        return sum(chromossomo)
    return penalty


In [None]:
def selection_parents(roulette, population_size):
    """
    Select a parent index using roulette wheel selection.

    Parameters:
    - roulette (list): Cumulative selection probabilities, non-decreasing, one entry
      per individual. The last entry is expected to be (approximately) 1.0.
    - population_size (int): The number of leading entries of 'roulette' to consider.

    Returns:
    - parent (int): The index of the first segment whose cumulative value is at least
      the drawn probability, or the last index if no segment qualifies.

    Raises:
    - ValueError: If 'population_size' is not positive.

    A single random number in [0, 1) is drawn per call.
    """

    if population_size <= 0:
        raise ValueError("population_size must be a positive integer")

    prob = random.random()
    for parent in range(population_size):
        if prob <= roulette[parent]:
            return parent

    # Floating-point rounding while accumulating the wheel can leave the final
    # segment marginally below 1.0. Fall back to the last individual instead of
    # implicitly returning None, which callers would then use as an index.
    return population_size - 1


In [None]:
def crossover_function(p1, p2, population):
    """
    Perform a single-point crossover between two parent chromosomes.

    Parameters:
    - p1 (int): The key of the first parent in 'population'.
    - p2 (int): The key of the second parent in 'population'.
    - population (dict): Maps each key to a dict holding the individual's gene
      list under "chromossomo".

    Returns:
    - c1 (list): First parent's head followed by the second parent's tail.
    - c2 (list): Second parent's head followed by the first parent's tail.

    The cut point is the midpoint of the first parent's chromosome, so the function
    does not depend on any module-level RSU count. Both children are new lists; the
    parents are left untouched.
    """

    first = population[p1]["chromossomo"]
    second = population[p2]["chromossomo"]

    # Split at the halfway point (integer division rounds down for odd lengths)
    cut = len(first) // 2

    c1 = first[:cut] + second[cut:]
    c2 = second[:cut] + first[cut:]

    return c1, c2


In [None]:
def mutation_functions(mutation_rate, new_chromossomo):
    """
    Apply bit-flip mutation to a chromosome, in place.

    Parameters:
    - mutation_rate (float): The probability of flipping each gene. 0.0 never
      mutates; 1.0 always mutates.
    - new_chromossomo (list): The binary chromosome to mutate. It is modified in
      place, so pass a copy if the original must be preserved.

    Returns:
    - mutated_chromossomo (list): The same list object that was passed in.

    One random number is drawn per gene. A selected gene equal to 0 becomes 1;
    any other value becomes 0.
    """

    # Iterate over the chromosome itself rather than a module-level RSU count,
    # so chromosomes of any length are handled.
    for i, gene in enumerate(new_chromossomo):
        # Strict '<' guarantees that a rate of 0.0 can never trigger a flip,
        # because random.random() may return exactly 0.0.
        if random.random() < mutation_rate:
            new_chromossomo[i] = 1 if gene == 0 else 0

    return new_chromossomo


#### Parameters

<!--

# Read and process solutions from 'Solution_Permanence.parquet' into a list
#perfomance_sol = pd.read_parquet('Solution_Permanence.parquet')
#perfomance_sol = list(perfomance_sol.sort_values(['X', 'Y'])['ON'])

# Read and process solutions from 'Solution_Mean.parquet' into a list
#mean_sol = pd.read_parquet('Solution_Mean.parquet')
#mean_sol = list(mean_sol.sort_values(['X', 'Y'])['ON'])

# Read and process solutions from 'Solution_Vehicles.parquet' into a list
#Vehicles_sol = pd.read_parquet('Solution_Vehicles.parquet')
#Vehicles_sol = list(Vehicles_sol.sort_values(['X', 'Y'])['ON'])

# Create a list 'solutions' containing the processed solutions
#solutions = [perfomance_sol, mean_sol, Vehicles_sol]

 -->

In [None]:
# Load the vehicle observations and the candidate RSU (antenna) sites.
# Forward slashes are accepted by Python on Windows as well as on POSIX systems,
# whereas a backslash-separated literal only resolves on Windows.
df = pd.read_parquet("files/vehicles.parquet")
df_RSU = pd.read_parquet("files/antennas.parquet")

# Number of candidate RSUs = number of genes in every chromosome
total_RSU = df_RSU.shape[0]

# 'alfa': per-vehicle coverage threshold; 'beta': required fraction of vehicles
# meeting it (both are passed to quality_of_service)
alfa = 0.5
beta = 0.3

# Best solution found so far. The initial fitness (total_RSU * 10) is worse than
# the infeasibility penalty (total_RSU * 5), so the first evaluated individual
# always replaces it.
best_chromossomo = {
    "chromossomo": [],
    "Fitness": total_RSU * 10
}

# Number of individuals in the population
population_size = 40

# Number of generations to run
total_generation = 600

# Probability that a selected pair of parents is recombined
crossover_rate = 0.9

# Per-gene flip probability; increased periodically inside the generation loop
mutation_rate = 0.18

# Number of mutation passes applied to each individual per generation;
# starts at 0 and is increased periodically inside the generation loop
mutation_chance = 0

#### Genetic Algorithm

<!--

if p < 3:
    chromossomo = solutions[p]
else:

-->

In [None]:
# Individuals are stored by index: population[p] = {"chromossomo": [...], "Fitness": ...}
population = {}

# Build the initial population from uniformly random binary chromosomes
for p in range(population_size):

    # One gene per RSU: 1 = turned on, 0 = turned off
    chromossomo = [random.choice([0, 1]) for _ in range(total_RSU)]

    # Lower fitness is better (fewer active RSUs, or a penalty when QoS is not met).
    # Copies are passed because quality_of_service writes to its 'antennas' argument.
    fitness = quality_of_service(df.copy(), df_RSU.copy(), alfa, beta, chromossomo)

    population[p] = {
        "chromossomo": chromossomo,
        "Fitness": fitness
    }

    # Track the best individual seen so far. The gene list is copied so the record
    # stays independent of the population entry (dict.copy() alone would share it).
    if fitness < best_chromossomo["Fitness"]:
        best_chromossomo = {
            "chromossomo": list(chromossomo),
            "Fitness": fitness
        }

# Single quotes inside the replacement field keep this f-string valid on
# Python versions older than 3.12.
print(f"Best Solution : {best_chromossomo['Fitness']}")


In [None]:
# Generations elapsed since the best solution last improved
count = 0

for ger in range(total_generation):

    # Intensify mutation every 100 generations, and whenever the stagnation
    # counter is a multiple of 20 (this includes the very first generation)
    if (ger % 100 == 0) or (count % 20 == 0):
        mutation_rate += 0.02
        mutation_chance += 15

    # Build the roulette wheel. Fitness is minimised, so each individual's
    # selection weight is the inverse of its fitness.
    weights = [1 / population[p]["Fitness"] for p in range(population_size)]
    total_weight = sum(weights)  # hoisted: computed once instead of once per individual

    # Cumulative probabilities, accumulated in a single pass
    roulette = []
    cumulative = 0
    for weight in weights:
        cumulative += weight / total_weight
        roulette.append(cumulative)

    # Rounding can leave the final value slightly below 1.0; pin it so that every
    # draw in [0, 1) lands in some segment.
    roulette[-1] = 1.0

    # Breed the next generation two children at a time. Rounding the pair count up
    # keeps an odd population_size from leaving new_population one entry short.
    new_population = []
    for _ in range((population_size + 1) // 2):

        p1 = selection_parents(roulette, population_size)
        p2 = selection_parents(roulette, population_size)

        # Recombine with probability 'crossover_rate'; otherwise the parents
        # pass through unchanged
        if random.random() < crossover_rate:
            c1, c2 = crossover_function(p1, p2, population)
            new_population.append(c1)
            new_population.append(c2)
        else:
            new_population.append(population[p1]["chromossomo"])
            new_population.append(population[p2]["chromossomo"])

    # Mutate and evaluate every individual of the new generation
    for p in range(population_size):

        # Copy once: entries of new_population may alias a parent's gene list,
        # and mutation_functions works in place.
        individual = list(new_population[p])
        for _ in range(mutation_chance):
            individual = mutation_functions(mutation_rate, individual)

        population[p]["chromossomo"] = individual
        # Copies are passed because quality_of_service writes to its 'antennas' argument
        population[p]["Fitness"] = quality_of_service(df.copy(), df_RSU.copy(), alfa, beta, individual)

        # Record improvements and reset the stagnation counter
        if population[p]["Fitness"] < best_chromossomo["Fitness"]:
            best_chromossomo = {
                "chromossomo": list(individual),
                "Fitness": population[p]["Fitness"]
            }
            count = 0

    # Elitism: overwrite the worst individual with a copy of the best solution
    fitness = [population[p]["Fitness"] for p in range(population_size)]
    worst_solution_index = fitness.index(max(fitness))
    population[worst_solution_index] = {
        "chromossomo": list(best_chromossomo["chromossomo"]),
        "Fitness": best_chromossomo["Fitness"]
    }

    # Single quotes inside the replacement field keep this f-string valid on
    # Python versions older than 3.12.
    print(f"Generation: {ger} : {best_chromossomo['Fitness']}")

    # Persist the best solution after every generation
    functions.save_pickle(best_chromossomo, "Solution_AG.pkl")

    count += 1
