In [1]:
%load_ext autoreload
%autoreload 2

import numpy as np

from src.entities import GSA

from typing import Union

In [2]:
# Define corridor

corridor = {"MAD": {
                "CIU": {
                    "COR": {
                        "SEV": {
                            "CAD": {}
                        },
                        "PGE": {
                            "ANT": {
                                "GRA": {},
                                "MAL": {}
                                    }
                                }
                            }
                        }
                    }
            }

In [3]:
# Get lines from corridor

def get_lines(corridor: dict, path: Union[list, None]=None) -> list:
    """
    Get all the lines in the corridor
    
    Args:
        corridor (dict): dictionary with the corridor structure
        path (list, optional): list of nodes
    
    Returns:
        list of lines
    """
    if path is None:
        path = []

    lines = []
    for node, child in corridor.items():
        new_path = path + [node]
        if not child:  # If the node has no children, it is a leaf
            lines.append(new_path)
        else:
            lines.extend(get_lines(child, new_path))  # If the node has children, we call the function recursively

    return lines

get_lines(corridor)

[['MAD', 'CIU', 'COR', 'SEV', 'CAD'],
 ['MAD', 'CIU', 'COR', 'PGE', 'ANT', 'GRA'],
 ['MAD', 'CIU', 'COR', 'PGE', 'ANT', 'MAL']]

In [4]:
def sample_line(lines: list) -> list:
    """
    Sample a random line from the list of lines
    
    Args:
        lines (list): list of lines
    
    Returns:
        list: random line
    """
    return lines[np.random.randint(len(lines))] 

def sample_route(line: list) -> list:
    """
    Sample a random route from line
    
    Args:
        line (list): list of stations
        
    Returns:
        list: random route
    """
    return line[np.random.randint(0, len(line)-1):]

lines = get_lines(corridor)

line = sample_line(lines)
print(f"Sampled line: {line}")

# Sample a random route from line (at least two stations)
route = sample_route(line)
print(f"Sampled route: {route}")

Sampled line: ['MAD', 'CIU', 'COR', 'SEV', 'CAD']
Sampled route: ['SEV', 'CAD']


In [5]:
def get_timetable(route: list) -> dict:
    """
    Generate random timetable for route r
    
    Args:
        route (list): list of stations
    
    Returns:
        dict: timetable
    """
    timetable = {}
    AT = np.random.randint(0, 24*60)
    DT = AT
    for i, sta in enumerate(route):
        if i == 0 or i == len(route)-1:
            timetable[sta] = (AT, AT)
        else:
            timetable[sta] = (AT, DT)
            
        AT += np.random.randint(30, 120)
        DT = AT + np.random.randint(2, 8)
        
    return timetable

get_timetable(route)

{'SEV': (401, 401), 'CAD': (504, 504)}

In [6]:
# Generate random requested timetable in corridor for a day t

def get_schedule_request(n_services: int) -> dict:
    """
    Generate random timetable
    
    Args:
        n_services (int): number of services
    
    Returns:
        dict: timetable
    """
    return {i: get_timetable(sample_route(sample_line(lines))) for i in range(1, n_services+1)}

# Generate random schedule
schedule = get_schedule_request(10)
schedule

{1: {'COR': (521, 521), 'SEV': (569, 576), 'CAD': (655, 655)},
 2: {'COR': (637, 637), 'SEV': (667, 670), 'CAD': (725, 725)},
 3: {'CIU': (75, 75),
  'COR': (136, 138),
  'PGE': (246, 250),
  'ANT': (307, 314),
  'GRA': (390, 390)},
 4: {'MAD': (1373, 1373),
  'CIU': (1483, 1489),
  'COR': (1526, 1530),
  'PGE': (1631, 1634),
  'ANT': (1663, 1666),
  'MAL': (1765, 1765)},
 5: {'CIU': (599, 599),
  'COR': (677, 683),
  'PGE': (793, 795),
  'ANT': (890, 892),
  'GRA': (1006, 1006)},
 6: {'SEV': (86, 86), 'CAD': (192, 192)},
 7: {'CIU': (673, 673),
  'COR': (755, 757),
  'SEV': (813, 819),
  'CAD': (852, 852)},
 8: {'CIU': (552, 552),
  'COR': (593, 599),
  'PGE': (677, 681),
  'ANT': (774, 778),
  'MAL': (855, 855)},
 9: {'CIU': (1032, 1032),
  'COR': (1071, 1075),
  'SEV': (1188, 1190),
  'CAD': (1288, 1288)},
 10: {'CIU': (876, 876),
  'COR': (936, 943),
  'PGE': (1045, 1050),
  'ANT': (1153, 1155),
  'GRA': (1213, 1213)}}

In [7]:
schedule = {1: {'MAD': (0, 0), 'BAR': (148, 152), 'FIG': (180, 180)},
            2: {'MAD': (8, 8), 'ZAR': (28, 30), 'BAR': (165, 167), 'FIG': (210, 210)},
            3: {'MAD': (30, 30), 'BAR': (180, 182), 'FIG': (225, 225)}}

In [59]:
from copy import deepcopy
from typing import List, Mapping, Union

class ScheduledMaximization:
    """
    Class for the scheduled trains maximization problem
    """
    def __init__(self, requested_schedule: Mapping[int, Mapping[str, Union[int, int]]]):
        """
        Constructor
        
        Args:
            requested_schedule (dict): requested schedule
        """
        self.requested_schedule = requested_schedule 
        self.updated_schedule = deepcopy(requested_schedule)  
        self.real_boundaries = self.get_real_boundaries()
        self.discrete_boundaries = np.array([(0, 1) for _ in range(len(requested_schedule))])
        self.boundaries = {'real': self.real_boundaries, 'discrete': self.discrete_boundaries}
        
    def get_num_trains(self, solution: Mapping[str, List[Union[float, int]]]) -> int:
        """
        Get the number of scheduled trains
        
        Args:
            solution (dict): solution
        
        Returns:
            int: number of scheduled trains
        """
        scheduled_trains = np.sum(solution["discrete"])
        return scheduled_trains, 0
    
    def update_schedule(self, solution):
        """
        Update schedule with the solution
        
        Args:
            solution (list): solution
        """
        s_idx = 0
        for service in self.updated_schedule:
            for i, stop in enumerate(self.updated_schedule[service]):
                if i == 0 or i == len(self.updated_schedule[service]) - 1:
                    self.updated_schedule[service][stop] = (solution[s_idx], solution[s_idx])
                    s_idx += 1
                else:
                    self.updated_schedule[service][stop] = (solution[s_idx], solution[s_idx+1])
                    s_idx += 2
    
    def is_feasible(self, solution, security_gap=10) -> bool:
        """
        Check if the solution is feasible
        
        Args:
            solution (dict): solution
            security_gap (int, optional): security gap
        
        Returns:
            bool: True if the solution is feasible, False otherwise
        """
        S_i = np.array(solution["discrete"])
        times = solution["real"] if solution["real"].size else self.get_real_vars()
        self.update_schedule(times)
        security_array = []

        # Get conflicts between services
        for service in self.updated_schedule:
            service_sec_arr = []  # Build security matrix for service (columns are stops, rows are other services)
            for service_k in self.updated_schedule:
              service_sec_row = [] 
              for stop in self.updated_schedule[service]:
                if service_k == service or stop not in self.updated_schedule[service_k]:
                  service_sec_row.append(0)
                  continue

                if abs(self.updated_schedule[service][stop][1] - self.updated_schedule[service_k][stop][1]) < security_gap:
                  service_sec_row.append(1)
                else:
                  service_sec_row.append(0)
              service_sec_arr.append(service_sec_row)

            security_array.append(np.array(service_sec_arr))

        # Get array of conflicts. If there is a conflict, the dot product will be different from zero
        if not S_i.dot(np.array([np.sum(S_i.dot(ssa)) for ssa in security_array])):
            return True
        return False
    
    def get_real_vars(self):
        """
        Get real variables
        
        Returns:
            list: real variables with requested schedule times.
        """
        real_vars = []
        for service in self.requested_schedule:
            for i, stop in enumerate(self.requested_schedule[service]):
                if i == 0 or i == len(self.requested_schedule[service]) - 1:
                    real_vars.append(self.requested_schedule[service][stop][0])
                else: 
                    real_vars.append(self.requested_schedule[service][stop][0])
                    real_vars.append(self.requested_schedule[service][stop][1])

        return real_vars
    
    def get_real_boundaries(self):
        """
        Get real boundaries
        
        Returns:
            list: real boundaries
        """
        real_vars = self.get_real_vars()
        real_boundaries = []
        for rv in real_vars:
            real_boundaries.append((rv-10, rv+10))

        return np.array(real_boundaries)

In [None]:
schedule = get_schedule_request(20)

In [None]:
sm = ScheduledMaximization(schedule)

In [74]:
gsa_algo = GSA(objective_function=sm.get_num_trains,
               is_feasible=sm.is_feasible,
               r_dim=len(sm.real_boundaries),
               d_dim=len(sm.requested_schedule),
               boundaries=sm.boundaries)

In [75]:
gsa_algo.set_seed(seed=28)

training_history = gsa_algo.optimize(population_size=20,
                                     iters=300,
                                     chaotic_constant=False,
                                     repair_solution=True)

Initializing positions of the individuals in the population...
Positions of the individuals in the population successfully initialized!!
GSA is optimizing  "get_num_trains"
####################################################################################################
REPAIRING SOLUTION... 
####################################################################################################
####################################################################################################
SOLUTION SUCCESSFULLY REPAIRED!!
####################################################################################################
####################################################################################################
REPAIRING SOLUTION... 
####################################################################################################
####################################################################################################
SOLUTION SUCCESSFULLY REPAIRED!!
########

In [76]:
gsa_algo = GSA(objective_function=sm.get_num_trains,
               is_feasible=sm.is_feasible,
               r_dim=0,
               d_dim=len(sm.requested_schedule),
               boundaries=sm.boundaries)

In [77]:
gsa_algo.set_seed(seed=28)

training_history = gsa_algo.optimize(population_size=20,
                                     iters=300,
                                     chaotic_constant=False,
                                     repair_solution=True)

Initializing positions of the individuals in the population...
Positions of the individuals in the population successfully initialized!!
GSA is optimizing  "get_num_trains"
####################################################################################################
REPAIRING SOLUTION... 
####################################################################################################
####################################################################################################
SOLUTION SUCCESSFULLY REPAIRED!!
####################################################################################################
####################################################################################################
REPAIRING SOLUTION... 
####################################################################################################
####################################################################################################
SOLUTION SUCCESSFULLY REPAIRED!!
########