In [1]:
import os
import numpy as np
import gymnasium as gym
import networkx as nx
from enum import Enum
from typing import List, Tuple, TypedDict, Union
from gymnasium.wrappers.time_limit import TimeLimit as GameEnv
from networkx.classes.graph import Graph
# Gymnasium wrappers
os.chdir("..")
from lib.models.Action import Action
from lib.models.Destination import Destination
from lib.models.EnvironmentInfo import EnvironmentInfo
from lib.environment.environment import GameEnvironment

# Gymnasium

In [444]:
class SamplePolicy:
    """
    Sample randomly the next action to take.
    """
    def pick_action(env: GameEnv):
        """
        Provide the action for the 

        Parameters
        ----------
        env: GameEnv
            The game environment.

        Returns
        -------
        """
        return env.action_space.sample()

class MonteCarloPolicy:
    def pick_action():
        pass

Policies = Union[SamplePolicy, MonteCarloPolicy]

class Policy:
    def __init__(self, policy: Policies):
        self.policy = policy
    
    def pick_action(env: GameEnv):
        return self.policy.pick_action()

# Underlying functions

In [462]:
def expected_value(n: int, p: float) -> float:
    """
    Compute the expected value.

    Parameters
    ----------
    n: int
        Number of outcomes.
    p: float
        Probability of each outcome.

    Returns
    -------
    float
        The expected value.
    """
    e = [i * p for i in range(n + 1)]
    e = np.array(e)
    return e.sum()

In [463]:
def cumulative_reward(
    rs: float, 
    r: float, 
    gamma: float | bool=False
):
    """
    Compute the cumulative rewards.

    Parmaters
    ---------
    rs: float
        The current accumulated rewards.
    r: float
        The next reward value.
    gamma: float | bool, default=False
        The discount factor.

    Returns
    -------
    float
        The accumulated rewards, taking into account the next reward value.
    """
    if gamma:
        return rs + (gamma * r)
    return rs + r

In [464]:
def q_function(rs: float, s, a):
    """
    Compute the expected return after taking an action (a), starting
    from a given state (s).

    Parameters
    ----------
    rs: float
        The cumulated rewards.
    s: float
        The next state.
    a: float
        The next action.

    Returns
    -------
    float
        The expected value of the cumulated rewards.
    """
    return expected_value(cumulative_reward(rs))

In [465]:
def bellman(gamma: float, r: float, s: float, a: float) -> float:
    """
    Compute the expected value of the reward for the next step
    additionned to the cumulated rewards.

    Parameters
    ----------
    gamma: float
        The discount factor.
    r: float
        The reward for the next step taken.
    s: float
        The reward for being in the current step.
    a: float
        The reward.

    Returns
    -------
    float
        The reward propagated through the previous iterations.
    """
    return expected_value(r + gamma * q_function(s, a))

In [466]:
def init_q_table(observation_space: int, actions: int):
    """
    Create the Q-table.

    Parameters
    ----------
    observation_space: int
        The cardinal of the set of all possible states.
    actions: int
        The cardinal of the set of all possible actions.
    
    Returns
    -------
    np.ndarray
        The Q-table.
    """
    return np.zeros((observation_space, actions))

In [467]:
q_table = init_q_table(500, 6)
q_table

array([[0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.],
       ...,
       [0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0., 0.]])

# 1. Q-learning

In [468]:
env = gym.make('Taxi-v3', render_mode="ansi")
env = GameEnvironment(env=env, reward=0, policy=SamplePolicy)

In [469]:
# Hyperparams
GAMMA = 0.9

In [470]:
rewards = env.reward
rewards

0

In [471]:
env.do_step(0, render=True)
rewards += env.reward
rewards

{'prob': 1.0, 'action_mask': array([1, 1, 1, 1, 0, 0], dtype=int8)}
<class 'dict'>
+---------+
|[34;1mR[0m: | : :G|
| : | : : |
| :[43m [0m: : : |
| | : | : |
|[35mY[0m| : |B: |
+---------+
  (Pickup)



  logger.warn(


-10

# 2. Non-markovian policy

Take into account where we come from :
- We don't want to go back from the place where we picked up the customer

# 2. Monte Carlo

In [8]:
graph = nx.Graph()

In [27]:
class MonteCarlo(GameEnvironment):
    def __init__(
        self,
        env: GameEnv,
        reward: int, 
        graph: Graph,
        seed: int = None
    ):
        super().__init__(
            env=env,
            reward=reward,
            seed=seed
        )
        self.graph = graph
    
    def mcts(self):
        self.do_step(previous_rewards=self.reward)
        graph.add_node(self.state)
        # 1. Pick a node given the policy
        # 2. 

In [28]:
monte_carlo = MonteCarlo(env=env, reward=0, graph=graph)

In [29]:
monte_carlo.mcts()

TypeError: unhashable type: 'list'

# 3. Deep-Q learning

# Analytic solution

In order to solve the problem using an analytic solution, given the restricted size of the environment (500), we could explore all the possible states