In [124]:
import xml.etree.ElementTree as ET

# Source OSM export (absolute, machine-specific path; adjust when running elsewhere)

# Read the XML document and keep a handle on its root element
tree = ET.parse(r"C:\Users\FPTSHOP\Downloads\map (4).osm")
root = tree.getroot()

# Collect every direct <node> child as: id string -> (lat, lon) floats
nodes = {}
for element in root:
    if element.tag != 'node':
        continue
    node_id = element.attrib['id']
    lat = float(element.attrib['lat'])
    lon = float(element.attrib['lon'])
    nodes[node_id] = lat, lon

# Summary for the cell output: total node count and the first five entries
num_nodes = len(nodes)
example_nodes = [*nodes.items()][:5]

num_nodes, example_nodes



(136623,
 [('74126839', (21.0314522, 105.8529442)),
  ('74126842', (21.0316279, 105.8520348)),
  ('74126845', (21.0316478, 105.8516382)),
  ('74126847', (21.0315876, 105.8514927)),
  ('74126849', (21.0315049, 105.8514137))])

In [125]:
from collections import defaultdict

# Function to find adjacencies
def find_adjacencies(root):
    """Build an undirected neighbour map from the <way> children of ``root``.

    For every direct child tagged 'way', the 'ref' attributes of its 'nd'
    children are read in document order and each consecutive pair of refs is
    linked in both directions.

    Returns:
        defaultdict(set): node ref (str) -> set of neighbouring node refs (str).
        Ways with fewer than two 'nd' children contribute nothing.
    """
    neighbours = defaultdict(set)
    for child in root:
        if child.tag != 'way':
            continue
        # Ordered node references that make up this way
        refs = [item.attrib['ref'] for item in child if item.tag == 'nd']
        # Pair each ref with its successor and link both directions
        for left, right in zip(refs, refs[1:]):
            neighbours[left].add(right)
            neighbours[right].add(left)
    return neighbours

# Build the neighbour map for the parsed document
adjacencies = find_adjacencies(root)

# Summary for the cell output: how many nodes have at least one neighbour,
# plus the first five entries of the map
num_adjacencies = len(adjacencies)
example_adjacencies = dict(list(adjacencies.items())[:5])

num_adjacencies, example_adjacencies



(127524,
 {'309641646': {'11304264838', '11304581172', '11304581471'},
  '11304581471': {'11304264839', '11304581472', '309641646'},
  '11304264839': {'11304581471', '2169312366'},
  '2169312366': {'11304264839', '2169312365'},
  '2169312365': {'11304264840', '2169312366'}})

In [126]:
# Two-way lookup between a node's (lat, lon) tuple and its id converted to int.
# Nodes that share identical coordinates collapse onto one key in coord_to_index
# (the node seen last wins).
coord_to_index = {latlon: int(osm_id) for osm_id, latlon in nodes.items()}
index_to_coord = {int(osm_id): latlon for osm_id, latlon in nodes.items()}

In [127]:
import json
# Tuples and sets are not JSON types: stringify the coordinate tuples and
# turn every neighbour set into a list before dumping.
coord_to_index_serializable = {str(latlon): node for latlon, node in coord_to_index.items()}
index_to_coord_serializable = {node: str(latlon) for node, latlon in index_to_coord.items()}
adjacencies_serializable = {node: list(linked) for node, linked in adjacencies.items()}

# Neighbour lists -> adjacencies.json
with open('adjacencies.json', 'w') as f:
    json.dump(adjacencies_serializable, f)

# Coordinate string -> node id -> coord_to_index.json
with open('coord_to_index.json', 'w') as f:
    json.dump(coord_to_index_serializable, f)

# Node id -> coordinate string -> index_to_coord.json
with open('index_to_coord.json', 'w') as f:
    json.dump(index_to_coord_serializable, f)


In [128]:
import numpy as np
import scipy.sparse as sparse

# Give every node ID a consecutive matrix position, following the order of `nodes`
node_to_index = dict(zip(nodes.keys(), range(len(nodes))))

# Gather one (row, col) pair per directed neighbour link
rows, cols = [], []

for node_id, adjacent_nodes in adjacencies.items():
    node_idx = node_to_index[node_id]
    for adj_node_id in adjacent_nodes:
        adj_node_idx = node_to_index[adj_node_id]
        rows.append(node_idx)
        cols.append(adj_node_idx)

# Assemble the COO sparse matrix; a stored 1 marks an edge
data = np.ones(len(rows))
adjacency_matrix = sparse.coo_matrix((data, (rows, cols)), shape=(num_nodes, num_nodes))

# Keep the matrix shape around for inspection
matrix_shape = adjacency_matrix.shape






In [129]:
from queue import PriorityQueue
import numpy as np

def haversine(coord1, coord2):
    """Great-circle distance in kilometres between two points.

    Both arguments are ``(lat, lon)`` pairs in decimal degrees, which is the
    order used for every coordinate tuple built in this notebook
    (``nodes[node_id] = (lat, lon)``). The previous version unpacked the pairs
    as ``(lon, lat)``, so the cosine terms were evaluated on longitudes and
    every distance (and therefore every search cost/heuristic) was wrong.

    Args:
        coord1: (lat, lon) of the first point, in degrees.
        coord2: (lat, lon) of the second point, in degrees.

    Returns:
        Distance along a sphere of radius 6371 km, in kilometres.
    """
    lat1, lon1 = coord1
    lat2, lon2 = coord2
    R = 6371  # Radius of the Earth in kilometers
    dlat = np.radians(lat2 - lat1)
    dlon = np.radians(lon2 - lon1)
    a = (
        np.sin(dlat / 2) ** 2
        + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon / 2) ** 2
    )
    # Round-off can push `a` marginally outside [0, 1] (e.g. antipodal points),
    # which would make sqrt(1 - a) return NaN.
    a = np.clip(a, 0.0, 1.0)
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    return R * c

def a_star(start_coord, goal_coord, adjacencies, coord_to_index, index_to_coord):
    """Best-first search ordered by travelled cost plus straight-line estimate.

    Args:
        start_coord, goal_coord: coordinate tuples; both must be keys of
            ``coord_to_index``.
        adjacencies: mapping queried with ``.get(str(node))``; values are
            iterables of node ids convertible with ``int``.
        coord_to_index: coordinate tuple -> node id.
        index_to_coord: node id -> coordinate tuple.

    Returns:
        (path, cost, node_seen_count, explored_routes) where
        * path is a list of coordinates from start to goal; if the goal was
          never reached it contains only the start coordinate,
        * cost is the accumulated ``haversine`` cost of the goal, or
          ``float('inf')`` when the goal was never reached,
        * node_seen_count maps node id -> number of times it was taken off
          the queue,
        * explored_routes lists (from_node, to_node) for every accepted
          cost improvement, in order.
    """
    start = coord_to_index[start_coord]
    goal = coord_to_index[goal_coord]

    frontier = PriorityQueue()
    frontier.put((0, start))

    parent_of = {start: None}
    best_cost = {start: 0}

    node_seen_count = {}
    explored_routes = []

    while not frontier.empty():
        _, current = frontier.get()
        node_seen_count[current] = 1 + node_seen_count.get(current, 0)

        if current == goal:
            break

        here = index_to_coord[int(current)]

        # Only the neighbours of the node just taken off the queue are examined
        for neighbour in map(int, adjacencies.get(str(current), [])):
            there = index_to_coord[neighbour]
            tentative = best_cost[current] + haversine(here, there)

            # Ignore the edge unless it reaches the neighbour for the first
            # time or strictly more cheaply than before
            if neighbour in best_cost and not tentative < best_cost[neighbour]:
                continue

            best_cost[neighbour] = tentative
            frontier.put((tentative + haversine(there, goal_coord), neighbour))
            parent_of[neighbour] = current
            explored_routes.append((current, neighbour))

    # Walk the parent links back from the goal, then flip into start -> goal order
    path = []
    cursor = goal
    while cursor != start and cursor in parent_of:
        path.append(index_to_coord[cursor])
        cursor = parent_of[cursor]
    path.append(index_to_coord[start])
    path.reverse()

    return path, best_cost.get(goal, float('inf')), node_seen_count, explored_routes


In [130]:
def UCS(start_coord, goal_coord, adjacencies, coord_to_index, index_to_coord):
    """Uniform-cost search: the queue is ordered by travelled cost alone.

    Args:
        start_coord, goal_coord: coordinate tuples; both must be keys of
            ``coord_to_index``.
        adjacencies: mapping queried with ``.get(str(node))``; values are
            iterables of node ids convertible with ``int``.
        coord_to_index: coordinate tuple -> node id.
        index_to_coord: node id -> coordinate tuple.

    Returns:
        (path, cost, node_seen_count, explored_routes). ``path`` is a list of
        coordinates from start to goal (only the start coordinate if the goal
        was never reached); ``cost`` is the goal's accumulated ``haversine``
        cost or ``float('inf')``; ``node_seen_count`` counts how often each
        node was taken off the queue; ``explored_routes`` lists
        (from_node, to_node) for every accepted cost improvement.
    """
    start = coord_to_index[start_coord]
    goal = coord_to_index[goal_coord]

    frontier = PriorityQueue()
    frontier.put((0, start))

    parent_of = {start: None}
    travelled = {start: 0}

    node_seen_count = {}
    explored_routes = []

    while not frontier.empty():
        _, current = frontier.get()
        node_seen_count[current] = 1 + node_seen_count.get(current, 0)

        if current == goal:
            break

        here = index_to_coord[int(current)]

        # Only the neighbours of the node just taken off the queue are examined
        for neighbour in map(int, adjacencies.get(str(current), [])):
            there = index_to_coord[neighbour]
            tentative = travelled[current] + haversine(here, there)

            # Ignore the edge unless it is a first visit or a strict improvement
            if neighbour in travelled and not tentative < travelled[neighbour]:
                continue

            travelled[neighbour] = tentative
            # No heuristic term: the priority is the travelled cost itself
            frontier.put((tentative, neighbour))
            parent_of[neighbour] = current
            explored_routes.append((current, neighbour))

    # Walk the parent links back from the goal, then flip into start -> goal order
    path = []
    cursor = goal
    while cursor != start and cursor in parent_of:
        path.append(index_to_coord[cursor])
        cursor = parent_of[cursor]
    path.append(index_to_coord[start])
    path.reverse()

    return path, travelled.get(goal, float('inf')), node_seen_count, explored_routes


In [131]:
def a_star_plus(start_coord, goal_coord, adjacencies, coord_to_index, index_to_coord):
    """A* variant whose heuristic weight drops once a node is close to the goal.

    The straight-line estimate to the goal is used at full weight while it is
    at least 70% of the start-to-goal straight-line distance; below that it is
    multiplied by 0.65.

    Args:
        start_coord, goal_coord: coordinate tuples; both must be keys of
            ``coord_to_index``.
        adjacencies: mapping queried with ``.get(str(node))``; values are
            iterables of node ids convertible with ``int``.
        coord_to_index: coordinate tuple -> node id.
        index_to_coord: node id -> coordinate tuple.

    Returns:
        (path, cost, node_seen_count, explored_routes). ``path`` is a list of
        coordinates from start to goal (only the start coordinate if the goal
        was never reached); ``cost`` is the goal's accumulated ``haversine``
        cost or ``float('inf')``; ``node_seen_count`` counts how often each
        node was taken off the queue; ``explored_routes`` lists
        (from_node, to_node) for every accepted cost improvement.
    """
    start = coord_to_index[start_coord]
    goal = coord_to_index[goal_coord]
    air_distance = haversine(start_coord, goal_coord)

    frontier = PriorityQueue()
    frontier.put((0, start))

    parent_of = {start: None}
    best_cost = {start: 0}

    node_seen_count = {}
    explored_routes = []

    while not frontier.empty():
        _, current = frontier.get()
        node_seen_count[current] = 1 + node_seen_count.get(current, 0)

        if current == goal:
            break

        here = index_to_coord[int(current)]

        # Only the neighbours of the node just taken off the queue are examined
        for neighbour in map(int, adjacencies.get(str(current), [])):
            there = index_to_coord[neighbour]
            tentative = best_cost[current] + haversine(here, there)

            # Ignore the edge unless it is a first visit or a strict improvement
            if neighbour in best_cost and not tentative < best_cost[neighbour]:
                continue

            best_cost[neighbour] = tentative
            remaining = haversine(there, goal_coord)
            # Full-weight estimate far from the goal, damped estimate near it
            far_from_goal = remaining >= air_distance * 0.7
            priority = tentative + remaining if far_from_goal else tentative + remaining * 0.65
            frontier.put((priority, neighbour))
            parent_of[neighbour] = current
            explored_routes.append((current, neighbour))

    # Walk the parent links back from the goal, then flip into start -> goal order
    path = []
    cursor = goal
    while cursor != start and cursor in parent_of:
        path.append(index_to_coord[cursor])
        cursor = parent_of[cursor]
    path.append(index_to_coord[start])
    path.reverse()

    return path, best_cost.get(goal, float('inf')), node_seen_count, explored_routes


In [132]:
def prune_open_set(open_set, memory_limit):
    """Shrink ``open_set`` in place to its ``memory_limit`` smallest entries.

    Nothing happens while the queue holds at most ``memory_limit`` items.
    Otherwise the lowest-priority-value entries are taken out, everything
    left in the queue is discarded, and the kept entries are put back.
    """
    if open_set.qsize() <= memory_limit:
        return

    # Pull out the entries with the lowest estimated total cost
    survivors = []
    for _ in range(memory_limit):
        survivors.append(open_set.get())

    # Drop whatever is left, then restore the survivors
    open_set.queue.clear()
    for entry in survivors:
        open_set.put(entry)

def a_star_memory_bounded(start_coord, goal_coord, adjacencies, coord_to_index, index_to_coord, memory_limit):
    """``a_star_plus``-style search whose queue is capped at ``memory_limit`` entries.

    Before every pop the queue is trimmed with ``prune_open_set``; trimmed
    entries are discarded for good and are only queued again if a strictly
    cheaper way to reach them turns up, so the search can fail to find a
    route that exists. The priority uses the same rule as ``a_star_plus``:
    full-weight straight-line estimate while it is at least 70% of the
    start-to-goal straight-line distance, 0.65-weighted otherwise.

    Args:
        start_coord, goal_coord: coordinate tuples; both must be keys of
            ``coord_to_index``.
        adjacencies: mapping queried with ``.get(str(node))``; values are
            iterables of node ids convertible with ``int``.
        coord_to_index: coordinate tuple -> node id.
        index_to_coord: node id -> coordinate tuple.
        memory_limit: maximum number of queue entries kept; must be >= 1.

    Returns:
        (path, cost, node_seen_count, explored_routes). ``path`` is a list of
        coordinates from start to goal (only the start coordinate if the goal
        was never reached); ``cost`` is the goal's accumulated ``haversine``
        cost or ``float('inf')``; ``node_seen_count`` counts how often each
        node was taken off the queue; ``explored_routes`` lists
        (from_node, to_node) for every accepted cost improvement.

    Raises:
        ValueError: if ``memory_limit`` is smaller than 1.
    """
    # A limit below 1 makes prune_open_set empty the queue right before the
    # blocking PriorityQueue.get() call, which would then wait forever.
    if memory_limit < 1:
        raise ValueError("memory_limit must be at least 1")

    start = coord_to_index[start_coord]
    goal = coord_to_index[goal_coord]
    air_distance = haversine(start_coord, goal_coord)
    open_set = PriorityQueue()
    open_set.put((0, start))

    came_from = {start: None}
    cost_so_far = {start: 0}

    node_seen_count = {}
    explored_routes = []

    while not open_set.empty():
        prune_open_set(open_set, memory_limit)  # Prune the open set to meet memory constraints

        current = open_set.get()[1]
        node_seen_count[current] = node_seen_count.get(current, 0) + 1

        if current == goal:
            break

        current_coord = index_to_coord[int(current)]

        # Access only neighbors of the current node
        for next_node in adjacencies.get(str(current), []):
            next_node = int(next_node)
            next_node_coord = index_to_coord[next_node]

            # Edge cost comes from haversine
            distance = haversine(current_coord, next_node_coord)
            new_cost = cost_so_far[current] + distance

            if next_node not in cost_so_far or new_cost < cost_so_far[next_node]:
                cost_so_far[next_node] = new_cost
                heuristic = haversine(next_node_coord, goal_coord)
                # Full-weight estimate far from the goal, damped estimate near it
                if heuristic >= air_distance * 0.7:
                    priority = new_cost + heuristic
                else:
                    priority = new_cost + heuristic * 0.65
                open_set.put((priority, next_node))
                came_from[next_node] = current
                explored_routes.append((current, next_node))

    # Reconstruct path by following parent links back from the goal
    path = []
    current = goal
    while current != start:
        if current not in came_from:
            # Goal (or a link of the chain) was never reached
            break
        path.append(index_to_coord[current])
        current = came_from[current]
    path.append(index_to_coord[start])
    path.reverse()

    return path, cost_so_far.get(goal, float('inf')), node_seen_count, explored_routes



In [133]:
import json
from queue import PriorityQueue



def reconstruct_path(came_from, start, goal):
    """Follow parent links from ``goal`` back to ``start`` and return the route.

    Args:
        came_from: mapping node -> the node it was reached from.
        start: node at which the walk stops.
        goal: node from which the walk begins.

    Returns:
        List of nodes ordered start -> goal. If ``goal`` (or a link of the
        chain) is missing from ``came_from`` the walk stops there instead of
        raising ``KeyError``; for a goal that was never reached the result is
        ``[start]``, the same fallback the inline reconstruction in
        ``a_star``/``UCS`` uses. Callers tell the two cases apart through the
        cost they report (``float('inf')`` when the goal was not reached).
    """
    path = []
    current = goal
    while current != start:
        if current not in came_from:
            # Search ended without reaching this node: stop instead of crashing
            break
        path.append(current)
        current = came_from[current]
    path.append(start)
    path.reverse()
    return path

def a_star_2(start_coord, goal_coord, adjacencies, coord_to_index, index_to_coord):
    """A* search that also records a snapshot after every accepted relaxation.

    Each snapshot holds the coordinate of the node being expanded, the
    coordinates of everything currently queued, and the route from the start
    to the node being expanded. All snapshots are written to
    'a_star_states.json' once the search loop ends.

    Args:
        start_coord, goal_coord: coordinate tuples; both must be keys of
            ``coord_to_index``.
        adjacencies: mapping queried with ``.get(str(node))``; values are
            iterables of node ids convertible with ``int``.
        coord_to_index: coordinate tuple -> node id.
        index_to_coord: node id -> coordinate tuple.

    Returns:
        (path, cost, node_seen_count): the route as a list of coordinates, the
        goal's accumulated ``haversine`` cost (``float('inf')`` if the goal has
        no recorded cost), and how often each node was taken off the queue.
    """
    start = coord_to_index[start_coord]
    goal = coord_to_index[goal_coord]

    frontier = PriorityQueue()
    frontier.put((0, start))

    parent_of = {start: None}
    best_cost = {start: 0}
    node_seen_count = {}
    states = []

    while not frontier.empty():
        _, current = frontier.get()
        node_seen_count[current] = 1 + node_seen_count.get(current, 0)

        if current == goal:
            break

        for neighbour in map(int, adjacencies.get(str(current), [])):
            step = haversine(index_to_coord[current], index_to_coord[neighbour])
            tentative = best_cost[current] + step

            # Ignore the edge unless it is a first visit or a strict improvement
            if neighbour in best_cost and not tentative < best_cost[neighbour]:
                continue

            best_cost[neighbour] = tentative
            frontier.put((tentative + haversine(index_to_coord[neighbour], goal_coord), neighbour))
            parent_of[neighbour] = current

            # Snapshot for the visualisation: expanded node, queue contents, route so far
            route_so_far = reconstruct_path(parent_of, start, current)
            states.append({
                "current": index_to_coord[current],
                "open_set": [index_to_coord[entry[1]] for entry in frontier.queue],
                "path": [index_to_coord[hop] for hop in route_so_far],
            })

    # Persist the recorded snapshots
    with open('a_star_states.json', 'w') as f:
        json.dump(states, f)

    final_path = reconstruct_path(parent_of, start, goal)
    route = [index_to_coord[hop] for hop in final_path]
    return route, best_cost.get(goal, float('inf')), node_seen_count

# Demo run of a_star_2 between two fixed coordinates.
# Relies on adjacencies, coord_to_index, index_to_coord and haversine from earlier cells.
start_coord = (21.0314522, 105.8529442)
goal_coord = (21.0072383, 105.8348649)

path, cost, node_seen_count = a_star_2(
    start_coord=start_coord,
    goal_coord=goal_coord,
    adjacencies=adjacencies,
    coord_to_index=coord_to_index,
    index_to_coord=index_to_coord,
)


In [134]:
def ucs_2(start_coord, goal_coord, adjacencies, coord_to_index, index_to_coord):
    """Uniform-cost search that records a snapshot after every accepted relaxation.

    Each snapshot holds the coordinate of the node being expanded, the
    coordinates of everything currently queued, and the route from the start
    to the node being expanded. All snapshots are written to
    'ucs_states.json' once the search loop ends.

    Args:
        start_coord, goal_coord: coordinate tuples; both must be keys of
            ``coord_to_index``.
        adjacencies: mapping queried with ``.get(str(node))``; values are
            iterables of node ids convertible with ``int``.
        coord_to_index: coordinate tuple -> node id.
        index_to_coord: node id -> coordinate tuple.

    Returns:
        (path, cost, node_seen_count): the route as a list of coordinates, the
        goal's accumulated ``haversine`` cost (``float('inf')`` if the goal has
        no recorded cost), and how often each node was taken off the queue.
    """
    start = coord_to_index[start_coord]
    goal = coord_to_index[goal_coord]

    frontier = PriorityQueue()
    frontier.put((0, start))

    parent_of = {start: None}
    travelled = {start: 0}
    node_seen_count = {}
    states = []

    while not frontier.empty():
        _, current = frontier.get()
        node_seen_count[current] = 1 + node_seen_count.get(current, 0)

        if current == goal:
            break

        for neighbour in map(int, adjacencies.get(str(current), [])):
            step = haversine(index_to_coord[current], index_to_coord[neighbour])
            tentative = travelled[current] + step

            # Ignore the edge unless it is a first visit or a strict improvement
            if neighbour in travelled and not tentative < travelled[neighbour]:
                continue

            travelled[neighbour] = tentative
            # No heuristic term: the priority is the travelled cost itself
            frontier.put((tentative, neighbour))
            parent_of[neighbour] = current

            # Snapshot for the visualisation: expanded node, queue contents, route so far
            route_so_far = reconstruct_path(parent_of, start, current)
            states.append({
                "current": index_to_coord[current],
                "open_set": [index_to_coord[entry[1]] for entry in frontier.queue],
                "path": [index_to_coord[hop] for hop in route_so_far],
            })

    # Persist the recorded snapshots
    with open('ucs_states.json', 'w') as f:
        json.dump(states, f)

    final_path = reconstruct_path(parent_of, start, goal)
    route = [index_to_coord[hop] for hop in final_path]
    return route, travelled.get(goal, float('inf')), node_seen_count

In [135]:
def a_star_plus_2(start_coord, goal_coord, adjacencies, coord_to_index, index_to_coord):
    """Damped-heuristic A* that records a snapshot after every accepted relaxation.

    The straight-line estimate to the goal is used at full weight while it is
    at least 70% of the start-to-goal straight-line distance and is multiplied
    by 0.65 otherwise. Each snapshot holds the coordinate of the node being
    expanded, the coordinates of everything currently queued, and the route
    from the start to the node being expanded. All snapshots are written to
    'a_star_plus_states.json' once the search loop ends.

    Args:
        start_coord, goal_coord: coordinate tuples; both must be keys of
            ``coord_to_index``.
        adjacencies: mapping queried with ``.get(str(node))``; values are
            iterables of node ids convertible with ``int``.
        coord_to_index: coordinate tuple -> node id.
        index_to_coord: node id -> coordinate tuple.

    Returns:
        (path, cost, node_seen_count): the route as a list of coordinates, the
        goal's accumulated ``haversine`` cost (``float('inf')`` if the goal has
        no recorded cost), and how often each node was taken off the queue.
    """
    start = coord_to_index[start_coord]
    goal = coord_to_index[goal_coord]

    frontier = PriorityQueue()
    frontier.put((0, start))

    parent_of = {start: None}
    best_cost = {start: 0}
    node_seen_count = {}
    states = []
    air_distance = haversine(start_coord, goal_coord)

    while not frontier.empty():
        _, current = frontier.get()
        node_seen_count[current] = 1 + node_seen_count.get(current, 0)

        if current == goal:
            break

        for neighbour in map(int, adjacencies.get(str(current), [])):
            step = haversine(index_to_coord[current], index_to_coord[neighbour])
            tentative = best_cost[current] + step

            # Ignore the edge unless it is a first visit or a strict improvement
            if neighbour in best_cost and not tentative < best_cost[neighbour]:
                continue

            best_cost[neighbour] = tentative
            remaining = haversine(index_to_coord[neighbour], goal_coord)
            # Full-weight estimate far from the goal, damped estimate near it
            far_from_goal = remaining >= air_distance * 0.7
            priority = tentative + remaining if far_from_goal else tentative + remaining * 0.65
            frontier.put((priority, neighbour))
            parent_of[neighbour] = current

            # Snapshot for the visualisation: expanded node, queue contents, route so far
            route_so_far = reconstruct_path(parent_of, start, current)
            states.append({
                "current": index_to_coord[current],
                "open_set": [index_to_coord[entry[1]] for entry in frontier.queue],
                "path": [index_to_coord[hop] for hop in route_so_far],
            })

    # Persist the recorded snapshots
    with open('a_star_plus_states.json', 'w') as f:
        json.dump(states, f)

    final_path = reconstruct_path(parent_of, start, goal)
    route = [index_to_coord[hop] for hop in final_path]
    return route, best_cost.get(goal, float('inf')), node_seen_count

In [136]:
def prune_open_set(open_set, memory_limit):
    """Trim ``open_set`` in place so it holds at most ``memory_limit`` entries.

    A queue that is already within the limit is left untouched. Otherwise the
    ``memory_limit`` entries with the smallest priority values are kept and
    every other entry is thrown away.
    """
    within_limit = not open_set.qsize() > memory_limit
    if within_limit:
        return

    # Take the cheapest entries out first ...
    cheapest = []
    for _ in range(memory_limit):
        cheapest.append(open_set.get())

    # ... discard the remainder and re-queue the ones that were kept
    open_set.queue.clear()
    for kept in cheapest:
        open_set.put(kept)

 # Prune the open set to meet memory constraints
def a_star_memory_bounded_2(start_coord, goal_coord, adjacencies, coord_to_index, index_to_coord,memory_limit):
    """Queue-capped, damped-heuristic A* that records a snapshot per accepted relaxation.

    Before every pop the queue is trimmed to ``memory_limit`` entries with
    ``prune_open_set``; trimmed entries are discarded for good unless a
    strictly cheaper way to reach them turns up later. The straight-line
    estimate to the goal is used at full weight while it is at least 70% of
    the start-to-goal straight-line distance and is multiplied by 0.65
    otherwise. Each snapshot holds the coordinate of the node being expanded,
    the coordinates of everything currently queued, and the route from the
    start to the node being expanded. All snapshots are written to
    'a_star_memory_states.json' once the search loop ends.

    Args:
        start_coord, goal_coord: coordinate tuples; both must be keys of
            ``coord_to_index``.
        adjacencies: mapping queried with ``.get(str(node))``; values are
            iterables of node ids convertible with ``int``.
        coord_to_index: coordinate tuple -> node id.
        index_to_coord: node id -> coordinate tuple.
        memory_limit: maximum number of queue entries kept; must be >= 1.

    Returns:
        (path, cost, node_seen_count): the route as a list of coordinates, the
        goal's accumulated ``haversine`` cost (``float('inf')`` if the goal has
        no recorded cost), and how often each node was taken off the queue.

    Raises:
        ValueError: if ``memory_limit`` is smaller than 1.
    """
    # A limit below 1 makes prune_open_set empty the queue right before the
    # blocking PriorityQueue.get() call, which would then wait forever.
    if memory_limit < 1:
        raise ValueError("memory_limit must be at least 1")

    start = coord_to_index[start_coord]
    goal = coord_to_index[goal_coord]

    open_set = PriorityQueue()
    open_set.put((0, start))

    came_from = {start: None}
    cost_so_far = {start: 0}
    node_seen_count = {}
    states = []
    air_distance = haversine(start_coord, goal_coord)
    while not open_set.empty():
        prune_open_set(open_set, memory_limit)
        current = open_set.get()[1]
        node_seen_count[current] = node_seen_count.get(current, 0) + 1

        if current == goal:
            break

        for next_node in adjacencies.get(str(current), []):
            next_node = int(next_node)
            new_cost = cost_so_far[current] + haversine(index_to_coord[current], index_to_coord[next_node])

            if next_node not in cost_so_far or new_cost < cost_so_far[next_node]:
                cost_so_far[next_node] = new_cost
                heuristic = haversine(index_to_coord[next_node], goal_coord)
                # Full-weight estimate far from the goal, damped estimate near it
                if heuristic >= air_distance * 0.7:
                    priority = new_cost + heuristic
                else:
                    priority = new_cost + heuristic * 0.65
                open_set.put((priority, next_node))
                came_from[next_node] = current

                # Record the state for visualization
                current_path = reconstruct_path(came_from, start, current)
                state = {
                    "current": index_to_coord[current],
                    "open_set": [index_to_coord[node] for _, node in open_set.queue],
                    "path": [index_to_coord[node] for node in current_path]
                }
                states.append(state)

    # Save states to a JSON file
    with open('a_star_memory_states.json', 'w') as f:
        json.dump(states, f)

    final_path = reconstruct_path(came_from, start, goal)
    return [index_to_coord[node] for node in final_path], cost_so_far.get(goal, float('inf')), node_seen_count

In [139]:
memory_limit = 100

# Run each search on the same start/goal pair.
# Every call rebinds the same four names, so only the last result is kept.
path, cost, node_seen_count, explored_routes = UCS(
    start_coord=(21.0314522, 105.8529442),
    goal_coord=(21.0072383, 105.8348649),
    adjacencies=adjacencies,
    coord_to_index=coord_to_index,
    index_to_coord=index_to_coord,
)
path, cost, node_seen_count, explored_routes = a_star(
    start_coord=(21.0314522, 105.8529442),
    goal_coord=(21.0072383, 105.8348649),
    adjacencies=adjacencies,
    coord_to_index=coord_to_index,
    index_to_coord=index_to_coord,
)
path, cost, node_seen_count, explored_routes = a_star_plus(
    start_coord=(21.0314522, 105.8529442),
    goal_coord=(21.0072383, 105.8348649),
    adjacencies=adjacencies,
    coord_to_index=coord_to_index,
    index_to_coord=index_to_coord,
)
path, cost, node_seen_count, explored_routes = a_star_memory_bounded(
    start_coord=(21.0314522, 105.8529442),
    goal_coord=(21.0072383, 105.8348649),
    adjacencies=adjacencies,
    coord_to_index=coord_to_index,
    index_to_coord=index_to_coord,
    memory_limit=memory_limit,
)

In [140]:
# Queue-capped search with per-step snapshots (writes a_star_memory_states.json)
path, cost, node_seen_count = a_star_memory_bounded_2(
    start_coord=(21.0314522, 105.8529442),
    goal_coord=(21.0072383, 105.8348649),
    adjacencies=adjacencies,
    coord_to_index=coord_to_index,
    index_to_coord=index_to_coord,
    memory_limit=memory_limit,
)

In [141]:
# A* with per-step snapshots (writes a_star_states.json)
path, cost, node_seen_count = a_star_2(
    start_coord=(21.0314522, 105.8529442),
    goal_coord=(21.0072383, 105.8348649),
    adjacencies=adjacencies,
    coord_to_index=coord_to_index,
    index_to_coord=index_to_coord,
)

In [142]:
# Damped-heuristic A* with per-step snapshots (writes a_star_plus_states.json)
path, cost, node_seen_count = a_star_plus_2(
    start_coord=(21.0314522, 105.8529442),
    goal_coord=(21.0072383, 105.8348649),
    adjacencies=adjacencies,
    coord_to_index=coord_to_index,
    index_to_coord=index_to_coord,
)

KeyboardInterrupt: 

In [None]:
# Uniform-cost search with per-step snapshots (writes ucs_states.json)
path, cost, node_seen_count = ucs_2(
    start_coord=(21.0314522, 105.8529442),
    goal_coord=(21.0072383, 105.8348649),
    adjacencies=adjacencies,
    coord_to_index=coord_to_index,
    index_to_coord=index_to_coord,
)