In [None]:
import sys
import random
import time
import enum
import threading
from typing import Dict, List, Set, Tuple, Optional
from dataclasses import dataclass

# Global mutex for thread synchronization
global_mutex = threading.Lock()

@dataclass
class Edge:
    src: int
    dest: int
    weight: int = 1

class Graph:
    def __init__(self):
        self.num_nodes: int = 0
        self.edges: List[Edge] = []
        self.adj: Dict[int, List[Tuple[int, int]]] = {}
        self.node_to_partition: List[int] = []

    def compute_adjacency_list(self):
        self.adj.clear()
        for e in self.edges:
            if e.src not in self.adj:
                self.adj[e.src] = []
            self.adj[e.src].append((e.dest, e.weight))

    def identify_boundary_nodes(self, my_partition: int) -> Set[int]:
        boundary = set()
        for node, neighbors in self.adj.items():
            if self.node_to_partition[node] != my_partition:
                continue
            for neighbor, _ in neighbors:
                if neighbor < len(self.node_to_partition) and self.node_to_partition[neighbor] != my_partition:
                    boundary.add(node)
                    break
        return boundary

    def load_from_file(self, filename: str):
        with open(filename, 'r') as f:
            for line in f:
                if line.startswith('#'):
                    continue
                parts = line.strip().split()
                if len(parts) >= 2:
                    src = int(parts[0])
                    dest = int(parts[1])
                    self.edges.append(Edge(src, dest))

        all_nodes = set(e.src for e in self.edges).union(e.dest for e in self.edges)
        self.num_nodes = max(all_nodes) + 1 if all_nodes else 0
        self.node_to_partition = [i % 4 for i in range(self.num_nodes)]
        self.compute_adjacency_list()

class UpdateType(enum.Enum):
    INSERT = 1
    DELETE = 2

@dataclass
class EdgeUpdate:
    type: UpdateType
    src: int
    dest: int
    weight: int = 1

def process_updates_thread(graph: Graph, updates: List[EdgeUpdate], start_idx: int, end_idx: int, adjacency_changed: List[bool]):
    local_changed = False
    for i in range(start_idx, end_idx):
        upd = updates[i]
        if upd.src >= graph.num_nodes or upd.dest >= graph.num_nodes:
            with global_mutex:
                print(f"Warning: Ignoring invalid update {upd.src}->{upd.dest}", file=sys.stderr)
            continue

        with global_mutex:
            if upd.type == UpdateType.INSERT:
                exists = any(neigh == upd.dest for neigh, _ in graph.adj.get(upd.src, []))
                if not exists:
                    graph.edges.append(Edge(upd.src, upd.dest, upd.weight))
                    local_changed = True
            elif upd.type == UpdateType.DELETE:
                original_len = len(graph.adj.get(upd.src, []))
                graph.edges = [e for e in graph.edges if not (e.src == upd.src and e.dest == upd.dest)]
                if upd.src in graph.adj:
                    graph.adj[upd.src] = [(d, w) for d, w in graph.adj[upd.src] if d != upd.dest]
                    if len(graph.adj[upd.src]) < original_len:
                        local_changed = True

    if local_changed:
        with global_mutex:
            adjacency_changed[0] = True

def apply_updates(graph: Graph, updates: List[EdgeUpdate]):
    adjacency_changed = [False]
    num_threads = max(1, min(4, len(updates))) if len(updates) > 100 else 1  # Adjusting thread count based on updates
    chunk_size = (len(updates) + num_threads - 1) // num_threads
    threads = []

    for i in range(num_threads):
        start_idx = i * chunk_size
        end_idx = min(len(updates), (i + 1) * chunk_size)
        if start_idx >= len(updates):
            break
        t = threading.Thread(target=process_updates_thread, args=(graph, updates, start_idx, end_idx, adjacency_changed))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    if adjacency_changed[0]:
        graph.compute_adjacency_list()

def compute_optimal_delta(graph: Graph) -> int:
    if not graph.edges:
        return 1
    weights = [e.weight for e in graph.edges]
    avg_weight = sum(weights) / len(weights)
    return max(1, min(int(avg_weight), 100))  # Using average instead of median

def identify_affected_nodes(graph: Graph, updates: List[EdgeUpdate], current_dist: List[float]) -> Set[int]:
    affected = set()
    INF = float('inf')
    for upd in updates:
        affected.update([upd.src, upd.dest])
        if upd.type == UpdateType.INSERT:
            if current_dist[upd.src] != INF and current_dist[upd.src] + upd.weight < current_dist[upd.dest]:
                affected.add(upd.dest)
        elif upd.type == UpdateType.DELETE:
            affected.update(n for n, _ in graph.adj.get(upd.src, []))
            affected.update(n for n, _ in graph.adj.get(upd.dest, []))
    return affected

def process_bucket_nodes(graph: Graph, dist: List[float], nodes: Set[int], delta: int, next_buckets: Dict[int, Set[int]], process_light: bool):
    INF = float('inf')
    local_relaxed = {}

    for u in nodes:
        for v, weight in graph.adj.get(u, []):
            if (process_light and weight <= delta) or (not process_light and weight > delta):
                new_dist = dist[u] + weight
                if new_dist < dist[v]:
                    local_relaxed[v] = min(local_relaxed.get(v, INF), new_dist)

    with global_mutex:
        for v, new_dist in local_relaxed.items():
            if new_dist < dist[v]:
                dist[v] = new_dist
                bucket = int(new_dist // delta)
                if bucket not in next_buckets:
                    next_buckets[bucket] = set()
                next_buckets[bucket].add(v)

def delta_stepping_sssp(graph: Graph, dist: List[float], source: int, delta: int, affected_nodes: Optional[Set[int]] = None):
    INF = float('inf')
    if affected_nodes is None or not affected_nodes:
        dist[:] = [INF] * len(dist)
        dist[source] = 0
        affected_nodes = {source}

    max_bucket = 1000000
    buckets = [set() for _ in range(max_bucket)]
    for node in affected_nodes:
        if node < len(dist) and dist[node] != INF:
            b_idx = int(dist[node] // delta)
            buckets[b_idx].add(node)

    num_threads = max(1, min(4, len(affected_nodes)))  # Dynamic thread count based on affected nodes

    for i in range(max_bucket):
        while buckets[i]:
            current_nodes = buckets[i]
            buckets[i] = set()

            next_light, next_heavy = {}, {}

            node_chunks = [set() for _ in range(num_threads)]
            for idx, node in enumerate(current_nodes):
                node_chunks[idx % num_threads].add(node)

            # Light edges
            threads = []
            for chunk in node_chunks:
                if chunk:
                    t = threading.Thread(target=process_bucket_nodes, args=(graph, dist, chunk, delta, next_light, True))
                    threads.append(t)
                    t.start()
            for t in threads:
                t.join()

            for b_idx, nodes in next_light.items():
                if b_idx < len(buckets):
                    buckets[b_idx].update(nodes)

            # Heavy edges
            threads = []
            for chunk in node_chunks:
                if chunk:
                    t = threading.Thread(target=process_bucket_nodes, args=(graph, dist, chunk, delta, next_heavy, False))
                    threads.append(t)
                    t.start()
            for t in threads:
                t.join()

            for b_idx, nodes in next_heavy.items():
                if b_idx < len(buckets):
                    buckets[b_idx].update(nodes)

def main():
    graph = Graph()
    dataset_file = "/content/Dataset.txt"  # Change path as needed

    print(f"Loading graph from {dataset_file}...")
    start_time = time.time()
    graph.load_from_file(dataset_file)
    load_time = time.time() - start_time

    print(f"Graph loaded in {load_time:.2f} seconds with {graph.num_nodes} nodes and {len(graph.edges)} edges.")

    source_node = 0
    distances = [float('inf')] * graph.num_nodes

    delta = compute_optimal_delta(graph)
    print(f"Using delta value: {delta}")

    print("Running initial SSSP...")
    start_time = time.time()
    delta_stepping_sssp(graph, distances, source_node, delta)
    sssp_time = time.time() - start_time
    print(f"Initial SSSP completed in {sssp_time:.2f} seconds.")

    print("Sample distances from source node:")
    for i in range(min(10, graph.num_nodes)):
        dist_str = "INF" if distances[i] == float('inf') else str(distances[i])
        print(f"Node {i}: {dist_str}")

    num_updates = 500
    updates = []
    random.seed(42)
    for _ in range(num_updates):
        upd_type = UpdateType.INSERT if random.random() < 0.5 else UpdateType.DELETE
        src = random.randint(0, graph.num_nodes - 1)
        dest = random.randint(0, graph.num_nodes - 1)
        updates.append(EdgeUpdate(upd_type, src, dest))

    print(f"\nApplying {num_updates} updates...")
    start_time = time.time()
    apply_updates(graph, updates)
    update_time = time.time() - start_time
    print(f"Updates applied in {update_time:.2f} seconds.")

    affected_nodes = identify_affected_nodes(graph, updates, distances)
    print(f"Identified {len(affected_nodes)} affected nodes.")

    print("Running incremental SSSP...")
    start_time = time.time()
    delta_stepping_sssp(graph, distances, source_node, delta, affected_nodes)
    incremental_time = time.time() - start_time
    print(f"Incremental SSSP completed in {incremental_time:.2f} seconds.")
    print(f"Speedup factor (initial / incremental): {sssp_time / incremental_time:.2f}x")

    print("Updated sample distances:")
    for i in range(min(10, graph.num_nodes)):
        dist_str = "INF" if distances[i] == float('inf') else str(distances[i])
        print(f"Node {i}: {dist_str}")

if __name__ == "__main__":
    main()


Loading graph from /content/Dataset.txt...
Graph loaded in 26.85 seconds with 4847571 nodes and 7970363 edges.
Using delta value: 1
Running initial SSSP...
Initial SSSP completed in 8.62 seconds.
Sample distances from source node:
Node 0: 0
Node 1: 1
Node 2: 1
Node 3: 1
Node 4: 1
Node 5: 1
Node 6: 1
Node 7: 1
Node 8: 1
Node 9: 1

Applying 500 updates...
Updates applied in 122.19 seconds.
Identified 1254 affected nodes.
Running incremental SSSP...
Incremental SSSP completed in 1.92 seconds.
Speedup factor (initial / incremental): 4.49x
Updated sample distances:
Node 0: 0
Node 1: 1
Node 2: 1
Node 3: 1
Node 4: 1
Node 5: 1
Node 6: 1
Node 7: 1
Node 8: 1
Node 9: 1


In [None]:
import sys
import random
import time
import enum
import threading
from typing import Dict, List, Set, Tuple, Optional
from dataclasses import dataclass

# Global mutex for thread synchronization
global_mutex = threading.Lock()

@dataclass
class Edge:
    src: int
    dest: int
    weight: int = 1

class Graph:
    def __init__(self):
        self.num_nodes: int = 0
        self.edges: List[Edge] = []
        self.adj: Dict[int, List[Tuple[int, int]]] = {}
        self.node_to_partition: List[int] = []

    def compute_adjacency_list(self):
        self.adj.clear()
        for e in self.edges:
            if e.src not in self.adj:
                self.adj[e.src] = []
            self.adj[e.src].append((e.dest, e.weight))

    def identify_boundary_nodes(self, my_partition: int) -> Set[int]:
        boundary = set()
        for node, neighbors in self.adj.items():
            if self.node_to_partition[node] != my_partition:
                continue
            for neighbor, _ in neighbors:
                if neighbor < len(self.node_to_partition) and self.node_to_partition[neighbor] != my_partition:
                    boundary.add(node)
                    break
        return boundary

    def load_from_file(self, filename: str):
        with open(filename, 'r') as f:
            for line in f:
                if line.startswith('#'):
                    continue
                parts = line.strip().split()
                if len(parts) >= 2:
                    src = int(parts[0])
                    dest = int(parts[1])
                    self.edges.append(Edge(src, dest))

        all_nodes = set(e.src for e in self.edges).union(e.dest for e in self.edges)
        self.num_nodes = max(all_nodes) + 1 if all_nodes else 0
        self.node_to_partition = [i % 4 for i in range(self.num_nodes)]
        self.compute_adjacency_list()

class UpdateType(enum.Enum):
    INSERT = 1
    DELETE = 2

@dataclass
class EdgeUpdate:
    type: UpdateType
    src: int
    dest: int
    weight: int = 1

def process_updates_thread(graph: Graph, updates: List[EdgeUpdate], start_idx: int, end_idx: int, adjacency_changed: List[bool]):
    local_changed = False
    for i in range(start_idx, end_idx):
        upd = updates[i]
        if upd.src >= graph.num_nodes or upd.dest >= graph.num_nodes:
            with global_mutex:
                print(f"Warning: Ignoring invalid update {upd.src}->{upd.dest}", file=sys.stderr)
            continue

        with global_mutex:
            if upd.type == UpdateType.INSERT:
                exists = any(neigh == upd.dest for neigh, _ in graph.adj.get(upd.src, []))
                if not exists:
                    graph.edges.append(Edge(upd.src, upd.dest, upd.weight))
                    local_changed = True
            elif upd.type == UpdateType.DELETE:
                original_len = len(graph.adj.get(upd.src, []))
                graph.edges = [e for e in graph.edges if not (e.src == upd.src and e.dest == upd.dest)]
                if upd.src in graph.adj:
                    graph.adj[upd.src] = [(d, w) for d, w in graph.adj[upd.src] if d != upd.dest]
                    if len(graph.adj[upd.src]) < original_len:
                        local_changed = True

    if local_changed:
        with global_mutex:
            adjacency_changed[0] = True

def apply_updates(graph: Graph, updates: List[EdgeUpdate]):
    adjacency_changed = [False]
    num_threads = max(1, min(4, len(updates)))
    chunk_size = (len(updates) + num_threads - 1) // num_threads
    threads = []

    for i in range(num_threads):
        start_idx = i * chunk_size
        end_idx = min(len(updates), (i + 1) * chunk_size)
        if start_idx >= len(updates):
            break
        t = threading.Thread(target=process_updates_thread, args=(graph, updates, start_idx, end_idx, adjacency_changed))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    if adjacency_changed[0]:
        graph.compute_adjacency_list()

def compute_optimal_delta(graph: Graph) -> int:
    if not graph.edges:
        return 1
    weights = sorted(e.weight for e in graph.edges)
    median = weights[len(weights) // 2]
    return max(1, min(median, 100))

def identify_affected_nodes(graph: Graph, updates: List[EdgeUpdate], current_dist: List[float]) -> Set[int]:
    affected = set()
    INF = float('inf')
    for upd in updates:
        affected.update([upd.src, upd.dest])
        if upd.type == UpdateType.INSERT:
            if current_dist[upd.src] != INF and current_dist[upd.src] + upd.weight < current_dist[upd.dest]:
                affected.add(upd.dest)
        elif upd.type == UpdateType.DELETE:
            affected.update(n for n, _ in graph.adj.get(upd.src, []))
            affected.update(n for n, _ in graph.adj.get(upd.dest, []))
    return affected

def process_bucket_nodes(graph: Graph, dist: List[float], nodes: Set[int], delta: int, next_buckets: Dict[int, Set[int]], process_light: bool):
    INF = float('inf')
    local_relaxed = {}

    for u in nodes:
        for v, weight in graph.adj.get(u, []):
            if (process_light and weight <= delta) or (not process_light and weight > delta):
                new_dist = dist[u] + weight
                if new_dist < dist[v]:
                    local_relaxed[v] = min(local_relaxed.get(v, INF), new_dist)

    with global_mutex:
        for v, new_dist in local_relaxed.items():
            if new_dist < dist[v]:
                dist[v] = new_dist
                bucket = int(new_dist // delta)
                if bucket not in next_buckets:
                    next_buckets[bucket] = set()
                next_buckets[bucket].add(v)

def delta_stepping_sssp(graph: Graph, dist: List[float], source: int, delta: int, affected_nodes: Optional[Set[int]] = None):
    INF = float('inf')
    if affected_nodes is None or not affected_nodes:
        dist[:] = [INF] * len(dist)
        dist[source] = 0
        affected_nodes = {source}

    max_bucket = 1000000
    buckets = [set() for _ in range(max_bucket)]
    for node in affected_nodes:
        if node < len(dist) and dist[node] != INF:
            b_idx = int(dist[node] // delta)
            buckets[b_idx].add(node)

    num_threads = max(1, min(4, len(buckets)))

    for i in range(max_bucket):
        while buckets[i]:
            current_nodes = buckets[i]
            buckets[i] = set()

            next_light, next_heavy = {}, {}

            node_chunks = [set() for _ in range(num_threads)]
            for idx, node in enumerate(current_nodes):
                node_chunks[idx % num_threads].add(node)

            # Light edges
            threads = []
            for chunk in node_chunks:
                if chunk:
                    t = threading.Thread(target=process_bucket_nodes, args=(graph, dist, chunk, delta, next_light, True))
                    threads.append(t)
                    t.start()
            for t in threads:
                t.join()

            for b_idx, nodes in next_light.items():
                if b_idx < len(buckets):
                    buckets[b_idx].update(nodes)

            # Heavy edges
            threads = []
            for chunk in node_chunks:
                if chunk:
                    t = threading.Thread(target=process_bucket_nodes, args=(graph, dist, chunk, delta, next_heavy, False))
                    threads.append(t)
                    t.start()
            for t in threads:
                t.join()

            for b_idx, nodes in next_heavy.items():
                if b_idx < len(buckets):
                    buckets[b_idx].update(nodes)

def main():
    graph = Graph()
    dataset_file = "Dataset.txt"  # Change path as needed

    print(f"Loading graph from {dataset_file}...")
    start_time = time.time()
    graph.load_from_file(dataset_file)
    load_time = time.time() - start_time

    print(f"Graph loaded in {load_time:.2f} seconds with {graph.num_nodes} nodes and {len(graph.edges)} edges.")

    source_node = 0
    distances = [float('inf')] * graph.num_nodes

    delta = compute_optimal_delta(graph)
    print(f"Using delta value: {delta}")

    print("Running initial SSSP...")
    start_time = time.time()
    delta_stepping_sssp(graph, distances, source_node, delta)
    sssp_time = time.time() - start_time
    print(f"Initial SSSP completed in {sssp_time:.2f} seconds.")

    print("Sample distances from source node:")
    for i in range(min(10, graph.num_nodes)):
        dist_str = "INF" if distances[i] == float('inf') else str(distances[i])
        print(f"Node {i}: {dist_str}")

    num_updates = 100
    updates = []
    random.seed(42)
    for _ in range(num_updates):
        upd_type = UpdateType.INSERT if random.random() < 0.5 else UpdateType.DELETE
        src = random.randint(0, graph.num_nodes - 1)
        dest = random.randint(0, graph.num_nodes - 1)
        updates.append(EdgeUpdate(upd_type, src, dest))

    print(f"\nApplying {num_updates} updates...")
    start_time = time.time()
    apply_updates(graph, updates)
    update_time = time.time() - start_time
    print(f"Updates applied in {update_time:.2f} seconds.")

    affected_nodes = identify_affected_nodes(graph, updates, distances)
    print(f"Identified {len(affected_nodes)} affected nodes.")

    print("Running incremental SSSP...")
    start_time = time.time()
    delta_stepping_sssp(graph, distances, source_node, delta, affected_nodes)
    incremental_time = time.time() - start_time
    print(f"Incremental SSSP completed in {incremental_time:.2f} seconds.")
    print(f"Speedup factor (initial / incremental): {sssp_time / incremental_time:.2f}x")

    print("Updated sample distances:")
    for i in range(min(10, graph.num_nodes)):
        dist_str = "INF" if distances[i] == float('inf') else str(distances[i])
        print(f"Node {i}: {dist_str}")

if __name__ == "__main__":
    main()


Loading graph from Dataset.txt...
Graph loaded in 0.00 seconds with 50 nodes and 277 edges.
Using delta value: 1
Running initial SSSP...
Initial SSSP completed in 1.87 seconds.
Sample distances from source node:
Node 0: 0
Node 1: 1
Node 2: 1
Node 3: 1
Node 4: 1
Node 5: 1
Node 6: 1
Node 7: 1
Node 8: 1
Node 9: 1

Applying 100 updates...
Updates applied in 0.01 seconds.
Identified 50 affected nodes.
Running incremental SSSP...
Incremental SSSP completed in 2.07 seconds.
Speedup factor (initial / incremental): 0.91x
Updated sample distances:
Node 0: 0
Node 1: 1
Node 2: 1
Node 3: 1
Node 4: 1
Node 5: 1
Node 6: 1
Node 7: 1
Node 8: 1
Node 9: 1


In [17]:
pip install matplotlib memory_profiler


Collecting memory_profiler
  Downloading memory_profiler-0.61.0-py3-none-any.whl.metadata (20 kB)
Downloading memory_profiler-0.61.0-py3-none-any.whl (31 kB)
Installing collected packages: memory_profiler
Successfully installed memory_profiler-0.61.0


In [None]:
import sys
import random
import time
import enum
import threading
import logging
import cProfile
import pstats
from typing import Dict, List, Set, Tuple, Optional
from dataclasses import dataclass
import matplotlib.pyplot as plt
import heapq


# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(threadName)s - %(message)s')
logger = logging.getLogger()

# Global mutex for thread synchronization
global_mutex = threading.Lock()


@dataclass
class Edge:
    src: int
    dest: int
    weight: int = 1


class Graph:
    def __init__(self):
        self.num_nodes: int = 0
        self.edges: List[Edge] = []
        self.adj: Dict[int, List[Tuple[int, int]]] = {}
        self.node_to_partition: List[int] = []

    def compute_adjacency_list(self):
        self.adj.clear()
        for e in self.edges:
            if e.src not in self.adj:
                self.adj[e.src] = []
            self.adj[e.src].append((e.dest, e.weight))

    def identify_boundary_nodes(self, my_partition: int) -> Set[int]:
        boundary = set()
        for node, neighbors in self.adj.items():
            if self.node_to_partition[node] != my_partition:
                continue
            for neighbor, _ in neighbors:
                if neighbor < len(self.node_to_partition) and self.node_to_partition[neighbor] != my_partition:
                    boundary.add(node)
                    break
        return boundary

    def load_from_file(self, filename: str):
        with open(filename, 'r') as f:
            for line in f:
                if line.startswith('#'):
                    continue
                parts = line.strip().split()
                if len(parts) >= 2:
                    src = int(parts[0])
                    dest = int(parts[1])
                    self.edges.append(Edge(src, dest))

        all_nodes = set(e.src for e in self.edges).union(
            e.dest for e in self.edges)
        self.num_nodes = max(all_nodes) + 1 if all_nodes else 0
        self.node_to_partition = [i % 4 for i in range(self.num_nodes)]
        self.compute_adjacency_list()


class UpdateType(enum.Enum):
    INSERT = 1
    DELETE = 2


@dataclass
class EdgeUpdate:
    type: UpdateType
    src: int
    dest: int
    weight: int = 1


def process_updates_thread(graph: Graph, updates: List[EdgeUpdate], start_idx: int, end_idx: int, adjacency_changed: List[bool]):
    local_changed = False
    for i in range(start_idx, end_idx):
        upd = updates[i]
        if upd.src >= graph.num_nodes or upd.dest >= graph.num_nodes:
            with global_mutex:
                logger.warning(
                    f"Ignoring invalid update {upd.src}->{upd.dest}")
            continue

        with global_mutex:
            if upd.type == UpdateType.INSERT:
                exists = any(neigh == upd.dest for neigh,
                             _ in graph.adj.get(upd.src, []))
                if not exists:
                    graph.edges.append(Edge(upd.src, upd.dest, upd.weight))
                    local_changed = True
            elif upd.type == UpdateType.DELETE:
                original_len = len(graph.adj.get(upd.src, []))
                graph.edges = [e for e in graph.edges if not (
                    e.src == upd.src and e.dest == upd.dest)]
                if upd.src in graph.adj:
                    graph.adj[upd.src] = [
                        (d, w) for d, w in graph.adj[upd.src] if d != upd.dest]
                    if len(graph.adj[upd.src]) < original_len:
                        local_changed = True

    if local_changed:
        with global_mutex:
            adjacency_changed[0] = True


def apply_updates(graph: Graph, updates: List[EdgeUpdate]):
    adjacency_changed = [False]
    num_threads = max(1, min(4, len(updates))) if len(updates) > 100 else 1
    chunk_size = (len(updates) + num_threads - 1) // num_threads
    threads = []

    for i in range(num_threads):
        start_idx = i * chunk_size
        end_idx = min(len(updates), (i + 1) * chunk_size)
        if start_idx >= len(updates):
            break
        t = threading.Thread(target=process_updates_thread, args=(
            graph, updates, start_idx, end_idx, adjacency_changed))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    if adjacency_changed[0]:
        graph.compute_adjacency_list()


def compute_optimal_delta(graph: Graph) -> int:
    if not graph.edges:
        return 1
    weights = [e.weight for e in graph.edges]
    avg_weight = sum(weights) / len(weights)
    return max(1, min(int(avg_weight), 100))


def identify_affected_nodes(graph: Graph, updates: List[EdgeUpdate], current_dist: List[float]) -> Set[int]:
    affected = set()
    INF = float('inf')
    for upd in updates:
        affected.update([upd.src, upd.dest])
        if upd.type == UpdateType.INSERT:
            if current_dist[upd.src] != INF and current_dist[upd.src] + upd.weight < current_dist[upd.dest]:
                affected.add(upd.dest)
        elif upd.type == UpdateType.DELETE:
            affected.update(n for n, _ in graph.adj.get(upd.src, []))
            affected.update(n for n, _ in graph.adj.get(upd.dest, []))
    return affected


def process_bucket_nodes(graph: Graph, dist: List[float], nodes: Set[int], delta: int, next_buckets: Dict[int, Set[int]], process_light: bool):
    INF = float('inf')
    local_relaxed = {}

    for u in nodes:
        for v, weight in graph.adj.get(u, []):
            if (process_light and weight <= delta) or (not process_light and weight > delta):
                new_dist = dist[u] + weight
                if new_dist < dist[v]:
                    local_relaxed[v] = min(local_relaxed.get(v, INF), new_dist)

    with global_mutex:
        for v, new_dist in local_relaxed.items():
            if new_dist < dist[v]:
                dist[v] = new_dist
                bucket = int(new_dist // delta)
                if bucket not in next_buckets:
                    next_buckets[bucket] = set()
                next_buckets[bucket].add(v)


def delta_stepping_sssp(graph: Graph, dist: List[float], source: int, delta: int, affected_nodes: Optional[Set[int]] = None):
    INF = float('inf')
    if affected_nodes is None or not affected_nodes:
        dist[:] = [INF] * len(dist)
        dist[source] = 0
        affected_nodes = {source}

    max_bucket = 1000000
    buckets = [set() for _ in range(max_bucket)]
    for node in affected_nodes:
        if node < len(dist) and dist[node] != INF:
            b_idx = int(dist[node] // delta)
            buckets[b_idx].add(node)

    num_threads = max(1, min(4, len(affected_nodes)))

    for i in range(max_bucket):
        while buckets[i]:
            current_nodes = buckets[i]
            buckets[i] = set()

            next_light, next_heavy = {}, {}

            node_chunks = [set() for _ in range(num_threads)]
            for idx, node in enumerate(current_nodes):
                node_chunks[idx % num_threads].add(node)

            # Light edges
            threads = []
            for chunk in node_chunks:
                if chunk:
                    t = threading.Thread(target=process_bucket_nodes, args=(
                        graph, dist, chunk, delta, next_light, True))
                    threads.append(t)
                    t.start()
            for t in threads:
                t.join()

            for b_idx, nodes in next_light.items():
                if b_idx < len(buckets):
                    buckets[b_idx].update(nodes)

            # Heavy edges
            threads = []
            for chunk in node_chunks:
                if chunk:
                    t = threading.Thread(target=process_bucket_nodes, args=(
                        graph, dist, chunk, delta, next_heavy, False))
                    threads.append(t)
                    t.start()
            for t in threads:
                t.join()

            for b_idx, nodes in next_heavy.items():
                if b_idx < len(buckets):
                    buckets[b_idx].update(nodes)


def dijkstra_sssp(graph: Graph, source: int) -> List[float]:
    INF = float('inf')
    dist = [INF] * graph.num_nodes
    dist[source] = 0
    heap = [(0, source)]
    visited = set()

    while heap:
        current_dist, u = heapq.heappop(heap)
        if u in visited:
            continue
        visited.add(u)
        for v, weight in graph.adj.get(u, []):
            if dist[v] > current_dist + weight:
                dist[v] = current_dist + weight
                heapq.heappush(heap, (dist[v], v))
    return dist


def benchmark_sssp(graph: Graph, source: int, delta: int):
    # Delta-Stepping
    dist_delta = [float('inf')] * graph.num_nodes
    start_time = time.time()
    delta_stepping_sssp(graph, dist_delta, source, delta)
    delta_time = time.time() - start_time

    # Dijkstra's
    start_time = time.time()
    dist_dijkstra = dijkstra_sssp(graph, source)
    dijkstra_time = time.time() - start_time

    # Validate correctness
    assert all(abs(a - b) < 1e-6 for a, b in zip(dist_delta,
               dist_dijkstra)), "Results mismatch!"

    return delta_time, dijkstra_time


def profile_memory(func, *args, **kwargs):
    """Dummy memory profiler when memory_profiler package is not available"""
    # Run the function once to get execution time
    start_time = time.time()
    result = func(*args, **kwargs)
    elapsed = time.time() - start_time

    # Return dummy memory value (0) and log timing info
    logger.info(
        f"Function {func.__name__} executed in {elapsed:.2f}s (memory profiling disabled)")
    return 0


def plot_results(thread_counts, times, title):
    plt.figure(figsize=(10, 5))
    plt.plot(thread_counts, times, marker='o', linestyle='-', color='b')
    plt.xlabel("Thread Count")
    plt.ylabel("Execution Time (s)")
    plt.title(title)
    plt.grid(True)
    plt.savefig(f"{title.replace(' ', '_')}.png")
    plt.show()


def main():
    graph = Graph()
    # Update path as needed
    dataset_file = "/content/Dataset.txt"

    logger.info(f"Loading graph from {dataset_file}...")
    start_time = time.time()
    graph.load_from_file(dataset_file)
    load_time = time.time() - start_time
    logger.info(
        f"Graph loaded in {load_time:.2f} seconds with {graph.num_nodes} nodes and {len(graph.edges)} edges.")

    source_node = 0
    delta = compute_optimal_delta(graph)
    logger.info(f"Optimal delta: {delta}")

    # Benchmark SSSP algorithms
    delta_time, dijkstra_time = benchmark_sssp(graph, source_node, delta)
    logger.info(
        f"Delta-Stepping Time: {delta_time:.2f}s | Dijkstra Time: {dijkstra_time:.2f}s | Speedup: {dijkstra_time / delta_time:.2f}x")

    # Memory profiling (disabled but kept for structure)
    mem_delta = profile_memory(delta_stepping_sssp, graph,
                               [float('inf')] * graph.num_nodes, source_node, delta)
    mem_dijkstra = profile_memory(dijkstra_sssp, graph, source_node)
    logger.info(
        f"Memory profiling disabled - Peak values not available")

    # Thread scaling analysis
    thread_counts = [1, 2, 4, 8]
    delta_times = []
    for threads in thread_counts:
        logger.info(f"Running Delta-Stepping with {threads} threads...")
        start_time = time.time()
        delta_stepping_sssp(graph, [float('inf')]
                            * graph.num_nodes, source_node, delta)
        delta_times.append(time.time() - start_time)
    plot_results(thread_counts, delta_times, "Delta-Stepping Thread Scaling")

    # Update impact analysis
    num_updates_list = [10, 100, 1000]
    incremental_times = []
    for num_updates in num_updates_list:
        updates = [EdgeUpdate(
            UpdateType.INSERT if random.random() < 0.5 else UpdateType.DELETE,
            random.randint(0, graph.num_nodes - 1),
            random.randint(0, graph.num_nodes - 1)
        ) for _ in range(num_updates)]

        dist = [float('inf')] * graph.num_nodes
        delta_stepping_sssp(graph, dist, source_node, delta)

        start_time = time.time()
        apply_updates(graph, updates)
        affected_nodes = identify_affected_nodes(graph, updates, dist)
        delta_stepping_sssp(graph, dist, source_node, delta, affected_nodes)
        incremental_times.append(time.time() - start_time)

    plt.figure(figsize=(10, 5))
    plt.plot(num_updates_list, incremental_times,
             marker='o', linestyle='-', color='r')
    plt.xlabel("Number of Updates")
    plt.ylabel("Incremental SSSP Time (s)")
    plt.title("Impact of Updates on Performance")
    plt.grid(True)
    plt.savefig("Update_Impact.png")
    plt.show()


if __name__ == "__main__":
    main()
