In [212]:
import json
import random 
import datetime

In [213]:
# Weighted graph kept as an adjacency dictionary: {node: {neighbor: distance}}
class Graph:
    """Graph backed by a dict of dicts, either directed or undirected."""

    def __init__(self, graph_dict=None, directed=True):
        """Keep the adjacency mapping (a fresh dict when none/empty is given).

        When ``directed`` is false, every existing edge is mirrored right away.
        """
        self.graph_dict = graph_dict if graph_dict else {}
        self.directed = directed
        if not self.directed:
            self.make_undirected()

    def make_undirected(self):
        """Add the reverse of every stored edge with the same distance."""
        # Snapshot the keys first: mirroring may insert new top-level nodes.
        for source in list(self.graph_dict):
            for target, dist in self.graph_dict[source].items():
                self.graph_dict.setdefault(target, {})[source] = dist

    def connect(self, A, B, distance=1):
        """Link A -> B with ``distance``; also B -> A when the graph is undirected."""
        self.graph_dict.setdefault(A, {})[B] = distance
        if self.directed:
            return
        self.graph_dict.setdefault(B, {})[A] = distance

    def get(self, a, b=None):
        """Return the neighbor mapping of ``a``, or the a -> b distance when ``b`` is given.

        An unknown ``a`` is registered with an empty neighbor mapping as a side
        effect; a missing a -> b link yields None.
        """
        links = self.graph_dict.setdefault(a, {})
        return links if b is None else links.get(b)

    def nodes(self):
        """Return a list of every node that appears as a source or as a target."""
        sources = set(self.graph_dict.keys())
        targets = {target for links in self.graph_dict.values() for target in links}
        return list(sources | targets)

    def print_graph(self):
        """Print the raw adjacency dictionary."""
        print(self.graph_dict)
    
    
# This class represents a node of the A* search tree
class Node:
    """Search node: a graph node name plus the search node it was reached from.

    Attributes:
        name: identifier of the graph node (any value usable as a dict key).
        parent: the Node this one was reached from, or None for the start node.
        g: accumulated cost from the start node.
        h: heuristic estimate of the cost to the goal node.
        f: total estimated cost (g + h); this is the sort key.

    Defining ``__eq__`` without ``__hash__`` leaves instances unhashable, as in
    the original class.
    """

    # Initialize the class
    def __init__(self, name, parent: 'Node | None' = None):
        self.name = name
        self.parent = parent
        self.g = 0 # Distance to start node
        self.h = 0 # Distance to goal node
        self.f = 0 # Total cost

    # Compare nodes by name only (costs and parents are ignored)
    def __eq__(self, other):
        # Comparing against None or an unrelated object must answer "not equal"
        # instead of raising AttributeError.
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name == other.name

    # Sort nodes by total estimated cost
    def __lt__(self, other):
        if not hasattr(other, 'f'):
            return NotImplemented
        return self.f < other.f

    # Print node as "(name,f)"
    def __repr__(self):
        return ('({0},{1})'.format(self.name, self.f))
    
    
# A* search
def astar_search(graph, heuristics, start, end):
    """Search for a lowest-f path from ``start`` to ``end`` with A*.

    Args:
        graph: object whose ``get(name)`` returns a ``{neighbor: distance}``
            mapping for the given node.
        heuristics: mapping from node name to the estimated remaining cost.
            Nodes that are missing (or mapped to None) are treated as 0.
        start: name of the start node.
        end: name of the goal node.

    Returns:
        A dict mapping each node name on the path to its accumulated cost
        ``g``. Keys are inserted from the goal back to the start, so callers
        wanting start-to-goal order must reverse them. Returns None when the
        goal cannot be reached.
    """
    # "frontier" rather than "open", so the builtin open() is not shadowed.
    frontier = [Node(start, None)]
    # Names of nodes that were already expanded (the closed list).
    expanded = set()

    # Loop until the frontier is empty
    while frontier:
        # list.sort() is stable, so equal-f nodes leave in insertion order
        frontier.sort()
        current_node = frontier.pop(0)

        # A node can sit in the frontier more than once; only its first
        # (cheapest) occurrence is expanded, later ones are stale.
        if current_node.name in expanded:
            continue
        expanded.add(current_node.name)

        # Goal reached: walk the parent links back to the start
        if current_node.name == end:
            path = {}
            step = current_node
            while step.name != start:
                path.setdefault(step.name, step.g)
                step = step.parent
            path.setdefault(step.name, step.g)
            return path

        # Expand the neighbors; the mapping already carries the edge distance
        for neighbor_name, distance in graph.get(current_node.name).items():
            if neighbor_name in expanded:
                continue
            neighbor = Node(neighbor_name, current_node)
            # Calculate full path cost
            neighbor.g = current_node.g + distance
            neighbor.h = heuristics.get(neighbor_name) or 0
            neighbor.f = neighbor.g + neighbor.h
            # Skip the neighbor when a cheaper copy is already queued
            if add_to_open(frontier, neighbor):
                frontier.append(neighbor)

    # Frontier exhausted without reaching the goal
    return None
# Decide whether a neighbor belongs on the open list
def add_to_open(open, neighbor):
    """Return False when ``open`` already holds an equal node with a strictly lower f.

    In every other case (no equal node queued, or the queued copy is not
    cheaper) the result is True.
    """
    return not any(neighbor == node and neighbor.f > node.f for node in open)


In [214]:
class Lane:
    """Adjacency dictionary ``{node: {neighbor: value}}`` with optional symmetry.

    The value stored per link is whatever is passed as ``distance``; it is
    stored by reference, so both directions of an undirected link share the
    same object.
    """

    def __init__(self, graph_dict=None, directed=True):
        """Adopt ``graph_dict`` (or start empty) and mirror links if undirected."""
        self.graph_dict = graph_dict if graph_dict else {}
        self.directed = directed
        if not self.directed:
            self.make_undirected()

    def make_undirected(self):
        """Store the reverse of every existing link with the same value."""
        # Iterate over a snapshot of the keys because new nodes may be added.
        for origin in list(self.graph_dict):
            for destination, value in self.graph_dict[origin].items():
                reverse_links = self.graph_dict.setdefault(destination, {})
                reverse_links[origin] = value

    def connect(self, A, B, distance=1):
        """Record the link A -> B, plus B -> A when the structure is undirected."""
        forward_links = self.graph_dict.setdefault(A, {})
        forward_links[B] = distance
        if not self.directed:
            backward_links = self.graph_dict.setdefault(B, {})
            backward_links[A] = distance

    def get(self, a, b=None):
        """Return all links of ``a``, or just the a -> b value when ``b`` is given.

        Looking up an unknown ``a`` registers it with no links; a missing
        a -> b link gives None.
        """
        links = self.graph_dict.setdefault(a, {})
        if b is not None:
            return links.get(b)
        return links

    def nodes(self):
        """Return a list of all nodes seen as a source or as a destination."""
        origins = set(self.graph_dict.keys())
        destinations = {name for links in self.graph_dict.values() for name in links}
        return list(origins.union(destinations))

    def print_lanes(self):
        """Print the raw adjacency dictionary."""
        print(self.graph_dict)
    

In [220]:
def find_lane(lane, path_list, service_weight):
    """Reserve ``service_weight`` on one lane along every hop of a path.

    The lowest lane index whose remaining weight is at least
    ``service_weight`` on *every* consecutive hop of ``path_list`` is chosen.
    That amount is then subtracted from the lane on each hop and written back
    through ``lane.connect``.

    Args:
        lane: object providing ``get(a, b)`` (returns the mutable list of lane
            weights of hop a -> b) and ``connect(a, b, weights)``.
        path_list: node names in travel order.
        service_weight: capacity the service needs on each hop.

    Returns:
        The index of the lane that was reserved (0 for a path without hops).

    Raises:
        IndexError: when no single lane has enough capacity on every hop.
            Nothing is modified in that case.
    """
    hops = list(zip(path_list, path_list[1:]))
    if not hops:
        return 0

    hop_weights = [lane.get(a_end, z_end) for a_end, z_end in hops]

    # Only lane indexes that exist on every hop are candidates.
    lane_count = min(len(weights) for weights in hop_weights)
    for l in range(lane_count):
        # The lane must fit on all hops; checking each hop only against the
        # running index would let an earlier hop go negative.
        if all(weights[l] >= service_weight for weights in hop_weights):
            break
    else:
        raise IndexError(
            'no lane has capacity {} on every hop of path {}'.format(
                service_weight, path_list))

    # Consume the capacity on the chosen lane and store the updated weights.
    for (a_end, z_end), weights in zip(hops, hop_weights):
        weights[l] = weights[l] - service_weight
        lane.connect(a_end, z_end, weights)
    return l

In [226]:
def main(data_dir='./data/sample_29'):
    """Route every service over the graph and reserve a lane for it.

    Reads ``graph.json`` (keys ``edges`` and ``nodes``) and ``services.json``
    from ``data_dir``. For each service it runs A* between its end points,
    reserves a lane along the resulting path and prints the service together
    with the path, the accumulated cost at each node and the chosen lane. A
    timestamp is printed before and after the run.

    Args:
        data_dir: directory holding the two JSON input files.
    """
    timestamp_format = '%d/%m/%Y %H:%M:%S'
    print(datetime.datetime.now().strftime(timestamp_format))

    # "with" closes the file even if json.load raises
    with open('{}/graph.json'.format(data_dir)) as graph_file:
        graph_data = json.load(graph_file)
    edges = graph_data['edges']
    nodes = graph_data['nodes']

    graph = Graph(directed = False)
    lane = Lane(directed = False)

    # One pass over the edges fills both structures: costs go into the graph,
    # lane weights into the lane store.
    for edge in edges:
        a_end = edge['a_end']
        z_end = edge['z_end']
        cost = edge['cost']
        lanes = edge['lane_weights']
        graph.connect(a_end, z_end, cost)
        lane.connect(a_end, z_end, lanes)

    ###### Set the heuristics ######
    # Every estimate is 0, so f equals g and the search orders nodes purely by
    # accumulated cost.
    # NOTE(review): keys are list positions 0..len(nodes)-1 -- presumably these
    # match the node ids used in the edges; confirm against the data.
    heuristics = {index: 0 for index in range(len(nodes))}

    with open('{}/services.json'.format(data_dir)) as services_file:
        services = json.load(services_file)

    for service in services:
        a_end = service['a_end']
        z_end = service['z_end']
        service_weight = service['weight']
        path_dict = astar_search(graph, heuristics, a_end, z_end)

        # astar_search returns None when the end points are not connected
        if path_dict is None:
            print('service: {}'.format(service))
            print('path: {}'.format(None))
            continue

        # The path comes back goal-first; reverse it into travel order
        path_list = list(path_dict.keys())[::-1]
        l = find_lane(lane, path_list, service_weight)

        res = {node: path_dict.get(node) for node in path_list}
        res.setdefault('lane', l)
        print('service: {}'.format(service))
        print('path: {}'.format(res))

    print(datetime.datetime.now().strftime(timestamp_format))
        
    

In [227]:
# Entry point: call main() when this code runs as the top-level module
if __name__ == '__main__':
    main()

16/09/2021 14:49:42
service: {'a_end': 138, 'z_end': 206, 'weight': 1}
path: {138: 0, 171: 1.0, 85: 2.0, 71: 3.0, 206: 4.0, 'lane': 0}
service: {'a_end': 74, 'z_end': 156, 'weight': 1}
path: {74: 0, 86: 1.0, 156: 3.0, 'lane': 0}
service: {'a_end': 7, 'z_end': 159, 'weight': 1}
path: {7: 0, 115: 1.0, 20: 2.0, 159: 4.0, 'lane': 0}
service: {'a_end': 167, 'z_end': 211, 'weight': 1}
path: {167: 0, 182: 2.0, 211: 3.0, 'lane': 0}
service: {'a_end': 53, 'z_end': 250, 'weight': 1}
path: {53: 0, 155: 1.0, 241: 2.0, 171: 3.0, 250: 5.0, 'lane': 0}
service: {'a_end': 65, 'z_end': 12, 'weight': 1}
path: {65: 0, 146: 1.0, 12: 2.0, 'lane': 0}
service: {'a_end': 101, 'z_end': 96, 'weight': 1}
path: {101: 0, 38: 1.0, 15: 3.0, 96: 4.0, 'lane': 0}
service: {'a_end': 164, 'z_end': 34, 'weight': 1}
path: {164: 0, 160: 1.0, 161: 2.0, 34: 3.0, 'lane': 0}
service: {'a_end': 253, 'z_end': 20, 'weight': 10}
path: {253: 0, 134: 1.0, 223: 3.0, 20: 4.0, 'lane': 0}
service: {'a_end': 118, 'z_end': 1, 'weight': 1}
p