<a href="https://colab.research.google.com/github/PacomeKFP/transportation-time-estimation-operational-research/blob/main/Estimation_du_temps_de_transport.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional, Set, Annotated, Literal, TypeVar, Callable
import numpy as np
from numpy.typing import NDArray
import numpy.typing as npt

# Default time step; used as the default `time_differential` of Solver and
# of both transportation-time estimators in this file.
DELTA_T = 0.1  # seconds
# Fallback cost used by Solver when a provider has no best path to read a
# cost from.
MAX_NUMBER = 999
# Dimension encoded in the Array6 / Matrix6 shape metadata; ChunkState
# declares the same number of members.
STATE_DIMENSION = 6
# Scalar-type parameter of the array aliases (any NumPy scalar type).
DType = TypeVar("DType", bound=np.generic)
# Annotation-only aliases: the Literal shape is metadata for readers and
# tooling, Python does not check array shapes against it at runtime.
# NOTE(review): Array6 advertises shape (6, 1), but
# ChunkState.nominal_velocities_vector() returns a 1-D array of 6 values —
# confirm which shape is intended.
Array6 = Annotated[npt.NDArray[DType], Literal[STATE_DIMENSION, 1]]
Matrix6 = Annotated[npt.NDArray[DType],
                    Literal[STATE_DIMENSION, STATE_DIMENSION]]
# Short alias for np.float32; not referenced in the visible part of the file.
flt = np.float32


class Weather(Enum):
    """Weather conditions.

    Not referenced by any other definition in the visible part of this file.
    """

    # NOTE(review): "warm" and "dry" are not mutually exclusive conditions;
    # confirm whether another pair (e.g. wet/dry) was intended.
    warm = 0
    dry = 1


class CirculationState(Enum):
    """Circulation state; only ``fluid`` is declared so far.

    Not referenced by any other definition in the visible part of this file.
    """

    fluid = 0


class ChunkState(Enum):
    """The six numbered states of a chunk (values 1 through 6)."""

    ONE = 1
    TWO = 2
    THREE = 3
    FOUR = 4
    FIVE = 5
    SIX = 6

    @staticmethod
    def nominal_velocities_vector() -> Array6[np.float32]:
        """Return the nominal velocities as a 1-D float32 array.

        The array holds one value per declared state, all currently equal
        to 10.0. Presumably entry ``i`` is the velocity of the ``i``-th
        state; units are not stated in this file.
        """
        # One entry per enum member, i.e. six identical values.
        return np.full(len(ChunkState), 10.0, dtype=np.float32)


@dataclass
class Chunk:
    """A piece of an edge, described only by its length."""

    # Distance to cover; ChunckTransportationTimeEstimator.estimate() steps
    # until the traveled distance reaches this value. Units are not stated
    # in this file.
    length: np.float32
    


# unsafe_hash=True: a plain @dataclass defines __eq__ and therefore sets
# __hash__ to None, which would make it impossible to store instances in
# Graph.intersections (a Set[Intersection]). Instances stay mutable.
@dataclass(unsafe_hash=True)
class Intersection:
    """A node of the graph, identified by ``name`` and an optional ``label``.

    Equality and hashing are both based on ``(name, label)``. Because the
    instance remains mutable, do not modify ``name`` or ``label`` while the
    object is a member of a set or a dictionary key: its hash would change
    and the container could no longer find it.
    """

    name: str
    label: Optional[str] = None


@dataclass
class Edge:
    """Link between an origin and an extremity intersection, split in chunks.

    Equality is the dataclass-generated field comparison. An explicit
    ``__hash__`` is provided because a plain ``@dataclass`` with ``eq=True``
    is unhashable, which would make ``Graph.edges`` (a ``Set[Edge]``)
    impossible to populate.
    """

    origin: Intersection
    extremity: Intersection
    chunks: List[Chunk]

    def __hash__(self) -> int:
        """Hash on the endpoint names.

        ``chunks`` is a list and cannot take part in the hash. Equal edges
        have equal endpoints and therefore equal names, so this stays
        consistent with ``__eq__``. Do not rename an endpoint while the edge
        is stored in a set or used as a dictionary key.
        """
        return hash((self.origin.name, self.extremity.name))


@dataclass
class Path:
    """A sequence of edges together with its total cost."""

    edges: List[Edge]
    # Read by Solver both as a provider's estimated transportation time and
    # as the key used to rank providers.
    total_cost: np.float32


@dataclass
class Graph:
    """A graph given as a set of intersections and a set of edges."""

    # Both collections are sets, so their members must be hashable.
    # NOTE(review): a dataclass declared with the default eq=True and without
    # frozen=True, unsafe_hash=True or an explicit __hash__ is unhashable —
    # verify that Intersection and Edge instances can actually be stored here.
    intersections: Set[Intersection]
    edges: Set[Edge]


@dataclass
class Provider:
    """A supplier located at an intersection, with stock and routing results.

    ``best_path`` and ``transportation_time`` start as ``None``; Solver
    reads the former and fills in the latter.
    """

    name: str
    location: Intersection
    available_quantity: int
    # NOTE(review): the default is the int 0 although the field is annotated
    # np.float32 — confirm whether a float32 zero is expected.
    departure_instant: np.float32 = 0
    best_path: Optional[Path] = None
    transportation_time: Optional[np.float32] = None

    def __str__(self) -> str:
        """Return a one-line summary: name, location, stock and estimate."""
        if self.transportation_time is None:
            time_str = "Not calculated"
        else:
            time_str = f"{self.transportation_time} seconds"
        return f"{self.name} - {self.location.name} - {self.available_quantity} units available - Estimated time: {time_str}"


@dataclass
class Solver:
    """Select and rank the providers able to serve a requested quantity.

    ``execute()`` runs the full pipeline: keep the providers with enough
    stock, give each one a transportation-time estimate, sort them by cost
    and print the best ones.

    ``destination``, ``map`` and ``time_differential`` are stored but not
    read by any method of this class.
    """

    providers: List[Provider]
    destination: Intersection
    map: Optional[Graph] = None
    requested_quantity: int = 0
    # Annotated as float: the default DELTA_T is a fractional number of seconds.
    time_differential: float = DELTA_T
    preselected_providers: List[Provider] = field(default_factory=list)
    sorted_providers: List[Provider] = field(default_factory=list)

    def execute(self) -> None:
        """Run preselection, estimation, sorting and display of the top 5.

        Raises:
            ValueError: if no provider has enough available quantity.
        """
        self._preselect_provider()
        self._estimate_transportation_time_for_each_provider()
        self._sort_providers()
        self._display_top_providers(5)

    def _preselect_provider(self) -> List[Provider]:
        """Keep the providers whose stock covers ``requested_quantity``.

        The result is stored in ``preselected_providers`` and returned.

        Raises:
            ValueError: if no provider qualifies.
        """
        self.preselected_providers = [
            provider
            for provider in self.providers
            if provider.available_quantity >= self.requested_quantity
        ]
        if not self.preselected_providers:
            raise ValueError(
                "No providers available with sufficient quantity.")
        return self.preselected_providers

    def _estimate_transportation_time_for_each_provider(self) -> None:
        """Store an estimate on every preselected provider."""
        for provider in self.preselected_providers:
            provider.transportation_time = (
                self._estimate_transportation_time_for_a_provider(provider)
            )

    def _estimate_transportation_time_for_a_provider(self, provider: Provider) -> np.float32:
        """Return the cost of the provider's best path, or ``MAX_NUMBER``.

        ``MAX_NUMBER`` is returned when the provider has no best path or
        when that path carries no cost, the same rule used for sorting.
        """
        return self._path_cost(provider)

    def _sort_providers(self) -> List[Provider]:
        """Sort the preselected providers by ascending best-path cost.

        The result is stored in ``sorted_providers`` and returned. Providers
        without a usable cost are ranked with ``MAX_NUMBER``.
        """
        self.sorted_providers = sorted(
            self.preselected_providers, key=self._path_cost)
        return self.sorted_providers

    def _display_top_providers(self, number: int = 5) -> List[Provider]:
        """Print and return the first ``number`` sorted providers.

        A negative ``number`` is treated as 0 so that the returned list is
        always exactly the list that was printed.
        """
        top_providers = self.sorted_providers[:max(number, 0)]
        print("The best providers are:")
        for rank, provider in enumerate(top_providers, start=1):
            print(f"{rank}. {provider}")

        return top_providers

    @staticmethod
    def _path_cost(provider: Provider) -> np.float32:
        """Single source of truth for a provider's cost.

        Shared by the estimate and the sort key so the two can never
        disagree about a provider without a usable path cost.
        """
        path = provider.best_path
        if path is None or path.total_cost is None:
            return MAX_NUMBER
        return path.total_cost


@dataclass
class ChunckTransportationTimeEstimator:
    """Estimate the time needed to travel across a single chunk.

    The chunk is covered in steps of ``time_differential``. At each step the
    velocity is the sum of the nominal velocities weighted by the current
    ``states_distribution``; the distribution is then updated by
    right-multiplying it with ``P(step_duration)``.

    ``time_origin`` is stored but not read by this class.
    """

    chunk: Chunk
    time_origin: np.float32
    states_distribution: Array6[np.float32]
    # Called with a step duration; its result right-multiplies the
    # distribution (presumably a 6x6 transition matrix — confirm).
    P: Callable[[np.float32], Matrix6[np.float32]]
    time_differential: np.float32 = DELTA_T

    def estimate(self) -> np.float32:
        """Return the estimated time to traverse ``chunk``.

        Side effect: ``self.states_distribution`` is rebound to the
        distribution reached at the end of the chunk, so calling this method
        twice on the same instance does not start from the same state.

        Returns 0.0 when the chunk length is not strictly positive.

        Raises:
            ValueError: if ``time_differential`` or a computed velocity is
                not strictly positive (NaN included); without this check
                the loop would never terminate.
        """
        if not self.time_differential > 0:
            raise ValueError(
                f"time_differential must be strictly positive, got {self.time_differential}.")

        # The nominal velocities do not change between steps.
        nominal_velocities = ChunkState.nominal_velocities_vector()
        traveled_distance = 0.0
        time_estimation = 0.0
        while traveled_distance < self.chunk.length:
            velocity = self._compute_distributed_velocity(
                self.states_distribution, nominal_velocities)
            if not velocity > 0:
                raise ValueError(
                    f"Distributed velocity must be strictly positive, got {velocity}.")

            temporal_distance = velocity * self.time_differential
            is_last_step = temporal_distance + traveled_distance >= self.chunk.length
            if is_last_step:
                # Truncate the final step so it stops exactly at the chunk end.
                temporal_distance = self.chunk.length - traveled_distance
            traveling_time = temporal_distance / velocity
            time_estimation += traveling_time
            traveled_distance += temporal_distance

            self.states_distribution = self.states_distribution @ self.P(traveling_time)

            if is_last_step:
                # Leave explicitly: floating-point rounding may keep
                # traveled_distance marginally below the chunk length.
                break

        return time_estimation

    def _compute_distributed_velocity(self, distribution: Array6[np.float32], nominal_velocities: Array6[np.float32]) -> np.float32:
        """Return the sum of ``nominal_velocities`` weighted by ``distribution``."""
        return np.sum(distribution * nominal_velocities)


@dataclass
class EdgeTransportationTimeEstimator:
    """Estimate the time needed to travel across all chunks of an edge.

    The chunks are traversed in list order. The state distribution reached
    at the end of a chunk is the starting distribution of the next one, and
    the next chunk's ``time_origin`` is advanced by the time already spent
    on the edge. The fields of this estimator are never modified.
    """

    edge: Edge
    time_origin: np.float32
    states_distribution: Array6[np.float32]
    # Forwarded unchanged to each chunk estimator, which calls it with a
    # step duration and right-multiplies the distribution by the result.
    P: Callable[[np.float32], Matrix6[np.float32]]
    time_differential: np.float32 = DELTA_T

    def estimate(self) -> np.float32:
        """Return the summed traversal time of the edge's chunks.

        Returns 0.0 for an edge without chunks. Errors raised by the chunk
        estimator propagate unchanged.
        """
        total_time_estimation = 0.0
        # Local copy of the reference: carried from chunk to chunk without
        # touching self.states_distribution.
        distribution = self.states_distribution
        for chunk in self.edge.chunks:
            estimator = ChunckTransportationTimeEstimator(
                chunk=chunk,
                time_origin=self.time_origin + total_time_estimation,
                states_distribution=distribution,
                time_differential=self.time_differential,
                P=self.P,
            )
            total_time_estimation += estimator.estimate()
            # estimate() leaves the evolved distribution on the estimator;
            # the next chunk must start from it, not from the initial one.
            distribution = estimator.states_distribution
        return total_time_estimation