In [1]:
# imported libraries
import networkx as nx
import json
import heapq # for dijkstra
import copy
import random


In [2]:
# function to load graph

def load_graph(filename="graph_with_metadata.json"):
    with open(filename, "r") as f:
        data = json.load(f)
    # Convert adjacency dict keys to int and neighbor node IDs to int
    adjacency_raw = data["adjacency"]
    adjacency = {int(k): [(int(n), w) for n, w in v] for k, v in adjacency_raw.items()}
    positions = {int(k): tuple(v) for k, v in data["positions"].items()}
    metadata = data.get("metadata", {})
    end_nodes_loaded = metadata.get("exit_nodes", [])
    exit_nodes = end_nodes_loaded[1:]

    # Reconstruct directed graph
    G_loaded = nx.DiGraph()
    for node, neighbors in adjacency.items():
        for nbr, weight in neighbors:
            G_loaded.add_edge(node, nbr, weight=weight)
    print(f"Graph successfully reloaded from '{filename}'")
    print(f"Metadata: {metadata}")

    return G_loaded, adjacency, positions, metadata, exit_nodes

In [3]:
G_loaded, adjacency, positions, metadata, exit_nodes=load_graph("./graph_with_metadata.json")

Graph successfully reloaded from './graph_with_metadata.json'
Metadata: {'n_nodes': 50, 'p': 0.2, 'traffic_min': 0.8, 'traffic_max': 1.2, 'directed_ratio': 0.2, 'description': 'Synthetic city map with directed and undirected roads', 'exit_nodes': [0, 48]}


In [4]:
#function to implement Dijkstra's algorithm

def dijkstra(adjacency_dict,start_node,target_node):
  priority_queue=[]
  visited_costs={}
  heapq.heappush(priority_queue,(0,start_node,[start_node]))
  while priority_queue:
    cost,node,path=heapq.heappop(priority_queue)
    if node in visited_costs and visited_costs[node]<cost:
      continue
    visited_costs[node]=cost
    if node in target_node:
      return cost,path
    if node in adjacency_dict:
      for neighbor,weight in adjacency_dict[node]:
        new_cost = cost+weight
        new_path = path+[neighbor]
        heapq.heappush(priority_queue,(new_cost,neighbor,new_path))
  return float('inf'),[]


In [5]:
adjacency_dynamic=copy.deepcopy(adjacency) # keeping the original adjacency list intact

In [6]:
# params
CAR_A_START_NODE=0
CAR_B_START_NODE=49
ESCAPE_POINTS=exit_nodes
SPEED_A=1
SPEED_B=1.5
DELAY_B=3
P=0.3
CATCHING_DISTANCE=0.5
MAX_STEPS=50

In [7]:
print("Car A(thief) starts at Node",CAR_A_START_NODE)
print("\n")
print("Car B(police) starts at Node",CAR_B_START_NODE)
print("\n")
print("Car A's possible exit nodes are:")
for i in ESCAPE_POINTS:
  print(i)

Car A(thief) starts at Node 0


Car B(police) starts at Node 49


Car A's possible exit nodes are:
48


In [8]:
# class to create a car

class Car:
  def __init__(self,start_node,speed,is_carB=False):
    self.position=start_node
    self.speed=speed
    self.path=[]
    self.onedge=False
    self.progress=0
    self.curr_edgewt=0
    self.edge_start=None
    self.edge_end=None
    if is_carB:
      self.delay=DELAY_B
    else:
      self.delay=0
    self.replan=False # no path replanning if the car is moving on an edge

  def path_set(self,path_new):  # action to set new path for the car
    self.path=path_new
    self.onedge=False
    self.progress=0
    self.curr_edgewt=0
    self.edge_start=None
    self.edge_end=None

  def move_car(self,curr_adjacency):
    if self.delay: # for car B
      self.delay-=1
      return

    if not self.onedge and len(self.path)>1: # if car is not on an edge, i.e. on a node
      self.edge_start=self.path[0]
      self.edge_end=self.path[1]
      found_edge=False
      if self.edge_start in curr_adjacency:
        for n,wt in curr_adjacency[self.edge_start]:
          if n==self.edge_end:
            self.curr_edgewt=wt
            found_edge=True
            break
      if not found_edge:
        self.replan=True
        self.path=[]
        return
      self.onedge=True
      self.progress=0

    if self.onedge:
      self.progress+=self.speed
      if self.progress>=self.curr_edgewt:
        self.position=self.edge_end
        self.onedge=False
        self.progress=0
        self.path.pop(0)

  def state(self): # for simulation.json
    return {
        "pos":self.position,
        "edge_from":self.edge_start if self.onedge else None,
        "edge_to":self.edge_end if self.onedge else None,
        "progress":self.progress,
        "Dijkstra_path":list(self.path)
      }



In [9]:
# random events during the chase

class EventManager:
  def __init__(self,og_adj,curr_adj):
    self.og_adjacency=og_adj
    self.curr_adjacency=curr_adj
    self.active_events=[]

  def trigger_event(self):
    if random.random()>P:
      return False, []
    event_type=random.choice(["Traffic jam","Blockage","One-way"])
    node_start=random.choice(list(self.og_adjacency.keys()))
    if not self.og_adjacency[node_start]:
      return False, []
    node_end,og_wt=random.choice(self.og_adjacency[node_start])
    edge=(node_start,node_end)

    event_logs=[]

    if event_type=="Traffic jam":
      new_wt=og_wt*random.uniform(2,2.5)
      for i,(n,wt) in enumerate(self.curr_adjacency[node_start]):
        if n==node_end:
          self.curr_adjacency[node_start][i]=(n,new_wt)
          break
      event_data = {
                'type': 'Traffic Jam',
                'edge': edge,
                'duration': 3,
                'original_weight': og_wt
            }
      event_logs.append(f"Traffic jam on {node_start}-->{node_end}")
      self.active_events.append(event_data)

    elif event_type=="Blockage":
      removed_edge=None
      for n,wt in self.curr_adjacency[node_start]:
        if n==node_end:
          removed_edge=(n,wt)
          self.curr_adjacency[node_start].remove((n,wt))
          break
      if removed_edge:
        event_data={
            'type':'Blockage',
            'edge':edge,
            'duration':4,
            'removed_edge_data':removed_edge
        }
        event_logs.append(f"Blockage on {node_start}-->{node_end}")
        self.active_events.append(event_data)

    elif event_type=="One-way":
      reverse_edge=None
      for n,wt in self.curr_adjacency[node_end]:
        if n==node_start:
          reverse_edge=(n,wt)
          self.curr_adjacency[node_end].remove((n,wt))
          break
      if reverse_edge:
        event_data={
            'type':'One-way',
            'edge':(node_end,node_start),
            'duration':6,
            'removed_edge_data':reverse_edge
        }
        event_logs.append(f"One-way created. Removed edge {node_end}-->{node_start}")
        self.active_events.append(event_data)
    return True, event_logs



  def resolve_events(self):
    remaining_events=[]
    resolved_logs=[]
    for event in self.active_events:
      event['duration']-=1
      if event['duration']<=0:
        if event['type']=="Traffic jam":
          node_start,node_end=event['edge']
          for i,(n,wt) in enumerate(self.curr_adjacency[node_start]):
            if n==node_end:
              self.curr_adjacency[node_start][i]=(n,event['original_weight'])
              break
          resolved_logs.append(f"Traffic jam removed on {node_start}-->{node_end}")
        elif event['type']=="Blockage":
          node_start,node_end=event['edge']
          self.curr_adjacency[node_start].append(event['removed_edge_data'])
          resolved_logs.append(f"Blockage removed on {node_start}-->{node_end}")
        elif event['type']=="One-way":
          node_end,node_start=event['edge']
          self.curr_adjacency[node_end].append(event['removed_edge_data'])
          resolved_logs.append(f"One-way removed, restored {node_end}-->{node_start}")
      else:
        remaining_events.append(event)
    self.active_events=remaining_events
    return resolved_logs


In [10]:
carA=Car(CAR_A_START_NODE,SPEED_A)
carB=Car(CAR_B_START_NODE,SPEED_B,is_carB=True)

event_mgr=EventManager(adjacency,adjacency_dynamic)

history=[]
caught=False
reached=False

cost_a,path_a=dijkstra(adjacency_dynamic,carA.position,ESCAPE_POINTS)
carA.path_set(path_a)

print("The chase is on!")

for step in range(1,MAX_STEPS+1):
  resolved_logs=event_mgr.resolve_events()
  event_triggered, trigerred_logs=event_mgr.trigger_event()
  all_logs_this_step=resolved_logs+trigerred_logs

  if event_triggered:
    carA.replan=True
    carB.replan=True

  if carA.replan and not carA.onedge:
    cost_a,path_a=dijkstra(adjacency_dynamic,carA.position,ESCAPE_POINTS)
    carA.path_set(path_a)
    carA.replan=False

  carA.move_car(adjacency_dynamic)

  if carA.replan and not carA.onedge:
    cost_a,path_a=dijkstra(adjacency_dynamic,carA.position,ESCAPE_POINTS)
    carA.path_set(path_a)
    carA.replan=False

  if carB.delay==0:
    if carA.onedge:
      carB_target=carA.edge_end
    else:
      carB_target=carA.position

    if not carB.onedge:
      cost_b,path_b=dijkstra(adjacency_dynamic,carB.position,[carB_target])
      carB.path_set(path_b)
      carB.replan=False

  carB.move_car(adjacency_dynamic)

  if carA.position in ESCAPE_POINTS and not carA.onedge:
    reached=True
  if carB.delay==0:
    if carB.position==carA.position and not carA.onedge and not carB.onedge:
      caught=True
    if carA.onedge and carB.onedge:
      same=(carA.edge_start==carB.edge_start and carA.edge_end==carB.edge_end)
      reverse=(carA.edge_start==carB.edge_end and carA.edge_end==carB.edge_start)

      if same and abs(carA.progress-carB.progress)<=CATCHING_DISTANCE:
        caught=True
      if reverse and (carA.curr_edgewt-carA.progress-carB.progress)<=CATCHING_DISTANCE:
        caught=True

  log_entry={
      "step":step,
      "carA":carA.state(),
      "carB":carB.state(),
      "caught":caught,
      "reached":reached,
      "new_events":all_logs_this_step,
      "active_events":copy.deepcopy(event_mgr.active_events)
  }

  history.append(log_entry)

  if caught:
    print(f"In step {step}, car B caught car A.")
    break
  if reached:
    print(f"In step {step}, car A escaped.")
    break

if not caught and not reached:
  print(f"No result until {MAX_STEPS} steps.")

f=open("simulation.json","w")
json.dump(history,f,indent=4)
f.close()

The chase is on!
In step 13, car B caught car A.
