In [1]:
import numpy as np
from typing import List, Tuple
from road_network.utilities import MyList

In [2]:
# Velocity vectors addressed by index: 0 is "no movement", 1-4 are one-cell
# steps of -1 along axis 0, -1 along axis 1, +1 along axis 0, +1 along axis 1.
speeds = [np.array(step) for step in ([0, 0], [-1, 0], [0, -1], [1, 0], [0, 1])]


class CarManager:
    """Registry that maps car ids to car objects.

    ``all_cars`` is class-level state shared by the whole program.  Slot 0 is
    pre-filled with ``None``, so ids handed out as ``len(all_cars)`` start at 1
    and id 0 never refers to a car.
    """

    all_cars: List = [None]

    @staticmethod
    def get_car(car_id: int):
        """Return the car registered under ``car_id``.

        :param car_id: id of a registered car, ``0 < car_id < len(all_cars)``
        :return: the object stored in ``all_cars[car_id]``
        :raises AssertionError: if ``car_id`` is outside the valid range
        """
        # Checked explicitly rather than with ``assert`` so that the
        # validation survives ``python -O``; the exception type stays the same
        # for callers that relied on it.
        if not 0 < car_id < len(CarManager.all_cars):
            raise AssertionError(
                f"unknown car id {car_id!r}; valid ids are 1..{len(CarManager.all_cars) - 1}"
            )
        return CarManager.all_cars[car_id]

class RoadManager:
    """Registry of road objects; ``roads`` is one list shared at class level."""

    roads: List = list()


In [3]:
class Car:
    """A car occupying one grid cell and carrying a route of output choices.

    Each instance registers itself in ``CarManager.all_cars`` on creation and
    uses its index in that list as ``id``.
    """

    # Declared here for readers and type checkers; the values are created per
    # instance in ``__init__`` so that no mutable state is shared between cars.
    route: List[int]
    coords: np.ndarray
    speed: np.ndarray
    to_output: int

    def __init__(self, route: List[int] = None, destination_id: int = -1):
        """
        Create a car and register it in ``CarManager``.

        :param route: list with no. of an output on every crossroad
        :param destination_id: stored as ``destination``; -1 by default
        """
        self.id = len(CarManager.all_cars)
        CarManager.all_cars.append(self)

        self.route = route if route is not None else []
        self.destination = destination_id

        # Per-instance state (previously class-level defaults shared by all cars).
        self.coords = np.array([0, 0])
        self.speed = speeds[0]
        self.to_output = 0

    def position(self):
        """
        :return: coordinates of a car
        """
        return self.coords

    def set_position(self, coords: np.ndarray):
        """
        Place the car at ``coords``.

        :param coords: new coordinates; the array is stored as given
        """
        self.coords = coords

    def set_speed(self, new_speed):
        """
        Change speed of a car

        :param new_speed: an integer index into ``speeds``, an array used as
            is, or any other value that ``np.array`` can convert
        """
        if isinstance(new_speed, (int, np.integer)):
            new_speed = speeds[new_speed]
        elif not isinstance(new_speed, np.ndarray):
            new_speed = np.array(new_speed)
        self.speed = new_speed

    def get_next_destination(self):
        """
        :return: the first entry of the route without consuming it,
            or -1 when the route is empty
        """
        if len(self.route) > 0:
            return self.route[0]
        return -1

    def next_point_on_route(self):
        """
        :return: the first entry of the route, removed from the route,
            or -1 when the route is empty
        """
        if len(self.route) > 0:
            return self.route.pop(0)
        return -1

    def move(self, road):
        """
        Move a car on a traffic grid if possible

        The car advances by ``speed`` only when ``road`` reports the target
        cell as empty.  An ``IndexError`` raised by the road (target outside
        the grid) leaves the car where it is.

        :param road: road class on which moving
        """
        target = self.coords + self.speed
        try:
            if not road.is_empty(target):
                return
            road.set_state(self.coords, 0)
            road.set_state(target, self.id)
        except IndexError:
            # NOTE(review): only roads that raise for out-of-range cells stop
            # the car here; confirm the road also rejects negative coordinates.
            return

        # Rebind instead of ``+=``: an in-place update would also change any
        # other reference to the same array (e.g. the one given to
        # ``set_position``).
        self.coords = target


class BaseRoad:
    """Common machinery shared by the road elements of the traffic grid.

    A road keeps two integer grids of the same shape: ``_road`` is the state
    committed by the last :meth:`step`, ``_next_state`` is the state being
    assembled for the next one.  A cell equal to 0 is treated as free; any
    other value is whatever was written through :meth:`set_state`.
    """

    def __init__(self, grid_size, shuffle: bool = False, name=None):
        """
        Create an empty road and register it in ``RoadManager.roads``.

        :param grid_size: shape of the cell grid, passed to ``np.zeros``
        :param shuffle: stored on the instance for subclasses to consult
        :param name: label returned by ``str()`` and ``repr()``; "road" if omitted
        """
        self._road = np.zeros(grid_size, dtype="int32")
        self._next_state = self._road.copy()

        self._history: List[np.ndarray] = []
        self._inputs: List = []
        self._outputs: List = []
        self._cars = MyList()
        self._new_cars = MyList()
        self.shuffle = shuffle

        self._stats: List[Tuple] = []  # one (n_cells, n_cars, n_moved) per step

        self._name = name if name is not None else "road"
        RoadManager.roads.append(self)

    def __str__(self):
        return self._name

    def __repr__(self):
        return self._name

    @staticmethod
    def _store_at(slots: List, position: int, value) -> None:
        """Write ``value`` to ``slots[position]``, padding with ``None`` if the list is too short."""
        missing = position - len(slots) + 1
        if missing > 0:
            slots.extend([None] * missing)
        slots[position] = value

    @staticmethod
    def _cell(coords: np.ndarray) -> Tuple:
        """Validate ``coords`` and return them as a ``(row, col)`` index.

        :raises ValueError: if ``coords`` is not an array of exactly 2 values
        :raises IndexError: if a coordinate is negative
        """
        if coords.shape != (2,):
            raise ValueError("coords should be an array with 2 coordinates")
        row, col = coords[0], coords[1]
        # NumPy would silently wrap a negative index to the opposite edge of
        # the grid; report it like any other out-of-range cell instead.
        if row < 0 or col < 0:
            raise IndexError(f"coords {coords.tolist()} are outside the road")
        return row, col

    def add_input(self, input_road, direction=0, index=0):
        """
        Register ``input_road`` in input slot ``direction``.

        :param input_road: road that feeds this one
        :param direction: slot in this road's input list
        :param index: value stored next to the road in the slot
        """
        self._store_at(self._inputs, direction, (input_road, index))

    def add_output(self, output_road, direction: int = 0, index: int = 0):
        """
        Register ``output_road`` in output slot ``direction`` and register
        this road as input ``index`` of ``output_road``.

        :param output_road: road that receives cars from this one
        :param direction: slot in this road's output list
        :param index: input slot used on ``output_road``
        """
        # The argument order is deliberate: our ``index`` is the other road's
        # input slot, and our ``direction`` is what it stores alongside us.
        output_road.add_input(self, index, direction)
        self._store_at(self._outputs, direction, (output_road, index))

    def add_car(self, car_id: int, direction: int = 0) -> int:
        """
        Queue a car for this road; it joins the active cars on the next step.

        This base implementation accepts every car.

        :param car_id: id of the car
        :param direction: index of input (not used here)
        :return: always 1
        """
        self._new_cars.add(car_id)
        return 1

    def _remove_car(self, car_id: int):
        """Drop ``car_id`` from the active cars of this road."""
        self._cars.remove(car_id)

    def process_output(self, direction: int = 0):
        """Hook for handing cars to an output; does nothing in the base class."""
        pass

    def process_outputs(self):
        """Process the outputs of the road; by default only the default output."""
        return self.process_output()

    def render(self):
        """
        :return: a copy of the committed grid
        """
        return self._road.copy()

    def get_history(self, depth):
        """
        :param depth: number of past frames to include
        :return: list with the last ``depth`` recorded frames followed by the
            current rendering
        """
        if depth == 0:
            return [self.render()]

        return self._history[-depth:] + [self.render()]

    def is_empty(self, coords: np.ndarray):
        """
        :param coords: array with 2 coordinates
        :return: truthy when the cell is 0 in both the committed and the next state
        :raises ValueError: if ``coords`` does not hold exactly 2 coordinates
        :raises IndexError: if ``coords`` lie outside the grid
        """
        cell = self._cell(coords)
        return (self._road[cell] == 0) and (self._next_state[cell] == 0)

    def _get_state(self, coords: np.ndarray):
        """
        :param coords: array with 2 coordinates
        :return: value of the cell in the next state
        :raises ValueError: if ``coords`` does not hold exactly 2 coordinates
        :raises IndexError: if ``coords`` lie outside the grid
        """
        return self._next_state[self._cell(coords)]

    def set_state(self, coords: np.ndarray, state: int):
        """
        Write ``state`` into the cell of the next state.

        :param coords: array with 2 coordinates
        :param state: value to store (0 marks the cell as free)
        :raises ValueError: if ``coords`` does not hold exactly 2 coordinates
        :raises IndexError: if ``coords`` lie outside the grid
        """
        self._next_state[self._cell(coords)] = state

    def move_cars(self, frame=0):
        """Hook for advancing cars; does nothing in the base class."""
        pass

    def step(self, time_step=0) -> Tuple:
        """
        Complete evaluation and calculate speed

        Records the current rendering in the history, commits the next state,
        merges queued cars into the active ones and appends the statistics.

        :param time_step: not used
        :return:
            tuple with n_cells, n_cars, n_moved_cars
        """
        previous = self.render()
        self._history.append(previous)

        # Commit: the assembled state becomes current, and the next one starts
        # out as a copy of it.
        self._road = self._next_state
        self._next_state = self._road.copy()

        self._cars.extend(self._new_cars)
        self._new_cars.clear()

        n_cars = self._cars.length()
        # Count the cells that changed and halve the result: a car moving by
        # one cell changes two cells, the one it left and the one it entered.
        moved = ((previous - self.render()) != 0).sum() / 2
        n_cells = self._road.shape[0] * self._road.shape[1]
        self._stats.append((n_cells, n_cars, moved))
        return self._stats[-1]

    def get_stats(self):
        """
        :return: list of the statistics tuples recorded by ``step``
        """
        return self._stats


class VoidGenerator(BaseRoad):
    """Source of cars built on a 1x1 road.

    On every :meth:`process_output` call it tries, with probability ``p_new``,
    to push a car into one of its outputs.  Cars handed to it through the
    inherited ``add_car`` are queued but never merged into its active cars,
    because :meth:`step` is overridden without calling the base implementation.
    """

    def __init__(self, p_new: float = 1, random_walk: bool = True, p_rot: List = None, first_n: int = 0, **kwargs):
        """
        :param p_new: probability of attempting to emit a car per call
        :param random_walk: when true, every emitted car gets a random route
        :param p_rot: probabilities of the choices 0..3 for the first
            ``first_n`` route entries; uniform if omitted
        :param first_n: number of route entries drawn with ``p_rot``
        :param kwargs: only ``name`` is read (default "void")
        """
        super().__init__((1, 1), name=kwargs.get("name", "void"))
        self.p_new = p_new
        self.random_walk = random_walk
        self.p_rot = p_rot if p_rot is not None else [.25, .25, .25, .25]
        self.first_n = first_n

    def add_output(self, output_road, direction: int = 0, index: int = 0):
        """
        Append ``output_road`` as a new output.

        :param output_road: road that receives the generated cars
        :param direction: ignored; the next free output slot is always used
        :param index: input slot used on ``output_road``
        """
        direction = len(self._outputs)
        output_road.add_input(self, index, direction)
        self._outputs.append((output_road, index))

    def process_output(self, direction: int = 0):
        """
        Try to emit one car into output ``direction``.

        :param direction: index of the output to feed
        :return:
            1 if nothing had to be emitted or the car was accepted
            0 if the output refused the car (it is kept for the next call)
        """
        if np.random.choice((0, 1), p=(1 - self.p_new, self.p_new)) == 0:
            return 1

        # Reuse the car a previous call failed to hand over, if any.
        if self._cars.length() == 0:
            self._cars.add(Car().id)

        car_id = self._cars.get_first()
        if self.random_walk:
            car = CarManager.get_car(car_id)
            # First ``first_n`` turns follow ``p_rot``; 20 uniform turns follow.
            car.route = np.random.choice((0, 1, 2, 3), self.first_n, p=self.p_rot).tolist()
            car.route.extend(np.random.choice((0, 1, 2, 3), 20).tolist())

        # Use the input index stored for *this* output, not the one of output 0.
        target_road, target_input = self._outputs[direction]
        if target_road.add_car(car_id, target_input):
            self._cars.remove(car_id)
            return 1
        return 0

    def process_outputs(self):
        """Run :meth:`process_output` for every output that is set."""
        for direction, output in enumerate(self._outputs):
            if output is None:
                continue
            self.process_output(direction)

    def step(self, time_step=0):
        """
        Record an all-zero statistics entry; the generator has no grid to advance.

        :param time_step: not used
        :return: ``(0, 0, 0)``
        """
        self._stats.append((0, 0, 0))
        return 0, 0, 0


class Line(BaseRoad):
    """Straight single-lane road stored as a 1 x ``length`` grid.

    Cars enter at ``beginning`` (cell ``(0, 0)``) and leave from ``end``
    (cell ``(0, length - 1)``) through output 0.
    """

    def __init__(self, length, **kwargs):
        """
        :param length: number of cells of the lane
        :param kwargs: only ``name`` is read (default "Line")
        """
        super().__init__((1, length), name=kwargs.get("name", "Line"))
        self.length = length
        self.beginning = np.array((0, 0))
        self.end = np.array((0, length-1))

    def add_car(self, car_id: int, direction: int = 0) -> int:
        """
        Put a car on the first cell of the lane if that cell is free.

        :param car_id: id of the car
        :param direction: index of input
        :return:
            0 if the first cell is occupied in the next state
            1 otherwise
        """
        entry_value = self._get_state(self.beginning)
        if entry_value != 0:
            return 0

        # Reserve the first cell, then point the car along the lane
        # (speed index 4, i.e. +1 along axis 1).
        self._next_state[0, 0] = car_id
        newcomer = CarManager.get_car(car_id)
        newcomer.set_position(self.beginning.copy())
        newcomer.set_speed(4)
        return super().add_car(car_id, direction)

    def render(self):
        """
        :return: the committed grid itself (not a copy)
        """
        return self._road

    def process_output(self, direction: int = 0) -> int:
        """
        Hand the car standing on the last cell over to output 0.

        :param direction: not used; output 0 is always taken
        :return:
            1 if the last cell is free or its car was accepted by the output
            0 if the output refused the car
        """
        leaving_id = self._get_state(self.end)
        if leaving_id == 0:
            return 1

        next_road, next_input = self._outputs[0]
        if not next_road.add_car(leaving_id, next_input):
            return 0

        self.set_state(self.end, 0)
        self._remove_car(leaving_id)
        return 1

    def move_cars(self, time_step=0):
        """
        Advance every active car except one standing on the last cell.

        :param time_step: not used
        :return:
        """
        # Visit the cars in shuffled order when requested, otherwise in
        # the order the container yields them.
        order = self._cars.shuffled() if self.shuffle else self._cars.values()
        for car_id in order:
            car = CarManager.get_car(car_id)
            if not (car.position() == self.end).all():
                car.move(self)

In [4]:
# A five-cell lane and a generator that attempts to emit a car with
# probability 0.6 on every process_output call.
line = Line(length=5)
void = VoidGenerator(p_new=0.6)

In [5]:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML

%matplotlib inline

In [6]:
# Close the loop: the generator feeds the lane, and the end of the lane
# hands its cars back to the generator.
void.add_output(line)
line.add_output(void)

In [7]:
for frame in range(100):
    # One tick: advance the cars, commit the new state, then pass cars
    # across the outputs.
    line.move_cars(frame)
    void.move_cars(frame)
    line.step()
    void.step()
    line.process_output()
    void.process_output()

    # Sanity check: the cars tracked by the lane (active + queued) must
    # match the occupied cells of its next state; stop at the first mismatch.
    if np.sum(line._next_state != 0) == line._cars.length() + line._new_cars.length():
        continue
    print(line._new_cars.length(), line._cars.length(), np.sum(line._road != 0))
    break

In [8]:
# Number of active cars on the lane after the run.
line._cars.length()

2

In [9]:
# List the ids of the active cars on the lane, one per line.
for i in line._cars.values():
    print(i)

36
35


In [11]:
# Per-step statistics recorded by step(): (n_cells, n_cars, n_moved).
line._stats

[(5, 0, 0.0),
 (5, 1, 0.5),
 (5, 1, 1.0),
 (5, 2, 1.5),
 (5, 2, 2.0),
 (5, 3, 2.5),
 (5, 2, 2.5),
 (5, 3, 2.5),
 (5, 2, 2.5),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 3, 2.5),
 (5, 2, 2.5),
 (5, 3, 2.5),
 (5, 2, 2.5),
 (5, 3, 2.5),
 (5, 2, 2.5),
 (5, 3, 2.5),
 (5, 2, 2.5),
 (5, 3, 2.5),
 (5, 2, 2.5),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 1, 1.5),
 (5, 2, 1.5),
 (5, 2, 2.0),
 (5, 1, 1.5),
 (5, 1, 1.0),
 (5, 2, 1.5),
 (5, 1, 1.5),
 (5, 1, 1.0),
 (5, 1, 1.0),
 (5, 2, 1.5),
 (5, 1, 1.5),
 (5, 2, 1.5),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 1, 1.5),
 (5, 1, 1.0),
 (5, 1, 1.0),
 (5, 1, 1.0),
 (5, 2, 1.5),
 (5, 2, 2.0),
 (5, 3, 2.5),
 (5, 2, 2.5),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 1, 1.5),
 (5, 2, 1.5),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 1, 1.5),
 (5, 2, 1.5),
 (5, 1, 1.5),
 (5, 1, 1.0),
 (5, 1, 1.0),
 (5, 2, 1.5),
 (5, 1, 1.5),
 (5, 2, 1.5),
 (5, 2, 2.0),
 (5, 2, 2.0),
 (5, 1, 1.5),
 (5, 1, 1.0),
 (5, 0, 0.5),
 (5, 1

In [55]:
# Show the container holding the ids of the active cars on the lane.
line._cars

[12, 13]