# Optimization Task

In [3]:
# Questions:
# - Is the penalty cost incurred every day or only when the maintenance is being done?

In [None]:
# TODO:
# - set seed?

In [4]:
# Imports
import pandas as pd
import math
import random

## Global Variables

In [17]:
total_no_engines = 100
teams = ["A", "A", "B", "B"]
# no_teams = 4
# a_teams = 2
# b_teams = 2

engines = list(range(1, total_no_engines + 1))

## Loading Data

In [5]:
file_path = "./data/RUL_consultancy_predictions_A3-2.csv"

# Load the CSV file into a pandas DataFrame
df = pd.read_csv(file_path)

# Display the first few rows of the DataFrame to verify that the data was loaded correctly
print(df.head())

# Split the "RUL;id" column into two separate columns
df[['RUL', 'id']] = df['RUL;id'].str.split(';', expand=True)

# Convert the 'RUL' column to a dictionary with 'id' column as keys
RUL = dict(zip(df['id'].astype(int), df['RUL'].astype(int)))

# print(RUL)


  RUL;id
0  135;1
1  125;2
2   63;3
3  100;4
4  103;5


## Maintenance Teams

In [7]:

maintenance_duration = {}

# Define the values for μAj
for engine in engines:
    if 1 <= engine <= 20:
        maintenance_duration[engine] = {'A': 4, 'B': 5}  # since μBj = μAj + 1
    elif 21 <= engine <= 55:
        maintenance_duration[engine] = {'A': 3, 'B': 4}  # since μBj = μAj + 1
    elif 56 <= engine <= 80:
        maintenance_duration[engine] = {'A': 2, 'B': 3}  # since μBj = μAj + 1
    else:
        maintenance_duration[engine] = {'A': 8, 'B': 9}  # since μBj = μAj + 1

# Update the values for μBj based on conditions
for engine in engines:
    if 1 <= engine <= 25:
        maintenance_duration[engine]['B'] = maintenance_duration[engine]['A'] + 1
    elif 26 <= engine <= 70:
        maintenance_duration[engine]['B'] = maintenance_duration[engine]['A'] + 2
    else:
        maintenance_duration[engine]['B'] = maintenance_duration[engine]['A'] + 1

# Print the dictionary to see the values
# for engine in engines:
#     print(f"Index engine={engine}: A={maintenance_duration[engine]['A']}, B={maintenance_duration[engine]['B']}")



## Safety due date

In [16]:
t_1 = 1
safety_due_date = {id_val: t_1 + rul - 1 for id_val, rul in RUL.items()}
# print(safety_due_date)

## Penalty costs

In [9]:
penalty_costs = {}

# Define the values for cj
for engine in range(1, 21):
    penalty_costs[engine] = 4

for engine in range(21, 31):
    penalty_costs[engine] = 3

for engine in range(31, 46):
    penalty_costs[engine] = 2

for engine in range(46, 81):
    penalty_costs[engine] = 5

for engine in range(81, 101):
    penalty_costs[engine] = 6

# Print the dictionary to see the values
# for engine, cost in penalty_costs.items():
#     print(f"Index engine={engine}: cost={cost}")


## Help functions

In [10]:
def cost(engine, day):
    """
    Calculates the incurred cost when an engine is not maintained by its safety due date.

    Parameters:
    - engine: number of the engine
    - day: current day

    Returns:
    - integer: incurred cost for the engine on the day
    """
    calculated_cost = penalty_costs[engine] * ((day - safety_due_date[engine]) ** 2)
    return min(calculated_cost, 250)

In [11]:
def update_safety_due_date(planning_horizon, engine):
    """
    Update safety due date after maintenance has been performed on the engine.
    An engine can only "fail" once during the planning period. Hence, 
    we make sure it exceeds the planning horizon.

    Parameters:
    - planning_horizon: number of days for which a maintenance schedule needs to be created
    - engine: number of the engine
    """
    safety_due_date[engine] = planning_horizon + 1

In [12]:
def complete_on_time(team, engine, start_day, planning_horizon):
    """
    Checks if the maintenance can be performed by the team on the engine within the planning period.

    Parameters:
    - team: "A" or "B" indicating the type of team
    - engine: number of the engine
    - start_day: day when the maintenance is started
    - planning_horizon: number of days for which a maintenance schedule needs to be created

    Returns:
    - boolean: whether the maintenance can be performed within the planning horizon by the team on the engine
    """
    return start_day + maintenance_duration[engine][team] - 1 <= planning_horizon    

In [25]:
def engines_requiring_maintenance(planning_horizon):
    """
    Return the engines that require maintenance within the planning period.

    Parameters:
    - planning_horizon: number of days for which a maintenance schedule needs to be created

    Returns:
    - list: engines requiring maintenance
    """
    engines_to_maintain = []
    for engine_id, due_date in safety_due_date.items():
        if due_date <= planning_horizon:
            engines_to_maintain.append(engine_id)
    return engines_to_maintain

In [21]:
def leftover_days(planning_horizon, schedule):
    """
    Calculates how many days each team still has to perform maintenance given the current schedule.

    Parameters:
    - planning_horizon: number of days for which a maintenance schedule needs to be created
    - schedule: nested list of engines maintained by each team

    Returns:
    - nested list: number of days left for each team
    """
    # Initialize a list to store the leftover time for each team
    days_left = [planning_horizon] * len(schedule)
    
    # Iterate through each team in the schedule
    for team_index in range(len(schedule)):
        # Iterate through each engine in the team's schedule
        for engine in schedule[team_index]:
            # Subtract the maintenance duration of the engine from the team's number of days left
            days_left[team_index] -= maintenance_duration[engine][teams[team_index]]
    
    return days_left

In [32]:
def available_teams(days_left, engine):
    """
    Returns a list of team indices that still have enough days left to perform
    maintenance on the engine.

    Parameters:
    - days_left: list containing the number of days each team has left
    - engine: number of the engine that needs to be maintained

    Returns:
    - list: numbers of the teams that are able to perform the maintenance
    """
    # Initialize a list to store indices of teams in which the maintenance still fits
    team_indices = []

    # Iterate through each team
    for team_index in range(len(days_left)):
        # Get the team type (i.e. "A" or "B")
        team_type = teams[team_index]
        
        # Check if the team has enough time left to perform maintenance on the engine
        if days_left[team_index] >= maintenance_duration[engine][team_type]:
            # Add the team index to the list of fitting teams
            team_indices.append(team_index)
    
    return team_indices

In [50]:
# Quintine
def feasible(planning_horizon, schedule):
    """
    Determines if the schedule is feasible.
    
    Parameters:
    - planning_horizon: number of days for which a maintenance schedule needs to be created
    - schedule: nested list of engines maintained by each team

    Returns:
    - bool: whether each team can maintain their assigned engines within the planning horizon
    """
    return all(team_days_left >= 0 for team_days_left in leftover_days(planning_horizon, schedule))


## Genetic Algorithm

In [49]:
# Quintine
def create_random_individuals(no_individuals, planning_horizon):
    """
    Creates a certain number of random individuals.

    Parameters:
    - no_individuals: number of random individuals that are created
    - planning_horizon: number of days for which a maintenance schedule needs to be created

    Returns:
    - list: population containing the random individuals
    - list: engines that required maintenance but could not be maintained
    """
    # Find engines that require maintenance within the planning horizon
    engines_to_maintain = engines_requiring_maintenance(planning_horizon)
    
    # Initialize an empty schedule for each team
    schedule = [[] for _ in range(len(teams))]

    # Initialize a list to store engines that could not be maintained
    unmaintained_engines = []

    while len(engines_to_maintain) > 0:
        # Randomly select an engine
        engine = random.choice(engines_to_maintain)
        
        # Determine how many days each team has left to perform maintenance
        days_left = leftover_days(planning_horizon, schedule)

        # Find teams that still have enough number of days left to maintain this engine
        fitting_teams = available_teams(days_left, engine)

        # If there are fitting teams, randomly select one of them and assign the engine
        if len(fitting_teams) > 0:
            team = random.choice(fitting_teams)
            schedule[team].append(engine)
            engines_to_maintain.remove(engine)
        else:
            # If no fitting teams are found, add the engine to the list of unmaintained engines
            unmaintained_engines.append(engine)

    return schedule, unmaintained_engines

In [None]:
# Charlot

def fitness(maintenance_schedule):
    """
    Calculates the total incurred costs based on the maintenance schedule for each machine.

    Parameters:
    - maintenance_schedule (dict): A dictionary where keys represent engines and values represent lists of maintenance information.
    
    Returns:
    - int: Value representing the total incurred costs for all engine's maintenance schedules.
    """
    total_cost_all_engines = 0

    for engine_maintenance in maintenance_schedule.values():
        for task in engine_maintenance:
            total_cost_all_engines += task['cost']

    return total_cost_all_engines

In [None]:
# Example of use fitness function

maintenance_data_test = {
    'Machine1': [
        {
            'team_type': 'A',
            'start_day': 1,
            'end_day': 10,
            'cost': 1000
        },
        {
            'team_type': 'B',
            'start_day': 2,
            'end_day': 15,
            'cost': 1500
        }
    ],
    'Machine2': [
        {
            'team_type': 'Team C',
            'start_date': '2024-01-15',
            'end_date': '2024-01-20',
            'cost': 1200
        }
    ],
}

fitness_cost = fitness(maintenance_data_test)
print(fitness_cost)

3700


In [None]:
minimum_cost = math.inf
best_maintenance_schedule_index = None
patience = 5
runs_since_best_fitness = 0

def check_termination_criterion(total_cost_all_engines, schedule_index):
    """
    Checks if the optimal fitness value has been observed based on the exploration-exploitation trade-off. If so, the corresponding maintenance schedule index is returned.

    Parameters:
    - total_cost_all_engines (int): Value representing the total incurred costs for a maintenance schedule.
    - schedule_index (int): Index representing the current maintenance schedule being evaluated.
    
    Returns:
    - int or None: The index of the maintenance schedule with the best fitness value observed so far, or None if termination criterion is not met.
    """
    global minimum_cost, best_maintenance_schedule_index, runs_since_best_fitness
    
    if total_cost_all_engines < minimum_cost:
        minimum_cost = total_cost_all_engines
        best_maintenance_schedule_index = schedule_index
        runs_since_best_fitness = 0
    else:
        runs_since_best_fitness += 1
        if runs_since_best_fitness > patience:
            return best_maintenance_schedule_index  # Termination criterion met
        
    return best_maintenance_schedule_index  # Termination criterion not met yet

In [None]:
# Example usage of the check_termination_criterion function
example_costs_maintenance_schedules = [3000, 2000, 7000, 6500, 5000, 2500, 6090, 7654]
best_individual_maintenance_schedule_index = None

for index,  cost in enumerate(example_costs_maintenance_schedules):
    best_individual_maintenance_schedule_index = check_termination_criterion(cost, index)
print(best_individual_maintenance_schedule_index)

1


In [None]:
# Charlot
def select_parents(maintenance_schedules):
    """
    Selects parents using the ranking selection method.

    Parameters:
    - maintenance_schedules (dict): Dictionary where keys represent individuals and values represent maintenance schedules.

    Returns:
    - dict: Selected maintenance schedule.
    """
    # Calculate fitness for each maintenance schedule and add to the maintenance schedule
    for maintenance_schedule in maintenance_schedules.values():
        maintenance_schedule['fitness'] = fitness(maintenance_schedule)

    # Sort the maintenance schedules based on their fitness values in ascending order. -> low fitness vallue is best
    ranked_maintenance_schedules = sorted(maintenance_schedules.items(), key=lambda individual_schedule_pair: individual_schedule_pair[1]['fitness'])

    # Calculate the total sum of ranks, used to normalize the probabilities for each maintenance schedule.
    total_rank_sum = sum(1 / (rank + 1) for rank in range(len(ranked_maintenance_schedules)))

    # Calculate probabilities for each individual based on their rank and normalize by total_rank_sum. Here the index is the rank
    # probabilities is therefore a dict with the index of the maintenance_schedules and there probability of being selected. 
    # {2: 0.6666666666666666, 1: 0.3333333333333333} for the example below
    probabilities = {maintenance_schedule_id: 1 / (rank + 1) / total_rank_sum for rank, (maintenance_schedule_id, _) in enumerate(ranked_maintenance_schedules)}

    # Select a random number between 0 and 1
    random_number = random.random()

    # Iterate through maintenance schedules and select one based on probabilities
    cumulative_probability = 0
    #  In the ranking selection method, the probability of selecting a particular maintenance schedule is not directly used for comparison with the random number. 
    # Instead, we use the cumulative probabilities to create ranges that the random number falls into.
    for maintenance_schedule_id, _ in ranked_maintenance_schedules:
        cumulative_probability += probabilities[maintenance_schedule_id]
        if random_number <= cumulative_probability:
            return maintenance_schedules[maintenance_schedule_id]

In [None]:
# Example usage
maintenance_schedules = {
    1: {
        'Machine1': [
            {
                'team_type': 'A',
                'start_day': 1,
                'end_day': 10,
                'cost': 1000
            },
            {
                'team_type': 'B',
                'start_day': 2,
                'end_day': 15,
                'cost': 1500
            }
        ],
        'Machine2': [
            {
                'team_type': 'Team C',
                'start_date': '2024-01-15',
                'end_date': '2024-01-20',
                'cost': 1200
            }
        ]
    },
    2: {
        'Machine1': [
            {
                'team_type': 'A',
                'start_day': 1,
                'end_day': 10,
                'cost': 300
            }
        ],
        'Machine2': [
            {
                'team_type': 'Team C',
                'start_date': '2024-01-15',
                'end_date': '2024-01-20',
                'cost': 120
            }
        ]
    },
}

selected_schedule = select_parents(maintenance_schedules)
print(selected_schedule)

{'Machine1': [{'team_type': 'A', 'start_day': 1, 'end_day': 10, 'cost': 300}], 'Machine2': [{'team_type': 'Team C', 'start_date': '2024-01-15', 'end_date': '2024-01-20', 'cost': 120}], 'fitness': 420}


In [None]:
def crossover():
    return

In [None]:
def mutation():
    return

In [None]:
def new_offspring():
    return

In [None]:
planning_horizon = 30
maintenance_schedule = dict()

# Structure
# maintenance_data = {
#     'Machine1': [
#         {
#             'team_type': 'A',
#             'start_day': 1,
#             'end_day': 10,
#             'cost': 1000
#         },
#         {
#             'team_type': 'B',
#             'start_day': 2,
#             'end_day': 15,
#             'cost': 1500
#         }
#     ],
#     'Machine2': [
#         {
#             'team_type': 'Team C',
#             'start_date': '2024-01-15',
#             'end_date': '2024-01-20',
#             'penalty_costs': 1200
#         }
#     ],
#     # Add more machines as needed
# }

# New structure
# schedule = [[1, 4, 5, 10], [7] [2, 8, 9], [3, 6]]
# each array corresponds to the engines each team needs to maintain
# the first two arrays are the two A teams, the last two arrays are the two B teams
