## TÖL403G Lokaverkefni

In [12]:
import random, time, math, folium
from typing import NamedTuple
from heapdict import heapdict
from ast import literal_eval
from copy import deepcopy

Byrjum á því að útfæra einfalda gagnagrind fyrir net. Þetta mun halda utan um upplýsingarnar í skránum ásamt því að gera okkur kleift að útfæra aðferðir sem munu vera gagnlegar síðar.

In [44]:
class Node(NamedTuple):
    id: int
    x_coord: float
    y_coord: float
    primary: bool

class Edge(NamedTuple):
    node_id_from: int
    node_id_to: int
    length: float
    name: list[str]


class Graph:

    def __init__(self):
        self.nodes: dict[int, Node] = {}
        self.edges: dict[tuple[int, int], list[Edge]] = {}
        self.adj_list: dict[int, list[int]] = {}

    def add_node(self, node: Node) -> None:
        id = node.id
        if id not in self.nodes:
            self.nodes[id] = node

    def add_edge(self, edge: Edge) -> None:
        id_from = edge.node_id_from
        id_to = edge.node_id_to
        if id_from in self.nodes and id_to in self.nodes:
            self.edges.setdefault((id_from, id_to), []).append(edge)
            self.adj_list.setdefault(id_from, []).append(id_to)
        else:
            print(f'skipped edge missing node(s) {id_from} -> {id_to}')
    

def reverse_graph(g: Graph) -> Graph:
    reversed_graph = Graph()
    reversed_graph.nodes = g.nodes.copy()

    for _, edges in g.edges.items(): # Could optimise by assignin pointers instead of creating new edge obj
        for edge in edges:
            reversed_edge = Edge(
                node_id_from = edge.node_id_to,
                node_id_to = edge.node_id_from,
                length = edge.length,
                name = edge.name
            )
            reversed_graph.add_edge(reversed_edge)
    
    return reversed_graph
    

**2.3.1 Þáttun(*)**

Lesið inn netið úr skránum sem eru gefnar, nodes.tsv og edges.tsv. Í skránni nodes.tsv
eru hnútar með auðkenni (id), hnit (x, y) og hvort þeir séu á aðalvegi (primary). Í skránni
edges.tsv eru leggi frá hnúti u til hnúts v með lengd/length, mæld í metrum, og nafn
(name).

In [4]:
graph = Graph()

with open('nodes.tsv', 'r', encoding='utf-8') as file:
    next(file) # skip header
    for line in file:
        parts = line.strip().split('\t')
        node = Node(
            id = int(parts[0]),
            x_coord = float(parts[1]),
            y_coord = float(parts[2]),
            primary = parts[3].lower() == 'true'
        )
        graph.add_node(node)

def parse_edge_name(name: str) -> list[str]:
    name = name.strip()
    if not name:
        return []
    
    if name.startswith('[') and name.endswith(']'):
        try:
            result = literal_eval(name)
            if isinstance(result, list[str]):
                return result
            else:
                return [str(result)]
        except Exception:
            return [name]
    return [name]

with open('edges.tsv', 'r', encoding='utf-8') as file:
    next(file) # skip header
    for line in file:
        parts = line.strip().split('\t')
        edge = Edge(
            node_id_from = int(parts[0]),
            node_id_to = int(parts[1]),
            length = float(parts[2]),
            name = parse_edge_name(parts[3]) if len(parts) > 3 else []
        )
        graph.add_edge(edge)

***2.3.2 Leit (⋆⋆)***

Ef við setjum hleðslustöðvar á hnúta $v_1, . . . , v_k$ þá er hægt að nota reikniriti Dijkstra til að
finna stystu fjarlægð frá hverjum hnúti $u$ í hleðslustöð $v_i$. Útfærið reikniritið sem tekur
inn lista af lokahnútum og reiknar fjarlægðir frá öllum hnútum í netinu. Athugið að netið
er stefnt net.

In [5]:

def multisource_dijkstra(g: Graph, source_ids: list[int], target_id: int = None) -> tuple[dict[int, float], dict[int, int]]:
    if not g.nodes:
        raise ValueError('Graph is empty')
    for s_id in source_ids:
        if s_id not in g.nodes:
            raise ValueError(f'source id: {s_id} not in graph')
    if target_id is not None and target_id not in g.nodes:
        raise ValueError(f'target id: {target_id} not in graph')

    INF = float('inf')
    dist = {}
    prev = {}
    Q = heapdict()

    for s_id in source_ids:
        dist[s_id] = 0
        Q[s_id] = 0

    for v in g.nodes:
        if v not in Q:
            dist[v] = INF
            prev[v] = None
            Q[v] = INF

    while Q:
        u, _ = Q.popitem()
        if target_id is not None and u == target_id:
            break

        for neighbour in g.adj_list.get(u, []):
            edge_len = min([edge.length for edge in g.edges.get((u, neighbour), [])]) # sometimes multiple edges, we will always choose the min(length) edge
            if neighbour in Q:
                alt = dist[u] + edge_len
                if alt < dist[neighbour]:
                    dist[neighbour] = alt
                    prev[neighbour] = u
                    Q[neighbour] = alt

    return (dist, prev)

In [None]:
def Dijkstra(g: Graph, source_id: int, target_id: int = None) -> tuple[dict[int, float], dict[int, int]]:
    if not g.nodes:
        raise ValueError('Graph is empty')
    if source_id not in g.nodes:
        raise ValueError(f'source id: {source_id} not in graph')
    if target_id is not None and target_id not in g.nodes:
        raise ValueError(f'target id: {target_id} not in graph')

    INF = float('inf')
    dist = {}
    prev = {}
    Q = heapdict()
    dist[source_id] = 0
    Q[source_id] = dist[source_id]
    for v, _ in g.nodes.items():
        if v != source_id:
            dist[v] = INF
            prev[v] = None
            Q[v] = dist[v]

    while Q:
        u, _ = Q.popitem()
        if target_id is not None and u == target_id:
            break

        for neighbour in g.adj_list.get(u, []):
            for edge in g.edges.get((u, neighbour), []):
                if neighbour in Q:
                    alt = dist[u] + edge.length
                    if alt < dist[neighbour]:
                        dist[neighbour] = alt
                        prev[neighbour] = u
                        Q[neighbour] = dist[neighbour]

    return (dist, prev)


def reverse_graph(g: Graph) -> Graph:
    reversed_graph = Graph()
    reversed_graph.nodes = g.nodes.copy()

    for _, edges in g.edges.items():
        for edge in edges:
            reversed_edge = Edge(
                node_id_from = edge.node_id_to,
                node_id_to = edge.node_id_from,
                length = edge.length,
                name = edge.name
            )
            reversed_graph.add_edge(reversed_edge)
    
    return reversed_graph

***2.3.3 Framsetning (⋆)***

Setjið fimm hleðslustöðvar í netið og sýnið stystu leið fyrir fimm punkta og teiknið upp á
kort. Tékkið ykkur af með því að bera saman leiðina sem er fundin og fjarlægðina miðað
við kortavefi eins og t.d. Google Maps.

In [8]:
PRIMARY_NODE_IDS = [node_id for node_id, node in graph.nodes.items() if node.primary]
FIVE_CHARGING_IDS = random.sample(PRIMARY_NODE_IDS, 5)

NON_CHARGING_IDS = [node_id for node_id in graph.nodes if node_id not in FIVE_CHARGING_IDS]
FIVE_NODE_IDS = random.sample(NON_CHARGING_IDS, 5)

(dist, prev) = multisource_dijkstra(reverse_graph(graph), FIVE_CHARGING_IDS)

def reconstruct_path(prev: dict[int, int], start_id: int) -> list[int]:
    path = []
    current = start_id
    while current is not None:
        path.append(current)
        current = prev.get(current)
    return path[::-1]

first_node = graph.nodes[FIVE_NODE_IDS[0]]
m = folium.Map(location=[first_node.y_coord, first_node.x_coord], zoom_start=13)

for station_id in FIVE_CHARGING_IDS:
    node = graph.nodes[station_id]
    folium.Marker(
        location=[node.y_coord, node.x_coord],
        popup=f"Charging Station {station_id}",
        icon=folium.Icon(color='green', icon='bolt')
    ).add_to(m)

for node_id in FIVE_NODE_IDS:
    path_ids = reconstruct_path(prev, node_id)
    path_coords = [[graph.nodes[n_id].y_coord, graph.nodes[n_id].x_coord] for n_id in path_ids]

    folium.PolyLine(path_coords, color='blue', weight=3, opacity=0.8).add_to(m)

    node = graph.nodes[node_id]
    folium.Marker(
        location=[node.y_coord, node.x_coord],
        popup=f"Start Node {node_id}",
        icon=folium.Icon(color='red', icon='map-marker')
    ).add_to(m)

m
# m.save("paths_map.html")  # to save it

***2.3.4 Tímamælingar (⋆)***

Mælið tímann sem reiknirit Dijkstra tekur að reikna allar fjarlægðir í netinu með fimm
hleðslustöðvum.

In [None]:
start_time = time.time()
reversed_graph = reverse_graph(graph)
multisource_dijkstra(reversed_graph, FIVE_CHARGING_IDS)
end_time = time.time()
elapsed_time = end_time - start_time
print(f'{elapsed_time:.3f} sec')

0.242 sec


***2.3.5 $A^*$ reikniritið (⋆⋆)***

Útfærið $A^*$ reikniritið sem tekur inn lista af lokahnútum og reiknar fjarlægðir frá öll-
um hnútum í netinu. Sem neðra mat á fjarlægð á milli hnútanna má taka $d(u, v) =\sqrt{(x_u − x_v )^2 + (y_u − y_v )^2}$, þ.e. beina loftlínu milli punktanna. Mælið tíma og berið saman
við reiknirit Dijkstra.

In [None]:
def h(u: Node, v: Node) -> float:
    (x_u, y_u) = (u.x_coord, u.y_coord)
    (x_v, y_v) = (v.x_coord, v.y_coord)
    return math.sqrt((x_u - x_v)**2 + (y_u - y_v)**2)

def multitarget_A_star_all_dists(g: Graph, targets: list[Node]) -> tuple[dict[int, float], dict[int, int]]:
    if not g.nodes:
        raise ValueError('Graph is empty')
    for target in targets:
        if target.id not in g.nodes:
            raise ValueError(f'source node with id: {target.id} not in graph')

    reversed_graph = reverse_graph(g)
    INF = float('inf')
    minHeap = heapdict()
    gScore = {node_id: INF for node_id in g.nodes}
    prev = {}
    closed = set()
    H = {}
    for node_id, node in g.nodes.items(): # precompute lowest euciclidian values
        min_h = min(h(t, node) for t in targets)
        H[node_id] = min_h

    for target in targets:
        gScore[target.id] = 0 # g(n): sum of length from target to node
        minHeap[target.id] = 0

    while minHeap:
        curr_id, _ = minHeap.popitem()

        if curr_id in closed:
            continue
        closed.add(curr_id)

        for neighbour in reversed_graph.adj_list.get(curr_id, []):
            edge_len = min(edge.length for edge in reversed_graph.edges.get((curr_id, neighbour), [])) # sometimes multiple edges, we will always choose the min(length) edge
            tentative_gScore = gScore[curr_id] + edge_len
            if tentative_gScore < gScore[neighbour]:
                gScore[neighbour] = tentative_gScore
                minHeap[neighbour] = tentative_gScore + H[neighbour] #f(n) = g(n) + h(n)
                prev[neighbour] = curr_id

    return (gScore, prev)

In [36]:
FIVE_CHARGING_NODES = [graph.nodes[id] for id in FIVE_CHARGING_IDS]
start = time.perf_counter()
a_star_result = multitarget_A_star_all_dists(graph, FIVE_CHARGING_NODES)
a_star_time = time.perf_counter() - start

start = time.perf_counter()
reversed_graph = reverse_graph(graph)
dijkstra_result = multisource_dijkstra(reversed_graph, FIVE_CHARGING_IDS)
dijkstra_time = time.perf_counter() - start

print(f"A* took: {a_star_time:.4f} seconds")
print(f"Dijkstra took: {dijkstra_time:.4f} seconds")

A* took: 0.1301 seconds
Dijkstra took: 0.1660 seconds


***2.3.6 Staðsetning hleðslustöðva (⋆⋆)***

Ef við setjum $k$ hleðslustöðvar í hnúta $v_1, . . . , v_k$ þá látum við markfallið vera:
$$F(v_1, ... , v_k) \sum_{v \in V} \min_{i=1, ... ,k} d(u, v_i)$$
þ.e. fyrir hvern hnút í netinu reiknum við stystu fjarlægð frá honum til næstu hleðslustöðvar
og leggjum saman yfir alla hnúta í netinu. Finnið bestu lausn fyrir k = 1, með því að prófa alla hnúta sem hægt er að setja
hleðslustöð í og veljið þann sem gefur minnsta markfall. Athugið að eingöngu þeir hnútar
sem eru merktir sem primary geta verið hleðslustöðvar.

In [14]:
PRIMARY_NODES = [node for node in graph.nodes.values() if node.primary]

def objective_function(g: Graph, V: list[Node]) -> list[tuple[float, int]]:
    reversed_graph = reverse_graph(g)
    sums = []
    INF = float('inf')
    for v in V:
        (dist, _) = Dijkstra(reversed_graph, v.id)
        curr_sum = sum(d for d in dist.values() if d != INF) # Unreachable if d == inf
        sums.append((curr_sum, v.id))
    return min([s for s in sums if s[0] > 0], key = lambda x: x[0])

(min_sum, node_id) = objective_function(graph, PRIMARY_NODES)
print(f'Solution for k = 1: is at node with id: {node_id} and has min sum = {min_sum}')


Solution for k = 1: is at node with id: 34827739 and has min sum = 71792102.5791305


***2.3.7 Gráðug reiknirit (⋆⋆)***

Útfærið gráðugt reiknirit sem leitar að bestu lausn fyrir $k = 2, . . . , 10$ með því að leysa
vandamálið fyrir $k−1$ hleðslustöðvum og bæta þá við þann hnút sem gefur minnsta markfall,
miðað við að ekki sé hægt að breyta $v_1, . . . , v_{k−1}$.
Sýnið á korti hvaða hleðslustöðvar eru valdar fyrir $k = 10$ og mælið tímann sem reikniritið
tekur.

In [42]:
def greedy(k: int, g: Graph, V: list[Node]) -> set[int]:
    # Solve for k = 1 and cache results
    reversed_graph = reverse_graph(graph)
    sums = []
    INF = float('inf')
    results = {}
    for v in V:
        (dist, _) = Dijkstra(reversed_graph, v.id)
        results[v.id] = dist
        curr_sum = sum(d for d in dist.values() if d != INF)
        sums.append((curr_sum, v.id))
    v_1 = min([s for s in sums if s[0] > 0], key = lambda x: x[0])[1]

    V_k = {v_1}

    for _ in range(k - 1):
        (min_sum, v_i) = (INF, None)
        for v in V:
            if v.id in V_k:
                continue

            curr_sum = 0
            for u in g.nodes:
                candidates_dists = [
                        results[v_k].get(u, INF)
                        for v_k in V_k | {v.id}
                        if results[v_k][u] != INF
                    ]
                if candidates_dists:
                    curr_min = min(candidates_dists)
                    curr_sum += curr_min
                
            if curr_sum < min_sum and curr_sum > 0: # Some nodes are unreachable and thus useless
                (min_sum, v_i) = (curr_sum, v.id)
            
        V_k.add(v_i)
    
    return V_k

In [45]:
start_time = time.time()
sol = greedy(10, graph, PRIMARY_NODES)
end_time = time.time()
elapsed_time = end_time - start_time
print(f'Greedy altorithm wit k = 10 took:{elapsed_time / 60:.2f} min')

Greedy altorithm wit k = 10 took:5.00 min


In [46]:
best_stations = list(greedy_solution)
def show_nodes_on_map(graph: Graph, node_ids: list[int], zoom: int = 13, color: str = "blue") -> folium.Map:
    if not node_ids:
        raise ValueError("No node IDs provided.")

    first_node = graph.nodes[node_ids[0]]
    m = folium.Map(location=[first_node.y_coord, first_node.x_coord], zoom_start=zoom)

    for node_id in node_ids:
        node = graph.nodes[node_id]
        folium.Marker(
            location=[node.y_coord, node.x_coord],
            popup=f"Node {node_id}",
            icon=folium.Icon(color=color)
        ).add_to(m)

    return m

show_nodes_on_map(graph, best_stations)

***2.3.8 Skárri gráðug reiknirit (⋆⋆)***

Gráðuga reikniritið á það til að mála sig út í horn með því að velja lélegan fyrsta hnút.
Breytið leitinni þannig að þið veljið handahófskenndan fyrsta hnút og farið endurkvæmt í
tilfellin $k = 2, . . . , 10.$ Í hverju undirtilfelli finnið þið 2 bestu hnútana sem koma til greina
en eru langt frá hvor öðrum og prófið endurkvæmt alla möguleika. Haldið utan um bestu
lausnina sem finnst fyrir nokkur hanndahófskennda upphafspunkta og sýnið bestu lausn á
korti. Hve mikinn tíma tekur reikniritið ykkar?

In [None]:
# this brutal force impl will take forever, need to optomise calculations, but will return correct solution for small enough input
def better_greedy_brute_force(k: int, g: Graph, V: list[Node]) -> set[tuple[int, float]]:
    def helper(V_k: set[int], min_score: float) -> tuple[set[int], float]: # (V_k, score)
        if len(V_k) == k:
            return (V_k, min_score)

        (min_i_score, min_v_i) = (INF, None)
        (min_j_score, min_j_i) = (INF, None)
        for i in range(n):
            v_i = V[i]
            v_i_id = v_i.id
            for j in range(i + 1, n):
                v_j = V[j]
                v_j_id = v_j.id
                if v_i_id in V_k or v_j_id in V_k:
                    continue
            
            curr_i_score = 0
            curr_j_score = 0
            curr_scalar = (0.5 * h(v_i, v_j))
            for u in g.nodes:
                candidates_i_scores = [
                    results[v_k].get(u, INF) - curr_scalar # We deduct by this scalar because we also care about the distance between the nodes
                    for v_k in V_k | {v_i_id}
                    if results[v_k][u] != INF
                ]

                candidates_j_scores = [
                    results[v_k].get(u, INF) - curr_scalar
                    for v_k in V_k | {v_j_id}
                    if results[v_k][u] != INF
                ]

                if candidates_i_scores and candidates_j_scores:
                    curr_i_min = min(candidates_i_scores)
                    curr_i_score += curr_i_min
                    curr_j_min = min(candidates_j_scores)
                    curr_j_score += curr_j_min
                
            if curr_i_score < min_i_score:
                (min_i_score, min_v_i) = (curr_i_score, v_i_id)
            
            if curr_j_score < min_j_score:
                (min_j_score, min_j_i) = (curr_j_score, v_j_id)
                
        return min(helper(V_k | {min_v_i}, min_i_score), helper(V_k | {min_j_i}, min_j_score), key = lambda x: x[1])

    reversed_graph = reverse_graph(graph)
    n = len(V)
    INF = float('inf')
    results = {}
    for v in V:
        (dist, _) = Dijkstra(reversed_graph, v.id)
        results[v.id] = dist
    
    first = random.choice(V).id
    return helper({first}, 0) #{(V_k, total_score)}

In [None]:
def better_greedy(k: int, g: Graph, V: list[Node]):

    return 0