In [1]:
pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0


In [2]:

import numpy as np
import torch
from tqdm import tqdm
from gymnasium.wrappers import TimeLimit
import gymnasium as gym
from statistics import mean
from functools import partial
from typing import Protocol
import random
import os
from pathlib import Path
from copy import deepcopy
import torch.nn as nn
from sklearn.ensemble import RandomForestRegressor
import pickle


In [3]:

from google.colab import drive
drive.mount('/content/drive')

# Access the specific folder in your MyDrive
path = '/content/drive/MyDrive/MVA/'



Mounted at /content/drive


# 1 - Environment for HIV Patient

Imported all code from src/env_hiv.py

In [4]:
class HIVPatient(gym.Env):
    """HIV patient simulator

    Implements the simulator defined in 'Dynamic Multidrug Therapies for HIV: Optimal and STI Control Approaches' by Adams et al. (2004).
    The transition() function allows to simulate continuous time dynamics and control.
    The step() function is tailored for the evaluation of Structured Treatment Interruptions.
    """

    def __init__(
        self, clipping=True, logscale=False, domain_randomization: bool = False
    ):
        super(HIVPatient, self).__init__()

        self.domain_randomization = domain_randomization
        self.action_space = gym.spaces.Discrete(4)
        self.observation_space = gym.spaces.Box(
            shape=(6,), low=-np.inf, high=np.inf, dtype=np.float32
        )

        self.T1 = 163573.0  # healthy type 1 cells concentration (cells per mL)
        self.T1star = 11945.0  # infected type 1 cells concentration (cells per mL)
        self.T2 = 5.0  # healthy type 2 cells concentration (cells per mL)
        self.T2star = 46.0  # infected type 2 cells concentration (cells per mL)
        self.V = 63919.0  # free virus (copies per mL)
        self.E = 24.0  # immune effector cells concentration (cells per mL)

        # actions
        self.action_set = [
            np.array(pair) for pair in [[0.0, 0.0], [0.0, 0.3], [0.7, 0.0], [0.7, 0.3]]
        ]

        self._reset_patient_parameters()

        # action bounds
        self.a1 = 0.0  # lower bound on reverse transcriptase efficacy
        self.b1 = 0.7  # lower bound on reverse transcriptase efficacy
        self.a2 = 0.0  # lower bound on protease inhibitor efficacy
        self.b2 = 0.3  # lower bound on protease inhibitor efficacy

        # reward model
        self.Q = 0.1
        self.R1 = 20000.0
        self.R2 = 20000.0
        self.S = 1000.0

        # options
        self.clipping = clipping  # clip the state to the upper and lower bounds (affects the state attributes, which are clipped before being stored in T1, T1star, etc.)
        self.logscale = logscale  # convert state to log10-scale before returning (does not affect the state attributes, which remain stored without the log10 operation)
        self.T1Upper = 1e6
        self.T1starUpper = 5e4
        self.T2Upper = 3200.0
        self.T2starUpper = 80.0
        self.VUpper = 2.5e5
        self.EUpper = 353200.0
        self.upper = np.array(
            [
                self.T1Upper,
                self.T1starUpper,
                self.T2Upper,
                self.T2starUpper,
                self.VUpper,
                self.EUpper,
            ]
        )
        self.T1Lower = 0.0
        self.T1starLower = 0.0
        self.T2Lower = 0.0
        self.T2starLower = 0.0
        self.VLower = 0.0
        self.ELower = 0.0
        self.lower = np.array(
            [
                self.T1Lower,
                self.T1starLower,
                self.T2Lower,
                self.T2starLower,
                self.VLower,
                self.ELower,
            ]
        )

    def rawstate(self):
        return np.array([self.T1, self.T1star, self.T2, self.T2star, self.V, self.E])

    def state(self):
        s = np.array([self.T1, self.T1star, self.T2, self.T2star, self.V, self.E])
        if self.clipping:
            np.clip(s, self.lower, self.upper, out=s)
        if self.logscale:
            s = np.log10(s)
        return s

    def _reset_patient_parameters(self):
        if self.domain_randomization:
            # randomly changing patient parameters
            self.k1 = np.random.uniform(low=5e-7, high=8e-7)
            # cell2
            self.k2 = np.random.uniform(low=0.1e-4, high=1.0e-4)
            self.f = np.random.uniform(low=0.29, high=0.34)

        else:

            self.k1 = 8e-7  # infection rate (mL per virions and per day)

            self.k2 = 1e-4  # infection rate (mL per virions and per day)
            self.f = 0.34  # treatment efficacy reduction for type 2 cells
        # patient parameters

        # cell type 1
        self.lambda1 = 1e4  # production rate (cells per mL and per day)
        self.d1 = 1e-2  # death rate (per day)
        self.m1 = 1e-5  # immune-induced clearance rate (mL per cells and per day)
        self.rho1 = 1  # nb virions infecting a cell (virions per cell)

        # cell type 2
        self.lambda2 = 31.98  # production rate (cells per mL and per day)
        self.d2 = 1e-2  # death rate (per day)
        self.m2 = 1e-5  # immune-induced clearance rate (mL per cells and per day)
        self.rho2 = 1  # nb virions infecting a cell (virions per cell)
        # infected cells
        self.delta = 0.7  # death rate (per day)
        self.NT = 100  # virions produced (virions per cell)
        self.c = 13  # virus natural death rate (per day)
        # immune response (immune effector cells)
        self.lambdaE = 1  # production rate (cells per mL and per day)
        self.bE = 0.3  # maximum birth rate (per day)
        self.Kb = 100  # saturation constant for birth (cells per mL)
        self.dE = 0.25  # maximum death rate (per day)
        self.Kd = 500  # saturation constant for death (cells per mL)
        self.deltaE = 0.1  # natural death rate (per day)

    def reset(
        self, *, seed: int | None = None, options: dict | None = None, mode="unhealthy"
    ):
        if mode == "uninfected":
            self.T1 = 1e6
            self.T1star = 0.0
            self.T2 = 3198.0
            self.T2star = 0.0
            self.V = 0.0
            self.E = 10.0
        elif mode == "unhealthy":
            self.T1 = 163573.0
            self.T1star = 11945.0
            self.T2 = 5.0
            self.T2star = 46.0
            self.V = 63919.0
            self.E = 24.0
        elif mode == "healthy":
            self.T1 = 967839.0
            self.T1star = 76.0
            self.T2 = 621.0
            self.T2star = 6.0
            self.V = 415.0
            self.E = 353108.0
        else:
            print("Patient mode '", mode, "' unrecognized. State unchanged.")

        self._reset_patient_parameters()

        return self.state(), {}

    def der(self, state, action):
        T1 = state[0]
        T1star = state[1]
        T2 = state[2]
        T2star = state[3]
        V = state[4]
        E = state[5]

        eps1 = action[0]
        eps2 = action[1]

        T1dot = self.lambda1 - self.d1 * T1 - self.k1 * (1 - eps1) * V * T1

        T1stardot = (
            self.k1 * (1 - eps1) * V * T1 - self.delta * T1star - self.m1 * E * T1star
        )
        T2dot = self.lambda2 - self.d2 * T2 - self.k2 * (1 - self.f * eps1) * V * T2
        T2stardot = (
            self.k2 * (1 - self.f * eps1) * V * T2
            - self.delta * T2star
            - self.m2 * E * T2star
        )
        Vdot = (
            self.NT * self.delta * (1 - eps2) * (T1star + T2star)
            - self.c * V
            - (
                self.rho1 * self.k1 * (1 - eps1) * T1
                + self.rho2 * self.k2 * (1 - self.f * eps1) * T2
            )
            * V
        )
        Edot = (
            self.lambdaE
            + self.bE * (T1star + T2star) * E / (T1star + T2star + self.Kb)
            - self.dE * (T1star + T2star) * E / (T1star + T2star + self.Kd)
            - self.deltaE * E
        )
        return np.array([T1dot, T1stardot, T2dot, T2stardot, Vdot, Edot])

    def transition(self, state, action, duration):
        """duration should be a multiple of 1e-3"""
        state0 = np.copy(state)
        state0_orig = np.copy(state)
        nb_steps = int(duration // 1e-3)
        for i in range(nb_steps):
            der = self.der(state0, action)
            state1 = state0 + der * 1e-3

            # np.clip(state1, self.lower, self.upper, out=state1)
            state0 = state1
        return state1

    def reward(self, state, action, state2):
        rew = -(
            self.Q * state[4]
            + self.R1 * action[0] ** 2
            + self.R2 * action[1] ** 2
            - self.S * state[5]
        )
        return rew

    def step(self, a_index):
        state = self.state()
        action = self.action_set[a_index]
        state2 = self.transition(state, action, 5)
        rew = self.reward(state, action, state2)
        if self.clipping:
            np.clip(state2, self.lower, self.upper, out=state2)

        self.T1 = state2[0]
        self.T1star = state2[1]
        self.T2 = state2[2]
        self.T2star = state2[3]
        self.V = state2[4]
        self.E = state2[5]

        if self.logscale:
            state2 = np.log10(state2)

        return state2, rew, False, False, {}



class Agent(Protocol):
    """
    Defines an interface for agents in a simulation or decision-making environment.

    An Agent must implement methods to act based on observations, save its state to a file,
    and load its state from a file. This interface uses the Protocol class from the typing
    module to specify methods that concrete classes must implement.

    Protocols are a way to define formal Python interfaces. They allow for type checking
    and ensure that implementing classes provide specific methods with the expected signatures.
    """

    def act(self, observation: np.ndarray, use_random: bool = False) -> int:
        """
        Determines the next action based on the current observation from the environment.

        Implementing this method requires processing the observation and optionally incorporating
        randomness into the decision-making process (e.g., for exploration in reinforcement learning).

        Args:
            observation (np.ndarray): The current environmental observation that the agent must use
                                       to decide its next action. This array typically represents
                                       the current state of the environment.
            use_random (bool, optional): A flag to indicate whether the agent should make a random
                                         decision. This is often used for exploration. Defaults to False.

        Returns:
            int: The action to be taken by the agent.
        """
        pass

    def save(self, path: str) -> None:
        """
        Saves the agent's current state to a file specified by the path.

        This method should serialize the agent's state (e.g., model weights, configuration settings)
        and save it to a file, allowing the agent to be later restored to this state using the `load` method.

        Args:
            path (str): The file path where the agent's state should be saved.

        """
        pass

    def load(self) -> None:
        """
        Loads the agent's state from a file specified by the path (HARDCODED). This not a good practice,
        but it will simplify the grading process.

        This method should deserialize the saved state (e.g., model weights, configuration settings)
        from the file and restore the agent to this state. Implementations must ensure that the
        agent's state is compatible with the `act` method's expectations.

        Note:
            It's important to ensure that neural network models (if used) are loaded in a way that is
            compatible with the execution device (e.g., CPU, GPU). This may require specific handling
            depending on the libraries used for model implementation. WARNING: THE GITHUB CLASSROOM
        HANDLES ONLY CPU EXECUTION. IF YOU USE A NEURAL NETWORK MODEL, MAKE SURE TO LOAD IT IN A WAY THAT
        DOES NOT REQUIRE A GPU.
        """
        pass




def evaluate_agent(agent: Agent, env: gym.Env, nb_episode: int = 10) -> float:
    """
    Evaluate an agent in a given environment.

    Args:
        agent (Agent): The agent to evaluate.
        env (gym.Env): The environment to evaluate the agent in.
        nb_episode (int): The number of episode to evaluate the agent.

    Returns:
        float: The mean reward of the agent over the episodes.
    """
    rewards: list[float] = []
    for _ in range(nb_episode):
        obs, info = env.reset()
        done = False
        truncated = False
        episode_reward = 0
        while not done and not truncated:
            action = agent.act(obs)
            obs, reward, done, truncated, _ = env.step(action)
            episode_reward += reward
        rewards.append(episode_reward)
    return mean(rewards)


evaluate_HIV = partial(
    evaluate_agent, env=TimeLimit(HIVPatient(), max_episode_steps=200)
)


evaluate_HIV_population = partial(
    evaluate_agent,
    env=TimeLimit(HIVPatient(domain_randomization=True), max_episode_steps=200),
)

In [5]:
# Environment setup with a time limit of 200 steps per episode
env = TimeLimit(env=HIVPatient(domain_randomization=True), max_episode_steps=200)

# 2 - Tools functions for the training

# Picking Greedy Action with respect to Q
(from course RL4)

## Replay Buffer
(from course RL4)

In [6]:
class ReplayBuffer:
    def __init__(self, capacity, device):
        self.capacity = capacity
        self.data = []
        self.index = 0
        self.device = device
    def append(self, s, a, r, s_, d):
        if len(self.data) < self.capacity:
            self.data.append(None)
        self.data[self.index] = (s, a, r, s_, d)
        self.index = (self.index + 1) % self.capacity
    def sample(self, batch_size):
        batch = random.sample(self.data, batch_size)
        return list(map(lambda x:torch.Tensor(np.array(x)).to(self.device), list(zip(*batch))))
    def __len__(self):
        return len(self.data)

## DQN Architecture

Used MLP fully connected with ReLU() for the architecutre. \\
Final architecture : 6 layers of size 256.

In [7]:
hidden_layer_dim = 256
inner_dim = 128

DQN = torch.nn.Sequential(
            torch.nn.Linear(env.observation_space.shape[0], hidden_layer_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_layer_dim, hidden_layer_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_layer_dim, inner_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(inner_dim, inner_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(inner_dim, hidden_layer_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_layer_dim, hidden_layer_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_layer_dim, env.action_space.n)
)

# 3 - Agent Definition

Agent construction method and train method are highly inspired from the implementation of the DQN with target network provided in the solution of course RL4 (CartPole exemple).

I added to this implementation the following improvements :

*   Deeper architecture for the DQN
*   Chose best hyperparameters for this architecture after a few runs
*   Tried with RMSProp optimizer, but better results with Adam



In [8]:
# ProjectAgent class to define agent logic
class ProjectAgent:
    def __init__(self):
        # Configuration for the agent
        self.nb_actions = env.action_space.n
        self.learning_rate = 0.0001
        self.gamma = 0.95
        self.buffer_size = 50000
        self.epsilon_min = 0.01
        self.epsilon_max = 1.0
        self.epsilon_decay_period = 50000
        self.epsilon_delay_decay = 1000
        self.epsilon_delay = 100
        self.epsilon_step = (self.epsilon_max - self.epsilon_min) / self.epsilon_decay_period
        self.batch_size = 1024
        self.gradient_steps = 10
        self.update_target_strategy = 'replace'
        self.update_target_freq = 500
        self.update_target_tau = 0.01
        self.criterion = torch.nn.SmoothL1Loss()
        self.fine_tuning = False

        # Linking first the models to GPU device
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        if not self.fine_tuning:
            self.model = DQN.to(self.device)
        self.target_model = deepcopy(self.model).to(self.device)
        self.memory = ReplayBuffer(self.buffer_size, self.device)

        # Choice of optimizers for both networks : Adams
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.learning_rate)

    def act(self, observation, use_random=False):
        # Acting greedily towards Q
        with torch.no_grad():  # Disable gradient computation for action selection
            Q = self.model(torch.Tensor(observation).unsqueeze(0).to(self.device))
            return torch.argmax(Q).item()


    def save(self, path):
        self.path = path + "final_dqn.pt"
        torch.save(self.model.state_dict(), self.path)

    def load(self):
        self.path = path + "final_dqn.pt"
        self.model = DQN.to(self.device)
        self.model.load_state_dict(torch.load(self.path, map_location=self.device))
        self.model.eval()

    def train(self, max_episode):
        previous_val = 0
        episode_return = []
        episode = 0
        episode_cum_reward = 0
        state, _ = env.reset()
        epsilon = self.epsilon_max
        step = 0

        while episode < max_episode:
            # Decay epsilon after a certain number of steps
            if step > self.epsilon_delay:
                epsilon = max(self.epsilon_min, epsilon - self.epsilon_step)

            # Action selection (epsilon-greedy)
            if np.random.rand() < epsilon:
                action = env.action_space.sample()  # Exploration
            else:
                action = self.act(state)  # Exploitation

            # Take action, observe result
            next_state, reward, done, trunc, _ = env.step(action)
            self.memory.append(state, action, reward, next_state, done)
            episode_cum_reward += reward

            # Train the model
            for _ in range(self.gradient_steps):
                if len(self.memory) > self.batch_size:
                    X, A, R, Y, D = self.memory.sample(self.batch_size)
                    QYmax = self.target_model(Y).max(1)[0].detach()
                    update = torch.addcmul(R, 1 - D, QYmax, value=self.gamma)
                    QXA = self.model(X).gather(1, A.to(torch.long).unsqueeze(1))
                    loss = self.criterion(QXA, update.unsqueeze(1))
                    self.optimizer.zero_grad()
                    loss.backward()
                    self.optimizer.step()

            # Update target model periodically
            if self.update_target_strategy == 'replace':
                if step % self.update_target_freq == 0:
                    self.target_model.load_state_dict(self.model.state_dict())

            step += 1
            if done or trunc:
                episode += 1
                val_score = evaluate_HIV(agent=self, nb_episode=1)

                # Print training progress
                print(f"Episode {episode:3d} | "
                      f"Epsilon {epsilon:6.2f} | "
                      f"Batch Size {len(self.memory):5d} | "
                      f"Episode Return {episode_cum_reward:.2e} | "
                      f"Evaluation Score {val_score:.2e}")
                state, _ = env.reset()

                # Save model if evaluation score improves
                if val_score > previous_val:
                    previous_val = val_score
                    self.best_model = deepcopy(self.model).to(self.device)
                    path = os.getcwd()
                    self.save(path)
                episode_return.append(episode_cum_reward)
                episode_cum_reward = 0
            else:
                state = next_state

        # Load the best model and save it
        self.model.load_state_dict(self.best_model.state_dict())
        path = os.getcwd()
        self.save(path)
        return episode_return

def seed_everything(seed: int = 42):
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.cuda.manual_seed_all(seed)


In [9]:
if __name__ == "__main__":
    file = Path("score.txt")
    if not file.is_file():
        seed_everything(seed=42)
        # Initialization of the agent. Replace DummyAgent with your custom agent implementation.
        agent = ProjectAgent()
        agent.train(500)
        agent.save(path)

        # Evaluate agent and write score.
        score_agent: float = evaluate_HIV(agent=agent, nb_episode=5)
        score_agent_dr: float = evaluate_HIV_population(agent=agent, nb_episode=20)
        print(f"Score agent: {score_agent}")
        print(f"Score agent with domain randomization: {score_agent_dr}")
        with open(file= path + "score.txt", mode="w") as f:
            f.write(f"{score_agent}\n{score_agent_dr}")

Episode   1 | Epsilon   1.00 | Batch Size   200 | Episode Return 1.21e+07 | Evaluation Score 3.43e+06
Episode   2 | Epsilon   0.99 | Batch Size   400 | Episode Return 9.11e+06 | Evaluation Score 3.43e+06
Episode   3 | Epsilon   0.99 | Batch Size   600 | Episode Return 6.14e+06 | Evaluation Score 3.43e+06
Episode   4 | Epsilon   0.99 | Batch Size   800 | Episode Return 8.89e+06 | Evaluation Score 3.43e+06
Episode   5 | Epsilon   0.98 | Batch Size  1000 | Episode Return 1.31e+07 | Evaluation Score 3.43e+06
Episode   6 | Epsilon   0.98 | Batch Size  1200 | Episode Return 7.49e+06 | Evaluation Score 3.43e+06
Episode   7 | Epsilon   0.97 | Batch Size  1400 | Episode Return 5.86e+06 | Evaluation Score 3.43e+06
Episode   8 | Epsilon   0.97 | Batch Size  1600 | Episode Return 8.90e+06 | Evaluation Score 3.43e+06
Episode   9 | Epsilon   0.97 | Batch Size  1800 | Episode Return 8.13e+06 | Evaluation Score 3.43e+06
Episode  10 | Epsilon   0.96 | Batch Size  2000 | Episode Return 1.13e+07 | Evalua