In [49]:
class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def is_close(self, other, tolerance):
        return abs(self.x - other.x) < tolerance and abs(self.y - other.y) < tolerance

In [50]:
class Edge:
    def __init__(self, p1, p2, x_speed=0, y_speed=0):
        self.p1 = p1
        self.p2 = p2
        self.x_speed = x_speed
        self.y_speed = y_speed

    def is_point_on_edge(self, point):
        return point.x >= min(self.p1.x, self.p2.x) and point.x <= max(self.p1.x, self.p2.x) and point.y >= min(self.p1.y, self.p2.y) and point.y <= max(self.p1.y, self.p2.y)

In [51]:
class Car:
    def __init__(self, name, position, destination, speed = 1):
        self.name = name
        self.position = position
        self.visited_intersection = False
        self.destination = destination
        self.speed = speed

    def move(self, x, y):
        self.position.x += x
        self.position.y += y

    def is_at_intersection(self):
        if (self.position.x == 0 and self.position.y > -1 and self.position.y < 1) \
            or (self.position.y == 0 and self.position.x > -1 and self.position.x < 1):
            if not self.visited_intersection:
                self.visited_intersection = True
                if self.destination == 'E':
                    self.position.x = 1
                    self.position.y = 0
                elif self.destination == 'S':
                    self.position.x = 1
                    self.position.y = -1
            return True
        return False

    def distance_to_intersection(self):
        return abs(self.position.x) + abs(self.position.y)

In [52]:
import pandas as pd

class Controller:
    def __init__(self, cars, edges, speed_reduction_factor=0.5):
        self.cars = cars
        self.edges = edges
        self.time = 0
        self.intersection_block_times = {}
        self.speed_reduction_factor = speed_reduction_factor
        self.edges_with_reduced_speed = []
        self.log_content = []
        self.car_positions = {}
        # print("initialized with srf", speed_reduction_factor)

    def check_for_collision(self):
        for car1 in self.cars:
            for car2 in self.cars:
                if car1.name == car2.name:
                    continue
                if car1.position.is_close(car2.position, 0.5):
                    # print(f"Collision detected between {car1.name} and {car2.name}")
                    return True

    def predict_collision_at_intersection(self, car):
        car_edge = self.get_edge(car)
        # print(car_edge.x_speed, car_edge.y_speed)
        time_to_intersection = self.time + (abs(car.position.x) + abs(car.position.y) ) / max(abs(car_edge.x_speed), abs(car_edge.y_speed))
        # print(car.name, "tti", time_to_intersection, self.speed_reduction_factor)
        for car_name in self.intersection_block_times:
            if car_name == car.name:
                continue
            intersection_block_time = self.intersection_block_times[car_name]
            if intersection_block_time[0] <= time_to_intersection <= intersection_block_time[1]:
                # print(f"Car {car.name} travelling at speed {car_edge.x_speed + car_edge.y_speed} is blocked by car {car_name} from time {intersection_block_time[0]} to {intersection_block_time[1]}")
                return True
        if not self.intersection_block_times.get(car.name):
            self.intersection_block_times[car.name] = [time_to_intersection, time_to_intersection + 5]
            # print(f"intersection reserved for {car.name} from {time_to_intersection} to {time_to_intersection+2}")
        return False

    def move_car(self, car):
        car_edge = self.get_edge(car)
        car.move(car_edge.x_speed, car_edge.y_speed)
        if self.predict_collision_at_intersection(car):
            # print(f"Possible collision at intersection for car {car.name}")
            if car_edge in self.edges_with_reduced_speed:
              return self.check_for_collision()
            self.edges_with_reduced_speed.append(car_edge)
            car_edge.x_speed = car_edge.x_speed - self.speed_reduction_factor if car_edge.x_speed > 0 else car_edge.x_speed
            car_edge.y_speed = car_edge.y_speed - self.speed_reduction_factor if car_edge.y_speed > 0 else car_edge.y_speed
            # print("new speeds for", car.name, car_edge.x_speed, car_edge.y_speed, "after reducing by", self.speed_reduction_factor)
            if self.predict_collision_at_intersection(car):
              return True
        return self.check_for_collision()


    def get_edge(self, car):
        for edge in self.edges:
            if edge.is_point_on_edge(car.position):
                return edge
        raise Exception(f"Car {car.name} is not on any edge while at ({car.position.x}, {car.position.y})")

    def print_status(self, car):
      car_edge = self.get_edge(car)
      self.log_content.append(f"car {car.name} is at {car.position.x}, {car.position.y} with speed {car_edge.x_speed}, {car_edge.y_speed}")
      if not self.car_positions.get(car.name):
          self.car_positions[car.name] = []
      self.car_positions[car.name].append(f'({car.position.x}, {car.position.y})')

    def pad_position_arrays(self):
        if len(self.car_positions['A']) != len(self.car_positions['B']):
            if len(self.car_positions['A']) > len(self.car_positions['B']):
                self.car_positions['B'].extend([self.car_positions['B'][-1]] * (len(self.car_positions['A']) - len(self.car_positions['B'])))
            else:
                self.car_positions['A'].extend([self.car_positions['A'][-1]] * (len(self.car_positions['B']) - len(self.car_positions['A'])))
                
    def write_log(self, filename):
        with open(filename, "w") as f:
            for line in self.log_content:
                f.write(line + "\n")
        print("Log written to", filename)
        self.pad_position_arrays()
        df = pd.DataFrame(self.car_positions)
        df.columns = ['SE_CAR_A', 'WS_CAR_B']
        df.to_excel("simulation/data/car_positions.xlsx", index=False)

    def simulate(self):
        while True:
            if len(self.cars) == 0:
                # print("All cars have left the intersection")
                break
            for car in self.cars:
                self.print_status(car)
                if self.move_car(car):
                    return -1
                car.is_at_intersection()
                if car.visited_intersection and car.distance_to_intersection() > 10:
                    self.cars.remove(car)
                    # print(f"{car.name} has left the intersection")
                    if len(self.cars) == 0:
                        # print("All cars have left the intersection")
                        return self.time
            self.time += 0.1
            # print("Time: ", self.time)

In [53]:
# edges = [
#     # south to north road with two edges
#     Edge(Point(0, -50), Point(0, -40), 0, 1),
#     Edge(Point(0, -40), Point(0, 0), 0, 1),
#     Edge(Point(0, 0), Point(0, 40), 0, 1),

#     # west to east road with two edges
#     Edge(Point(-50, 0), Point(-40, 0), 1, 0),
#     Edge(Point(-40, 0), Point(0, 0), 1, 0),
#     Edge(Point(0, 0), Point(40, 0), 1, 0),
# ]

# edges = [
#     # south to north road with two edges
#     Edge(Point(0, -50), Point(0, 40), 0, 1),

#     # west to east road with two edges
#     Edge(Point(-50, 0), Point(40, 0), 1, 0),
# ]

# cars = [
#     Car("A", Point(0, -50)),
#     Car("B", Point(-50, 0)),
# ]

In [54]:
import numpy as np

class QLearningController(Controller):
    def __init__(self, cars, edges, alpha=0.1, gamma=0.6, epsilon=0.1):
        super().__init__(cars, edges)
        self.q_table = np.zeros((101, 101))  # Q-table for speed_reduction_factor values from 0 to 1
        self.alpha = alpha  # Learning rate
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Exploration rate

    def simulate(self):
      self.update_q_table()

    def update_q_table(self):
      for i in range(101):
          reward = self.simulate_with_speed_reduction(i/100)
          for j in range(101):
              # print("reward", reward, "for", i/100)
              next_max = np.max(self.q_table[i])
              self.q_table[i, j] = (1 - self.alpha) * self.q_table[i, j] + self.alpha * (reward + self.gamma * next_max)


    def simulate_with_speed_reduction(self, speed_reduction_factor):
      if speed_reduction_factor == 1:
        return -1000
      controller_copy = self.copy_controller(speed_reduction_factor)
      controller_copy.speed_reduction_factor = speed_reduction_factor
      total_time_taken = controller_copy.simulate()
      if total_time_taken == -1:
          return -1000  # Penalize collision heavily
      return -total_time_taken  # Negative total time taken to minimize delay

    def copy_controller(self, speed_reduction_factor):
        cars_copy = [Car(car.name, Point(car.position.x, car.position.y), car.destination) for car in self.cars]
        edges_copy = [Edge(edge.p1, edge.p2, edge.x_speed, edge.y_speed) for edge in self.edges]
        return Controller(cars_copy, edges_copy, speed_reduction_factor)

# Example usage
edges = [
    Edge(Point(0, -50), Point(0, 40), 0, 1),  # South to north road
    Edge(Point(-50, 0), Point(40, 0), 1, 0),  # West to east road
    Edge(Point(1, -50), Point(1, 40), 0, -1),  # North to south road
    Edge(Point(-50, 1), Point(40, 1), 1, 0),  # East to west road
]
cars = [
    Car("A", Point(0, -50), 'E'),
    Car("B", Point(-50, 0), 'S'),
]
controller = QLearningController(cars, edges)
total_time_taken = controller.simulate()
q_table = controller.q_table

In [55]:
max_q_values = []
for i in range(101):
  # print(i/100, ",", max(q_table[i]))
  max_q_values.append(max(q_table[i]))
most_optimal_srf = np.argmax(max_q_values)/100
print("most optimal edge weight", 1-most_optimal_srf)

most optimal edge weight 0.9


In [56]:
optimal_controller = Controller(cars, edges, most_optimal_srf)
delay = optimal_controller.simulate()
optimal_controller.write_log("logs/positions.log")
delay

Log written to logs/positions.log


6.399999999999993