 Copyright © Sorbonne University.

 This source code is licensed under the MIT license found in the
 LICENSE file in the root directory of this source tree.

# Outlook

In this notebook we study model-based reinforcement learning algorithms.
We investigate two ways of learning a model of the transition function,
using either a stochastic or a deterministic model.

We also compare the sample efficiency of two approaches: learning a model of the transition function from random actions and then learning the Q-function without any new samples,
versus using the Dyna-Q algorithm, which simultaneously learns a model and uses it "in imagination" to improve the policy of the agent.

In [None]:
# Installs the necessary Python and system libraries
try:
    from easypip import easyimport, easyinstall, is_notebook
except ModuleNotFoundError as e:
    get_ipython().run_line_magic("pip", "install 'easypip>=1.2.0'")
    from easypip import easyimport, easyinstall, is_notebook

easyinstall("swig")
easyinstall("bbrl>=0.2.2")
easyinstall("gymnasium")
easyinstall("mazemdp")
easyinstall("bbrl_gymnasium>=0.2.0")
easyinstall("tensorboard")
easyinstall("moviepy")
easyinstall("box2d-kengz")

from moviepy.editor import ipython_display as video_display

In [None]:
easyimport("bbrl_gymnasium")
import os
from typing import Tuple, List

import numpy as np
import matplotlib
import matplotlib.pyplot as plt

# tqdm was removed from q-learning and sarsa because when we use them
# many times in a loop (e.g. for hyper-param tuning), it slows down the computation a lot
if is_notebook() and get_ipython().__class__.__module__ != "google.colab._shell":
   from tqdm.autonotebook import tqdm
else:
   from tqdm.auto import tqdm

from mazemdp.toolbox import egreedy, softmax, sample_categorical
from mazemdp.mdp import Mdp
from bbrl_gymnasium.envs.maze_mdp import MazeMDPEnv
from gymnasium.wrappers.monitoring.video_recorder import VideoRecorder

# For visualization
os.environ["VIDEO_FPS"] = "5"
if not os.path.isdir("./videos"):
    os.mkdir("./videos")

from IPython.display import Video

import random

# Model-Based Reinforcement Learning

Model-Based Reinforcement Learning (MBRL) is an approach to RL where the agent learns a model of the transition function and uses it to update its value function and/or its policy. To learn this model, the agent needs to interact with the environment. It can do so either with a fixed exploration policy or with the policy it is learning simultaneously with the model of the transition function. To improve its target policy, the agent can follow several MBRL approaches:

- the agent can apply dynamic programming (value iteration or policy iteration) algorithms using the learned model of the transition function

- the agent can draw random transitions from the learned model and perform Bellman backups using these samples to update a critic model. This is called learning in imagination, and this corresponds to Dyna-Q when the agent uses the Q-learning update rule to perform Bellman backups.

- the agent can do the same as above, but choosing which transitions to back up according to a priority instead of drawing them at random. This corresponds to prioritized sweeping and its variants.
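
As a reminder, and assuming the usual Q-learning notation (a learning rate $\alpha$ and a discount factor $\gamma$, neither of which has been introduced yet in this notebook), the Bellman backup performed on an imagined transition can be sketched as follows, where $\hat{P}$ and $\hat{r}$ denote the learned transition and reward models:

$$
Q(s, a) \leftarrow Q(s, a) + \alpha \left[ \hat{r}(s, a) + \gamma \max_{a'} Q(s', a') - Q(s, a) \right], \qquad s' \sim \hat{P}(\cdot \mid s, a)
$$

When the imagined transition is predicted to be terminal, the bootstrap term $\gamma \max_{a'} Q(s', a')$ is dropped and the target reduces to $\hat{r}(s, a)$.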

As for model-free reinforcement learning, we first create a maze-like MDP. 

In [None]:
import gymnasium as gym
import bbrl_gymnasium
from bbrl_gymnasium.envs.maze_mdp import MazeMDPEnv

# Environment with 20% of walls and no negative reward when hitting a wall
env = gym.make("MazeMDP-v0", kwargs={"width": 4, "height": 3, "ratio": 0.2, "hit": 0.0}, render_mode="rgb_array")
env.reset()

## Transition models

We create two transition model classes that store the learned model of the transition function.
Both inherit from a more generic TransitionModel class.

In [None]:
class TransitionModel():
   def __init__(self, env):
      self.nb_states = env.unwrapped.nb_states # the .unwrapped removes a warning from gymnasium
      self.nb_actions = env.action_space.n

   def predict(self, state, action):
      pass

   def add_transition(self, state, action, next_state):
      pass
      
   # This function draws a random transition from the model.

   def sample_transition(self): # in BBRL, this will be renamed into forward
      state = random.randint(0, self.nb_states - 1)
      action = random.randint(0, self.nb_actions - 1)
      next_state = self.predict(state, action)
      return state, action, next_state

   # To monitor the accuracy of the model of the transition function, we build an evaluate function.
   # This function draws sample_size random transitions from the real maze and checks whether the model outputs the same next state.
   # It returns the fraction of transitions that the model predicts correctly.
   # This check is only valid if the real maze is deterministic.
   # Otherwise, we should measure the distance between two probability distributions.

   def evaluate_random(self, env, sample_size: int=100) -> float:
      success = 0
      for _ in range(sample_size):
         state, action, next_state = env.sample_transition()
         next_state_model = self.predict(state, action)
         if next_state == next_state_model:
            success = success + 1
      # Normalize by the actual sample size rather than a hard-coded 100
      return success / sample_size

   def is_accurate(self, env) -> bool:
      pass

   # Useful for debug
   
   def display_count(self):
      print("count :", self.count)

The first model is stochastic: it is initialized with uniform probabilities and updates these probabilities when it receives new evidence.
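
To make the update rule concrete, here is a minimal sketch of a running-average update for a single (state, action) pair. The helper name `update_probabilities` and the observation sequence are hypothetical and used only for illustration. After the first observation the uniform initialization is fully overwritten, and afterwards the vector holds the empirical next-state frequencies.

```python
import numpy as np

def update_probabilities(probs: np.ndarray, count: int, next_state: int) -> np.ndarray:
    """Running-average update of a categorical distribution after the count-th observation."""
    one_hot = np.zeros_like(probs)
    one_hot[next_state] = 1.0
    return probs + (one_hot - probs) / count

nb_states = 3
probs = np.ones(nb_states) / nb_states  # uniform initialization
observations = [2, 2, 0, 2]             # hypothetical observed next states
for count, next_state in enumerate(observations, start=1):
    probs = update_probabilities(probs, count, next_state)

print(probs)  # empirical frequencies of the observed next states
```

Here state 2 was observed three times out of four and state 0 once, so the estimate is 0.75 for state 2 and 0.25 for state 0.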

In [None]:
class StochasticTransitionModel(TransitionModel):
   def __init__(self, env):
      super().__init__(env)
      self.M = np.ones(
            (self.nb_states, self.nb_actions, self.nb_states)
         ) / self.nb_states
      self.I = np.array(range(self.nb_states))
      self.count = np.zeros((self.nb_states, self.nb_actions))

   def predict(self, state, action):
      next_state = sample_categorical(self.M[state, action, :])
      return next_state

   def add_transition(self, state, action, next_state):
      self.count[state, action] = self.count[state, action] + 1
      # Running average: after n visits, M[state, action, :] holds the empirical next-state frequencies
      n = self.count[state, action]
      one_hot = (self.I == next_state).astype(float)
      self.M[state, action, :] = (1 - 1/n) * self.M[state, action, :] + one_hot / n

   # In the probabilistic case, sampling the right next state is not enough, as it may happen by chance,
   # so we compare the probabilities instead.

   def is_accurate(self, env) -> bool:
      for state in range(self.nb_states):
         for action in range(self.nb_actions):
            ground_truth_proba = env.unwrapped.P[state, action, :]
            model_proba = self.M[state, action, :]
            error = np.linalg.norm(ground_truth_proba - model_proba)
            if error > 0.01:
               # print(ground_truth_proba, model_proba, error)
               return False
      return True

The second model is deterministic: it is initialized as empty and learns new deterministic transitions as they come.
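
A count-based deterministic model can be sketched as below, on hypothetical transitions. One point worth noticing is that `np.argmax` over an all-zero count vector returns 0, so a (state, action) pair that was never visited silently predicts state 0. The `visited` mask, which is an addition of this sketch, makes that case explicit.

```python
import numpy as np

nb_states, nb_actions = 3, 2
counts = np.zeros((nb_states, nb_actions, nb_states))

# Hypothetical observed transitions (state, action, next_state)
for state, action, next_state in [(0, 1, 2), (0, 1, 2), (1, 0, 0)]:
    counts[state, action, next_state] += 1

predicted = np.argmax(counts, axis=2)  # most frequent next state, shape (nb_states, nb_actions)
visited = counts.sum(axis=2) > 0       # True where at least one transition was observed

print(predicted)
print(visited)
```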

In [None]:
class DeterministicTransitionModel(TransitionModel):
   def __init__(self, env):
      super().__init__(env)
      self.count = np.zeros(
            (self.nb_states, self.nb_actions, self.nb_states)
         )

   def predict(self, state, action):
      next_state = np.argmax(self.count[state, action, :])
      return next_state

   def add_transition(self, state, action, next_state):
      self.count[state, action, next_state] = self.count[state, action, next_state] + 1

   # This function draws all transitions from the real maze and checks whether the model outputs the same next state.
   # This check is only valid if the real maze is deterministic.

   def is_accurate(self, env) -> bool:
      # To be completed...

      assert False, 'Not implemented yet'


## Reward model

This class is used to learn a model of the reward function.
We consider that the reward function is deterministic.

In [None]:
class RewardModel():
   def __init__(self, env):
      self.nb_states = env.unwrapped.nb_states
      self.nb_actions = env.action_space.n
      self.reward_model = np.zeros((self.nb_states, self.nb_actions))

   def predict(self, state, action):
      return self.reward_model[state, action]

   def learn_reward(self, state, action, reward):
      self.reward_model[state, action] = reward

   # This function draws all transitions from the real maze and checks whether the model outputs the same reward.
   # This check is only valid if the real maze is deterministic.

   def is_accurate(self, env) -> bool:
      # To be completed...

      assert False, 'Not implemented yet'


   # Useful for debug
   
   def display_reward(self):
      print("reward model", self.reward_model)

## Termination model

This class is used to learn a model of the termination function.
We consider that termination is deterministic.

In [None]:
class TerminationModel():
   def __init__(self, mdp):
      self.nb_states = mdp.unwrapped.nb_states
      self.nb_actions = mdp.action_space.n
      self.termination_model = np.zeros((self.nb_states, self.nb_actions))

   # This function draws all transitions from the real maze and checks whether the model outputs the same termination.
   # This check is only valid if the real maze is deterministic.

   def is_accurate(self, env) -> bool:
      for state in range(self.nb_states):
         for action in range(self.nb_actions):
            t_model = self.termination_model[state, action]
            env.mdp.current_state = state # ugly need to set the state from outside
            _, _, terminated, _, _ = env.step(action)
            t_ground_truth = terminated
            if not (t_model == t_ground_truth):
               # print(state, action, t_model, t_ground_truth)
               return False
      return True
   
   def predict(self, state, action):
      return self.termination_model[state, action]

   def learn_termination(self, state, action, termination):
      self.termination_model[state, action] = termination

   # Useful for debug

   def display_termination(self):
      print("terminated model", self.termination_model)

## Learn a forward model with a random policy ##

In the function below, we train a transition model, a reward model and a termination model from
an agent performing random actions.
We stop once the transition model and the reward model are accurate enough.
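
The general shape of such a loop can be sketched on a toy problem. The example below does not use the maze: it assumes a hypothetical deterministic ring of 4 states stored in `true_next`, ignores rewards, termination and episode timeouts, and uses a count table as the model. It stops as soon as every (state, action) pair has been visited and is predicted correctly.

```python
import numpy as np

rng = np.random.default_rng(0)
nb_states, nb_actions = 4, 2

# Hypothetical ground truth: a ring where action 0 moves left and action 1 moves right
true_next = np.array([[(s - 1) % nb_states, (s + 1) % nb_states] for s in range(nb_states)])

counts = np.zeros((nb_states, nb_actions, nb_states))
state, steps = 0, 0
max_steps = 10_000  # safety bound so that the loop always terminates

while steps < max_steps:
    action = int(rng.integers(nb_actions))       # random exploration policy
    next_state = int(true_next[state, action])   # query the "real" environment
    counts[state, action, next_state] += 1       # update the learned model
    state = next_state
    steps += 1
    # Accuracy check: all pairs visited and the model predicts the true next state
    visited = counts.sum(axis=2) > 0
    if visited.all() and np.array_equal(np.argmax(counts, axis=2), true_next):
        break

print(f"model accurate after {steps} random steps")
```

Checking accuracy after every step is affordable on such a small problem. On larger ones, checking only every few steps would be a reasonable trade-off.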

In [None]:
def learn_forward_model_from_random_actions(
    mdp: MazeMDPEnv,
    timeout: int = 50,
) -> Tuple[TransitionModel, RewardModel, TerminationModel]:

   # Build the models from the environment passed as argument, not from the global env
   trans_model = StochasticTransitionModel(mdp)
   reward_model = RewardModel(mdp)
   terminated_model = TerminationModel(mdp)

   # Run learning cycle
   mdp.timeout = timeout # episode length
   steps = 0 # number of steps before convergence

   # To be completed...

   assert False, 'Not implemented yet'


   print(f"the number of steps needed to learn accurate models from random actions was {steps}")
   return trans_model, reward_model, terminated_model

## Q-learning agents ##

We will reuse the Q-learning algorithm coded in the previous lab.
But this time, we write it in a more object-oriented way.

We start with a general QAgent class and then derive SoftmaxQAgent and EgreedyQAgent from it to account for two exploration methods.

In the code below:
- fill in the updateQ method

- write the update_Q_from_models method that updates the Q-table from models of the transition, reward and termination functions
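
As a hint for the first item, a Q-learning backup with explicit handling of terminal transitions can be sketched as a standalone function. The name `q_backup`, its signature and the default values of `alpha` and `gamma` are assumptions of this sketch, not the notebook's API.

```python
import numpy as np

def q_backup(Q, state, action, reward, next_state, terminated, alpha=0.5, gamma=0.9):
    """One Q-learning backup, in place; no bootstrapping on terminal transitions."""
    target = reward if terminated else reward + gamma * np.max(Q[next_state])
    Q[state, action] += alpha * (target - Q[state, action])

Q = np.zeros((3, 2))
# Terminal transition: the target is the reward alone
q_backup(Q, state=1, action=0, reward=1.0, next_state=2, terminated=True)
# Non-terminal transition: the target bootstraps on the best value of the next state
q_backup(Q, state=0, action=1, reward=0.0, next_state=1, terminated=False)
print(Q)
```

The same function can be applied to real transitions and to transitions sampled from the learned models, which is why the imagination update can simply reuse it.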

In [None]:
class QAgent():
   def __init__(self, mdp, alpha):
      self.nb_states = mdp.unwrapped.nb_states
      self.nb_actions = mdp.action_space.n
      self.gamma = mdp.unwrapped.gamma
      self.alpha = alpha
      self.Q = np.zeros((self.nb_states, self.nb_actions))

   def choose_action(self, state):
      pass

   # Beware that the efficiency is highly dependent on the error threshold
   def is_accurate(self, q_table) -> bool:
      error = np.linalg.norm(self.Q - q_table)
      if error > 0.01:
         # print("self :", self.Q)
         # print("table :", q_table)
         # print(f"QAgent error: {error}")
         return False
      return True

   # Do not forget to deal with the case where the episode is terminated
   def updateQ(self, state, action, reward, next_state, terminated) -> None:
      """
      Performs a Bellman back-up over the Q-table of the agent
      :return: nothing
      """
      # To be completed...

      assert False, 'Not implemented yet'


   # The function below should use the above function
   def update_Q_from_models(self, trans_model:TransitionModel, reward_model:RewardModel, terminated_model:TerminationModel, nb_updates: int) -> None:
      """
      Updates the Q-table of the agent using randomly sampled transitions (i.e. the agent is learning in imagination)
      It does so nb_updates times
      :param trans_model: the model of the transition function used to sample updates
      :param reward_model: the model of the reward function used to compute the reward of the sample updates
      :param terminated_model: the model of the termination function used to determine if the sample updates are terminal
      :param nb_updates: the number of performed updates
      :return: nothing
      """
      # To be completed...

      assert False, 'Not implemented yet'


Fill in the choose_action method of both classes below.
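
The two exploration rules can be sketched with NumPy alone, as below. This sketch deliberately does not rely on the `egreedy` and `softmax` helpers imported from `mazemdp.toolbox`, whose exact signatures are not shown in this notebook. The function names and the random generator are assumptions made for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

def softmax_probs(q_values: np.ndarray, beta: float) -> np.ndarray:
    """Boltzmann distribution over actions; beta acts as an inverse temperature."""
    logits = beta * (q_values - np.max(q_values))  # shift for numerical stability
    exp = np.exp(logits)
    return exp / exp.sum()

def choose_softmax(q_values: np.ndarray, beta: float = 6.0) -> int:
    return int(rng.choice(len(q_values), p=softmax_probs(q_values, beta)))

def choose_egreedy(q_values: np.ndarray, epsilon: float = 0.02) -> int:
    if rng.random() < epsilon:
        return int(rng.integers(len(q_values)))  # explore uniformly
    return int(np.argmax(q_values))              # exploit the current estimate

q_values = np.array([0.1, 0.5, 0.2, 0.0])
print(softmax_probs(q_values, beta=6.0))
print(choose_softmax(q_values), choose_egreedy(q_values))
```

A larger `beta` concentrates the softmax distribution on the greedy action, while `epsilon` directly sets the probability of a uniformly random action.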

In [None]:
class SoftmaxQAgent(QAgent):
   def __init__(self, mdp, alpha: float = 0.5, beta: float = 6.0):
      super().__init__(mdp, alpha)
      self.beta = beta

   def choose_action(self, state) -> int:
      # To be completed...

      assert False, 'Not implemented yet'


In [None]:
class EgreedyQAgent(QAgent):
   def __init__(self, mdp, alpha: float = 0.5, epsilon: float = 0.02):
      super().__init__(mdp, alpha)
      self.epsilon = epsilon 

   def choose_action(self, state) -> int:
      # To be completed...

      assert False, 'Not implemented yet'


## Import the Value iteration with the $Q$ function algorithm from the dynamic programming lab
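
For readers who do not have the previous lab at hand, value iteration on the Q-function can be sketched on explicit arrays as below. This is only an illustration: the function `q_value_iteration`, its arguments and the `terminal` mask are assumptions of this sketch, and its interface differs from the `value_iteration_q(env, render=False)` call used later in this notebook, which returns a pair.

```python
import numpy as np

def q_value_iteration(P, R, terminal, gamma=0.9, tol=1e-6):
    """P[s, a, s'] transition probabilities, R[s, a] rewards, terminal[s] boolean mask."""
    nb_states, nb_actions, _ = P.shape
    Q = np.zeros((nb_states, nb_actions))
    while True:
        V = np.max(Q, axis=1)
        V = np.where(terminal, 0.0, V)  # no value is collected after a terminal state
        new_Q = R + gamma * P @ V       # (S, A, S') @ (S',) -> (S, A)
        if np.max(np.abs(new_Q - Q)) < tol:
            return new_Q
        Q = new_Q

# Hypothetical 3-state chain: action 0 stays, action 1 moves right, state 2 is terminal
P = np.zeros((3, 2, 3))
P[0, 0, 0] = P[0, 1, 1] = P[1, 0, 1] = P[1, 1, 2] = 1.0
P[2, :, 2] = 1.0
R = np.zeros((3, 2))
R[1, 1] = 1.0  # reward for reaching the terminal state
terminal = np.array([False, False, True])

Q_star = q_value_iteration(P, R, terminal)
print(Q_star)
```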

In [None]:
# To be completed...

assert False, 'Not implemented yet'


## Learn the Q-table using the learned models

In the function below, we train a QAgent from samples taken randomly from a transition model, a reward model and a termination model.
We stop once the QAgent is accurate enough, compared to a Q-table obtained from value_iteration.

In [None]:
def learn_Q_from_models(
   agent: QAgent,
   trans_model: StochasticTransitionModel,
   reward_model: RewardModel,
   terminated_model: TerminationModel,
   q_table,
):
   steps = 0 # number of steps before convergence

   # To be completed...

   assert False, 'Not implemented yet'


   return steps

## Putting everything together

Run the model learning algorithm, then get an optimal Q-table from that model.
We do it several times to obtain an average number of steps.

In [None]:
nb_repeats = 10
TIMEOUT = 200

steps = np.zeros(nb_repeats)
for i in range(nb_repeats):
   trans_model, reward_model, terminated_model = learn_forward_model_from_random_actions(env, timeout=TIMEOUT)
   agent = SoftmaxQAgent(env)
   q_table, _ = value_iteration_q(env, render=False)
   steps[i] = learn_Q_from_models(agent, trans_model, reward_model, terminated_model, q_table)
   print(f"the number of steps needed to learn a perfect Q-Table from the models was {steps[i]}")
print(f"the mean number of steps needed to learn a perfect Q-Table from the models was {steps.mean()}")

## Dyna-Q algorithm

This time, we learn the models and the Q-function simultaneously.
We perform `nb_updates` steps "in imagination" for each step in the real environment.
We stop once the QAgent is accurate enough, compared to a Q-table obtained from value_iteration.

In [None]:
def dyna_q_soft(
    mdp: MazeMDPEnv,
    q_table,
    nb_updates: int = 5,
    timeout: int = 50,
) -> int:
   # Initialize the state-action value function
   # (the learning rate alpha keeps the default value of SoftmaxQAgent)
   agent = SoftmaxQAgent(mdp)
   # Build the models from the environment passed as argument, not from the global env
   trans_model = StochasticTransitionModel(mdp)
   reward_model = RewardModel(mdp)
   terminated_model = TerminationModel(mdp)

   # Run learning cycle
   mdp.timeout = timeout  # episode length
   steps = 0

   # To be completed...

   assert False, 'Not implemented yet'

   return steps

Run the Dyna-Q algorithm for different values of nb_updates

In [None]:
TIMEOUT = 200
q_table, _ = value_iteration_q(env, render=False)
for nb_updates in range(10):
   step_array = np.zeros(10)
   for i in range(10):
      step_array[i] = dyna_q_soft(env, q_table, nb_updates, timeout=TIMEOUT)
   print(f"the number of steps needed to learn a perfect Q-Table with Dyna-Q using {nb_updates} updates was {step_array.mean()}")

## Empirical study

- Compare the number of samples needed when first learning the model from random actions and then deriving an optimal Q-table from it, versus improving the Q-table and the behavior of the agent simultaneously.
- Do it for various numbers of updates "in imagination" in the simultaneous learning case.
- Try it using the deterministic model and the stochastic model, and using softmax exploration versus epsilon-greedy exploration.
- Finally, draw conclusions from these comparisons.