<h1>CODE</h1>

In [2]:
def coord2station(coord, map):
    """
        From coordinates, it searches the closest station.
        Format of the parameter is:
        Args:
            coord (list):  Two REAL values, which refer to the coordinates of a point in the city.
            map (object of Map class): All the map information
        Returns:
            possible_origins (list): List of the Indexes of stations, which corresponds to the closest station
    """
    dist_min = INF
    llista = []
    for i in map.stations:
        station = [map.stations[i]['x'],map.stations[i]['y']]
        dist = euclidean_dist(coord,station)
        if(dist < dist_min):
            llista = [i]
            dist_min = dist
        elif(dist_min == dist):
            llista.append(i)
    return llista

In [3]:
def remove_cycles(path_list):
    """
     It removes from path_list the set of paths that include some cycles in their path.
     Format of the parameter is:
        Args:
            path_list (LIST of Path Class): Expanded paths
        Returns:
            path_list (list): Expanded paths without cycles.
    """
    no_cycles=[]
    for path in path_list:
        not_repeated = []
        not_repeated_name = []
        for station in path.route:
            if station not in not_repeated:
                not_repeated.append(station)
        if len(not_repeated) == len(path.route):
            no_cycles.append(path)
    return no_cycles


In [4]:
def expand(path, map):
    """
     It expands a SINGLE station and returns the list of class Path.
     Format of the parameter is:
        Args:
            path (object of Path class): Specific path to be expanded
            map (object of Map class):: All the information needed to expand the node
        Returns:
            path_list (list): List of paths that are connected to the given path.
    """
    expanded_paths=[]
    for i in map.connections[path.last]:
        path_copy = copy.deepcopy(path)
        path_copy.add_route(i)
        expanded_paths.append(path_copy)
    return expanded_paths

In [5]:
def insert_depth_first_search(expand_paths, list_of_path):
    """
     expand_paths is inserted to the list_of_path according to DEPTH FIRST SEARCH algorithm
     Format of the parameter is:
        Args:
            expand_paths (LIST of Path Class): Expanded paths
            list_of_path (LIST of Path Class): The paths to be visited
        Returns:
            list_of_path (LIST of Path Class): List of Paths where Expanded Path is inserted
    """
    for i in list_of_path:
        expand_paths.append(i)
    return expand_paths

In [6]:
def depth_first_search(origin_id, destination_id, map):
    """
     Depth First Search algorithm
     Format of the parameter is:
        Args:
            origin_id (int): Starting station id
            destination_id (int): Final station id
            map (object of Map class): All the map information
        Returns:
            list_of_path[0] (Path Class): the route that goes from origin_id to destination_id
    """
    path_origin = Path(origin_id)
    llista=[path_origin]
    while(llista[0].last != destination_id and len(llista) != 0):
        to_expand = llista[0]
        llista.pop(0)
        expanded = expand(to_expand, map)
        no_cycles = remove_cycles(expanded)
        llista = insert_depth_first_search(no_cycles, llista)
    if(len(llista) != 0):
        return llista[0]
    else:
        return "No existeix Solucio"

In [7]:
def insert_breadth_first_search(expand_paths, list_of_path):
    """
        expand_paths is inserted to the list_of_path according to BREADTH FIRST SEARCH algorithm
        Format of the parameter is:
           Args:
               expand_paths (LIST of Path Class): Expanded paths
               list_of_path (LIST of Path Class): The paths to be visited
           Returns:
               list_of_path (LIST of Path Class): List of Paths where Expanded Path is inserted
    """
    for i in expand_paths:
        list_of_path.append(i)
    return list_of_path

In [8]:
def breadth_first_search(origin_id, destination_id, map):
    """
     Breadth First Search algorithm
     Format of the parameter is:
        Args:
            origin_id (int): Starting station id
            destination_id (int): Final station id
            map (object of Map class): All the map information
        Returns:
            list_of_path[0] (Path Class): The route that goes from origin_id to destination_id
    """
    path_origin = Path(origin_id)
    list=[path_origin]
    while(list[0].last != destination_id and len(list) != 0):
        to_expand = list[0]
        list.pop(0)
        expanded = expand(to_expand, map)
        no_cycles = remove_cycles(expanded)
        list = insert_breadth_first_search(no_cycles, list)
    if(len(list) != 0):
        return list[0]
    else:
        return "No existeix Solucio"

In [9]:
def calculate_cost(expand_paths, map, type_preference=0):
    """
         Calculate the cost according to type preference
         Format of the parameter is:
            Args:
                expand_paths (LIST of Paths Class): Expanded paths
                map (object of Map class): All the map information
                type_preference: INTEGER Value to indicate the preference selected:
                                0 - Adjacency
                                1 - minimum Time
                                2 - minimum Distance
                                3 - minimum Transfers
            Returns:
                expand_paths (LIST of Paths): Expanded path with updated cost
    """
    if type_preference == 0:
        for i in expand_paths:
            i.update_g(1)
    if type_preference == 1:
        for i in expand_paths:
            i.update_g(map.connections[i.penultimate][i.last])
    if type_preference == 2:
        for i in expand_paths:
            i.update_g(map.connections[i.penultimate][i.last]*map.stations[i.penultimate]['velocity'])
    if type_preference == 3:
        for i in expand_paths:
            if(map.stations[i.last]['line'] != map.stations[i.penultimate]['line']):
                i.update_g(1)
    return expand_paths

In [10]:
def take_cost(elem):
    return elem.g

In [11]:
def insert_cost(expand_paths, list_of_path):
    """
        expand_paths is inserted to the list_of_path according to COST VALUE
        Format of the parameter is:
           Args:
               expand_paths (LIST of Path Class): Expanded paths
               list_of_path (LIST of Path Class): The paths to be visited
           Returns:
               list_of_path (LIST of Path Class): List of Paths where expanded_path is inserted according to cost
    """
    for i in list_of_path:
        expand_paths.append(i)
    expand_paths.sort(key=take_cost)
    return expand_paths

In [12]:
def uniform_cost_search(origin_id, destination_id, map, type_preference=0):
    """
     Uniform Cost Search algorithm
     Format of the parameter is:
        Args:
            origin_id (int): Starting station id
            destination_id (int): Final station id
            map (object of Map class): All the map information
        Returns:
            list_of_path[0] (Path Class): The route that goes from origin_id to destination_id
    """
    path_origin = Path(origin_id)
    llista=[]
    for i in origin_id:
        path=Path(i)
        llista.append(path)
    while(llista[0].last != destination_id and len(llista) != 0):
        to_expand = llista[0]
        llista.pop(0)
        expanded = expand(to_expand, map)
        no_cycles = remove_cycles(expanded)
        costs = calculate_cost(no_cycles, map, type_preference)
        llista = insert_cost(costs, llista)
    if(len(llista) != 0):
        return llista[0]
    else:
        return "No existeix Solucio"

In [13]:
def calculate_heuristics(expand_paths, map, destination_id, type_preference=0):
    """
     Calculate and UPDATE the heuristics of a path according to type preference
     WARNING: In calculate_cost, we didn't update the cost of the path inside the function
              for the reasons which will be clear when you code Astar (HINT: check remove_redundant_paths() function).
     Format of the parameter is:
        Args:
            expand_paths (LIST of Path Class): Expanded paths
            map (object of Map class): All the map information
            type_preference: INTEGER Value to indicate the preference selected:
                            0 - Adjacency
                            1 - minimum Time
                            2 - minimum Distance
                            3 - minimum Transfers
        Returns:
            expand_paths (LIST of Path Class): Expanded paths with updated heuristics
    """
    if type_preference == 0:
        for i in expand_paths:
            if(i.last != destination_id):
                i.update_h(1)
            else:
                i.update_h(0)
    if type_preference == 1:
        for i in expand_paths:
            station = [map.stations[i.last]['x'],map.stations[i.last]['y']]
            destination = [map.stations[destination_id[0]]['x'],map.stations[destination_id[0]]['y']]
            dist = euclidean_dist(station,destination)
            i.update_h(dist/map.stations[i.last]["velocity"])
    if type_preference == 2:
        for i in expand_paths:
            station = [map.stations[i.last]['x'],map.stations[i.last]['y']]
            destination = [map.stations[destination_id[0]]['x'],map.stations[destination_id[0]]['y']]
            dist = euclidean_dist(station,destination)
            i.update_h(dist)
    if type_preference == 3:
        for i in expand_paths:
            not_equal = False
            for j in destination_id:
                if(map.stations[i.last]['line'] != map.stations[j]['line']):
                    not_equal = True
            if not_equal:
                i.update_h(1)
            else:
                i.update_h(0)
    return expand_paths

In [14]:
def update_f(expand_paths):
    """
      Update the f of a path
      Format of the parameter is:
         Args:
             expand_paths (LIST of Path Class): Expanded paths
         Returns:
             expand_paths (LIST of Path Class): Expanded paths with updated costs
    """
    for i in expand_paths:
        i.update_f()
    

In [15]:
def remove_redundant_paths(expand_paths, list_of_path, visited_stations_cost):
    """
      It removes the Redundant Paths. They are not optimal solution!
      If a station is visited and have a lower g in this moment, we should remove this path.
      Format of the parameter is:
         Args:
             expand_paths (LIST of Path Class): Expanded paths
             list_of_path (LIST of Path Class): All the paths to be expanded
             visited_stations_cost (dict): All visited stations cost
         Returns:
             new_paths (LIST of Path Class): Expanded paths without redundant paths
             list_of_path (LIST of Path Class): list_of_path without redundant paths
    """
    for i in expand_paths:  
        if i.last not in visited_stations_cost.keys():
            visited_stations_cost[i.last] = i.g
        else:
            if(i.g > visited_stations_cost[i.last]):
                expand_paths.remove(i)
            else:
                visited_stations_cost[i.last] = i.g

    for j in list_of_path:
        if(j.g > visited_stations_cost[j.last]):
            list_of_path.remove(j)

    return expand_paths, list_of_path

In [16]:
def take_cost_f(elem):
    return elem.f

In [17]:
def insert_cost_f(expand_paths, list_of_path):
    """
        expand_paths is inserted to the list_of_path according to f VALUE
        Format of the parameter is:
           Args:
               expand_paths (LIST of Path Class): Expanded paths
               list_of_path (LIST of Path Class): The paths to be visited
           Returns:
               list_of_path (LIST of Path Class): List of Paths where expanded_path is inserted according to f
    """
    for i in list_of_path:
        expand_paths.append(i)
    expand_paths.sort(key=take_cost_f)
    return expand_paths


In [18]:
def Astar(origin_coor, dest_coor, map, type_preference=0):
    """
     A* Search algorithm
     Format of the parameter is:
        Args:
            origin_id (list): Starting station id
            destination_id (int): Final station id
            map (object of Map class): All the map information
            type_preference: INTEGER Value to indicate the preference selected:
                            0 - Adjacency
                            1 - minimum Time
                            2 - minimum Distance
                            3 - minimum Transfers
        Returns:
            list_of_path[0] (Path Class): The route that goes from origin_id to destination_id
    """
    origin_id = coord2station(origin_coor,map)
    destination_id = coord2station(dest_coor,map)
    visited_stations_cost={}
    for i in map.stations:
        visited_stations_cost.update({i : INF})
    llista = []
    for i in origin_id:
        path=Path(i)
        llista.append(path)
    while(llista[0].last != destination_id and len(llista) != 0):
        to_expand = llista[0]
        llista.pop(0)
        expanded = expand(to_expand, map)
        no_cycles = remove_cycles(expanded)
        costs = calculate_cost(no_cycles, map, type_preference)
        heuristics = calculate_heuristics(costs, map, destination_id, type_preference)
        update_f(heuristics)
        no_redundant, llista = remove_redundant_paths(heuristics,llista,visited_stations_cost)
        llista = insert_cost_f(no_redundant, llista)
        print(len(llista[0].route),llista[0].g,llista[0].h)
    if(len(llista) != 0):
        return llista[0]
    else:
        return "No existeix Solucio"

In [19]:
class Map:
    """
    A class for keeping all the data regarding stations and their connections

    self.stations: is a dictionary of dictionary with the format of
            {station_id: {"name": name_value, "line": line_value, ...}

    self.connectipns: is a dictionary of dictionary holding all the connection information with the format of
            {
                station_1 : {first_connection_to_station_1: cost_1_1, second_connection_to_station_1: cost_1_2}
                station_2 : {first_connection_to_station_2: cost_2_1, second_connection_to_station_1: cost_2_2}
                ....
            }
    """
    def __init__(self):
        self.stations = {}
        self.connections = {}

    def add_station(self, id, name, line, x, y):
        self.stations[id] = {'name': name, 'line': int(line), 'x': x, 'y': y}

    def add_connection(self, connections):
        self.connections = connections

    def combine_dicts(self):
        for k, v in self.stations.items():
            v.update({'velocity': self.velocity[v['line']]})

    def add_velocity(self, velocity):
        self.velocity = {ix+1: v for ix, v in enumerate(velocity)}
        self.combine_dicts()


class Path:
    """
    A class for keeping the route information from starting station to expanded station.
    Usage:
        # path is initialized with starting station number 2
        >>> path = Path(2)
        # Station 5 is added to the self.route
        >>> path.add_route(5)
        # Assume the cost from station 2 to station 5 is 10, we updated the path's cost
        >>> path.update_g(10)
        # You can reach the last and penultimate station of a path
        >>> path.last, path.penultimate
    """
    def __init__(self, route):
        if type(route) is list:
            self.route = route
        else:
            self.route = [route]

        self.head = self.route[0]
        self.last = self.route[-1]
        if len(self.route) >= 2: self.penultimate = self.route[-2]
        # Real cost
        self.g = 0
        # Heuristic cost
        self.h = 0
        # Combination of the two
        self.f = 0

    def __eq__(self, other):
        if other is not None:
            return self.route == other.route

    def update_h(self, h):
        self.h = h

    def update_g(self, g):
        self.g += g

    def update_f(self):
        self.f = self.g + self.h

    def add_route(self, children):
        # Adding a new station to the route list
        self.route.append(children)
        self.penultimate = self.route[-2]
        self.last = self.route[-1]

<h1>TESTS</h1>

In [20]:
import numpy as np
import math
import signal
import time
import copy

# Infinite cost represented by INF
INF = 9999

def euclidean_dist(x, y):
    x1, y1 = x
    x2, y2 = y
    return math.sqrt((x1-x2)**2 + (y1-y2)**2)

# readStationInformation: Given a filename, it reads the information of this file.
def read_station_information(filename):
    map = Map()
    with open(filename, 'r') as fileMetro:
        for line in fileMetro:
            information = line.split('\t')
            # TODO: Change the monstrous way of parsing
            map.add_station(int(information[0]), information[1], information[2], int(information[3]),
                                   int((information[4].replace('\n', '')).replace(' ', '')))
    return map


def read_information(filename):
    with open(filename, 'r') as fp:
        vel = fp.readlines()
        vel = [i.split('\n')[0] for i in vel]
    vector = [int(v.split(':')[-1]) for v in vel]
    return (vector)


def read_cost_table(filename):
    adj_matrix = np.loadtxt(filename)
    row, col = adj_matrix.nonzero()
    connections = {}
    for r, c in zip(row, col):
        if r+1 not in connections:
            connections[r + 1] = {c + 1: adj_matrix[r][c]}
        else:
            connections[r + 1].update({c + 1: adj_matrix[r][c]})

    return connections


def print_list_of_path(pathList):
    for p in pathList:
        print("Route: {}".format(p.route))


def print_list_of_path_with_cost(pathList):
    for p in pathList:
        print("Route: {}, \t Cost: {}".format(p.route, p.g))


class TestTimeout(Exception):
    pass

class test_timeout:
    def __init__(self, seconds, error_message=None):
        if error_message is None:
              error_message = 'test timed out after {}s.'.format(seconds)
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        raise TestTimeout(self.error_message)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, exc_type, exc_val, exc_tb):
        signal.alarm(0)

In [31]:
import os
ROOT_FOLDER = 'CityInformation/Lyon_smallCity'
mapa = read_station_information(os.path.join(ROOT_FOLDER, 'Stations.txt'))
connections = read_cost_table(os.path.join(ROOT_FOLDER, 'Time.txt'))
infoVelocity = read_information(os.path.join(ROOT_FOLDER, 'InfoVelocity.txt'))
mapa.add_connection(connections)
mapa.add_velocity(infoVelocity)

In [33]:
MyPosition = [140, 56]
MyDestination = [140, 115]

origin_id = coord2station(MyPosition,mapa)
destination_id = coord2station(MyDestination,mapa)

for i in origin_id:
    print(mapa.stations[i]['name'],mapa.stations[i]['line'],i)

for i in destination_id:
    print(mapa.stations[i]['name'],mapa.stations[i]['line'],euclidean_dist(MyDestination,[mapa.stations[i]['x'],mapa.stations[i]['y']]))

CHARPENNES 1 2
CHARPENNES 2 5
CHARPENNES 3 10
COLLEGE BELLECOMBE 2 0.0


In [36]:
sol = uniform_cost_search([9], 3, mapa,2)

In [37]:
for i in sol.route:
    print(i,mapa.stations[i]['name'], mapa.stations[i]['line'])

9 PARTDIEU SERVIENT 2
8 PART-DIEU 2
7 THIERS-LAFAYETTE 2
6 COLLEGE BELLECOMBE 2
5 CHARPENNES 2
2 CHARPENNES 1
3 REPUBLIQUE 1


In [38]:
sol1 = Astar(MyPosition, MyDestination, mapa, 1)

1 0 0
1 0 0
2 2.16242 0.8270130695968175
3 4.78464 2.1436095093078547
2 7.14286 0.0


In [26]:
for i in sol1.route:
    print(mapa.stations[i]['name'], i)

CHARPENNES 5
COLLEGE BELLECOMBE 6
