### Environment

In [1]:
import arcpy
import heapq 
import math
from collections import defaultdict

arcpy.env.workspace = r"D:\sem5\pag\lab3\lab3\lab3.gdb"
arcpy.env.overwriteOutput = True

### Define file paths

In [22]:
folder = r"D:\sem5\pag\lab3"

roads_shp = folder + r"\SKJZ"

vertices_file = folder + r"\vertices.txt"
edges_file = folder + r"\edges.txt"

path_file = folder + r"\path.txt"

visited_vertices = folder + r"\visited_vertices.txt"
roads_visited = folder + r"\roads_visited.txt"


In [43]:
# Merge the two road shapefiles
arcpy.management.Merge(
    inputs="L4_1_BDOT10k__OT_SKJZ_L;L4_2_BDOT10k__OT_SKJZ_L",
    output=roads_shp,
    field_mappings='katZarzadz "katZarzadz" true true false 254 Text 0 0,First,#,L4_1_BDOT10k__OT_SKJZ_L,katZarzadz,0,253;klasaDrogi "klasaDrogi" true true false 254 Text 0 0,First,#,L4_1_BDOT10k__OT_SKJZ_L,klasaDrogi,0,253;materialNa "materialNa" true true false 254 Text 0 0,First,#,L4_1_BDOT10k__OT_SKJZ_L,materialNa,0,253;liczbaJezd "liczbaJezd" true true false 5 Long 0 0,First,#,L4_1_BDOT10k__OT_SKJZ_L,liczbaJezd,-1,-1;szerNawier "szerNawier" true true false 19 Double 0 0,First,#,L4_1_BDOT10k__OT_SKJZ_L,szerNawier,-1,-1;szerKorony "szerKorony" true true false 19 Double 0 0,First,#,L4_1_BDOT10k__OT_SKJZ_L,szerKorony,-1,-1;liczbaPaso "liczbaPaso" true true false 5 Long 0 0,First,#,L4_1_BDOT10k__OT_SKJZ_L,liczbaPaso,-1,-1;ulica "ulica" true true false 4 Short 0 0,First,#,L4_1_BDOT10k__OT_SKJZ_L,ulica,-1,-1',
    add_source="NO_SOURCE_INFO"
)

### Define the start and end vertex

In [39]:
start = 566
end = 7063

# Make graph from a shapefile
This code takes a shapefile file containing a road network and processes it producing a vertcies and edges file.\
**vertex**: id, x, y, edges \
**edge**: id, from, to, road_id, length, speed

In [21]:
vertices = {}
edges = []

# Create vertex if one doesn't exist with the same coords
def add_vertex(x, y, vertex_id):
    if (x, y) not in vertices:
        vertices[(x, y)] = {'id': vertex_id, 'x': x, 'y': y, 'edges': []}
    return vertices[(x, y)]['id']

def speed(klasa):
    kmh2ms = 1000 / 3600
    match klasa:
        case 'A':
            return 140 * kmh2ms
        case 'S':
            return 120 * kmh2ms
        case 'GP':
            return 70 * kmh2ms
        case 'G':
            return 60 * kmh2ms
        case 'Z':
            return 50 * kmh2ms
        case 'L':
            return 40 * kmh2ms
        case 'D':
            return 20 * kmh2ms
        case 'I':
            return 20 * kmh2ms
    return 0

# Read road shp
with arcpy.da.SearchCursor(roads_shp, ["SHAPE@", "FID", "klasaDrogi"]) as cursor:
    vertex_id = 0
    edge_id = 0

    for row in cursor:
        line = row[0]
        FID = row[1]
        klasa = row[2]
        speedms = speed(klasa)

        
        # read start and end point, round to one meter
        start_point = line.firstPoint
        end_point = line.lastPoint

        start_point = (round(start_point.X, 0), round(start_point.Y, 0))
        end_point = (round(end_point.X, 0), round(end_point.Y, 0))
        
        start_vertex = add_vertex(start_point[0], start_point[1], vertex_id)
        vertex_id += 1 if start_vertex == vertex_id else 0

        end_vertex = add_vertex(end_point[0], end_point[1], vertex_id)
        vertex_id += 1 if end_vertex == vertex_id else 0

        # add edges
        edges.append({
            "id": edge_id,
            "from": start_vertex,
            "to": end_vertex,
            "road_id": FID,
            "length": line.length,
            "speed": speedms
        })

        edge_id += 1
        
        # add edges to the vertex file
        vertices[start_point]['edges'].append(edge_id)
        vertices[end_point]['edges'].append(edge_id)


# Write vertices
with open(vertices_file, "w") as vf:
    vf.write("id\tx\ty\tedges\n")
    for v in vertices.values():
        vf.write(f"{v['id']}\t{v['x']}\t{v['y']}\t{','.join(map(str, v['edges']))}\n")

# Write edges
with open(edges_file, "w") as ef:
    ef.write("id\tfrom\tto\troad_id\tlength\tspeed\n")
    for edge in edges:
        ef.write(f"{edge['id']}\t{edge['from']}\t{edge['to']}\t{edge['road_id']}\t{edge['length']}\t{edge['speed']}\n")

# print("Vertex count:", len(vertices))
# print("Edge count:", len(edges))


### Display all vertices 

In [13]:
# Makes points from the vertex.txt file
arcpy.management.XYTableToPoint(
    in_table=vertices_file,
    out_feature_class=r"vertices",
    x_field="x",
    y_field="y",
    z_field=None,
    coordinate_system='PROJCS["ETRS_1989_UWPP_1992",GEOGCS["GCS_ETRS_1989",DATUM["D_ETRS_1989",SPHEROID["GRS_1980",6378137.0,298.257222101]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]],PROJECTION["Gauss_Kruger"],PARAMETER["False_Easting",500000.0],PARAMETER["False_Northing",-5300000.0],PARAMETER["Central_Meridian",19.0],PARAMETER["Scale_Factor",0.9993],PARAMETER["Latitude_Of_Origin",0.0],UNIT["Meter",1.0]];-5119200 -15295100 10000;-100000 10000;-100000 10000;0.001;0.001;0.001;IsHighPrecision'
)

# A* Path Finder

In [44]:
def read_graph_undirected(filename):
    f = open(filename)
    g = defaultdict(list)
    road_ids = {}
    road_lengths = {}
    road_speeds = {}
    
    f.readline()
    for line in f:
        e = line.split()
        from_vertex, to_vertex, road_id = int(e[1]), int(e[2]), int(e[3])
        length, speed = float(e[4]), float(e[5])

        # store neighbors and length for each direction
        g[from_vertex].append((to_vertex, length))
        g[to_vertex].append((from_vertex, length))

        # store road ID and length for each direction (undirected graph)
        road_ids[(from_vertex, to_vertex)] = road_id
        road_ids[(to_vertex, from_vertex)] = road_id
        road_lengths[(from_vertex, to_vertex)] = length
        road_lengths[(to_vertex, from_vertex)] = length
        road_speeds[(from_vertex, to_vertex)] = speed
        road_speeds[(to_vertex, from_vertex)] = speed
        
    return g, road_ids, road_lengths, road_speeds

def retrieve_path(prev, a, b, road_ids):
    path = [b]
    road_path = []
    while b != a:
        b = prev[b]
        path.append(b)
        road_path.append(road_ids[(b, path[-2])])
    path.reverse()
    road_path.reverse()
    return path, road_path

# estimate euclidian distance from one node to another
def heuristic(v, goal, vertices):
    vx, vy = vertices[v]['x'], vertices[v]['y']
    gx, gy = vertices[goal]['x'], vertices[goal]['y']
    return math.sqrt((vx - gx) ** 2 + (vy - gy) ** 2)

# A* pathfinding algorithm
def a_star_path(graph, road_ids, vertices, start, end):
    open_set = [] # priority queue
    heapq.heappush(open_set, (0, start))  # (f-score, vertex)
    g_score = {vertex: float('inf') for vertex in graph}
    g_score[start] = 0
    prev = {}

    while open_set:
        # get the node with best f-score (lowest cost)
        _, current = heapq.heappop(open_set)

        if current == end:
            return retrieve_path(prev, start, end, road_ids)

        for neighbor, length in graph[current]:
            tentative_g_score = g_score[current] + length
            if tentative_g_score < g_score[neighbor]:
                prev[neighbor] = current
                g_score[neighbor] = tentative_g_score
                f_score = g_score[neighbor] + heuristic(neighbor, end, vertices)
                heapq.heappush(open_set, (f_score, neighbor))

    return None


g, road_ids, road_lengths, road_speeds = read_graph_undirected(edges_file)

vertices_dict = {}
with open(vertices_file, 'r') as vf:
    vf.readline()
    for line in vf:
        parts = line.split()
        v_id = int(parts[0])
        x, y = float(parts[1]), float(parts[2])
        vertices_dict[v_id] = {'x': x, 'y': y}

path, road_path = a_star_path(g, road_ids, vertices_dict, start, end)

print(path)
print(road_path)

# count total length and time
def convert_time(total_seconds):
    hours = total_seconds // 3600
    minutes = (total_seconds % 3600) // 60
    seconds = total_seconds % 60
    return f"{int(hours)}h {int(minutes)}min {int(seconds)}sec"

def len_time(path):
    total_length = 0
    total_time = 0

    for i in range(len(path) - 1):
        from_vertex = path[i]
        to_vertex = path[i + 1]
        
        segment_length = road_lengths[(from_vertex, to_vertex)]
        segment_speed = road_speeds[(from_vertex, to_vertex)]
        
        total_length += segment_length
        total_time += segment_length / segment_speed
    
    return total_length, total_time

total_length, total_time = len_time(path)
formatted_time = convert_time(total_time)

print("Total length of route:", round(total_length,2), "meters")
print("Total time:", formatted_time)

[566, 585, 586, 587, 8433, 7549, 7552, 1593, 8424, 8444, 8472, 8476, 5144, 7751, 3158, 7225, 7754, 7755, 8478, 7063]
[8872, 427, 428, 8903, 8902, 6929, 11268, 8882, 9036, 9037, 9034, 9038, 9039, 7283, 7284, 7282, 7285, 9040, 9041]
Total length of route: 1428.18 meters
Total time: 0h 2min 15sec


### Save and display the vertices and edges

In [42]:
# find the vertices in the path and write the coords to the file
with open(vertices_file, 'r') as f:
    vertices = f.readlines()
    vertices = [x.strip() for x in vertices]
    path = [vertices[i+2].split()[0] for i in path]

# write the path to a file
with open(visited_vertices, 'w') as f:
    f.write("id\tx\ty\tedges\n")
    for i in path:
        f.write(vertices[int(i)] + '\n')

# write the road path to a file
with open(roads_visited, 'w') as f:
    f.write("id\tx\ty\tedges\n")
    for i in road_path:
        f.write(str(i) + '\n')

arcpy.management.XYTableToPoint(
    in_table=visited_vertices,
    out_feature_class=r"visited_vertices",
    x_field="x",
    y_field="y",
    z_field=None,
    coordinate_system='PROJCS["ETRS_1989_UWPP_1992",GEOGCS["GCS_ETRS_1989",DATUM["D_ETRS_1989",SPHEROID["GRS_1980",6378137.0,298.257222101]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]],PROJECTION["Gauss_Kruger"],PARAMETER["False_Easting",500000.0],PARAMETER["False_Northing",-5300000.0],PARAMETER["Central_Meridian",19.0],PARAMETER["Scale_Factor",0.9993],PARAMETER["Latitude_Of_Origin",0.0],UNIT["Meter",1.0]];-5119200 -15295100 10000;-100000 10000;-100000 10000;0.001;0.001;0.001;IsHighPrecision'
)

def create_sql_expression(road_path):
    sql_expression = "FID = " + str(road_path[0])
    for i in road_path[1:]:
        sql_expression += " Or FID = " + str(i)

    return sql_expression

sql_expression = create_sql_expression(road_path)

arcpy.analysis.Select(
    in_features="SKJZ",
    out_feature_class=r"D:\sem5\pag\lab3\lab3\lab3.gdb\visited_roads",
    where_clause=sql_expression
)
