### Run in collab
<a href="https://colab.research.google.com/github/racousin/data_science_practice/blob/master/website/public/modules/data-science-practice/module9/exercise/module9_exercise2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%capture
!pip install swig==4.2.1
!pip install gymnasium==1.2.0

In [2]:
import warnings
warnings.filterwarnings('ignore')
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt

# module9_exercise2 : ML - Arena <a href="https://ml-arena.com/viewcompetition/5" target="_blank"> FrozenLake Competition</a>

### Objective
Get at list an agent running on ML-Arena <a href="https://ml-arena.com/viewcompetition/5" target="_blank"> FrozenLake Competition</a> with mean reward upper than 0.35 (ie 35%)


You should submit an agent file named `agent.py` with a class `Agent` that includes at least the following attributes:

In [59]:
class Agent_0:
    def __init__(self, env):
        self.env = env

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        action = self.env.action_space.sample() # your logic here
        return action

### Description

The game starts with the player at location [0,0] of the frozen lake grid world with the goal located at far extent of the world [7,7].

Holes in the ice are distributed in set locations.

The player makes moves until they reach the goal or fall in a hole.

Each run will consist of 10 attempts to cross the ice. The reward will be the total amount accumulated during those trips. For example, if your agent reaches the goal 3 times out of 10, its reward will be 3.

The environment is based on :

In [60]:
env = gym.make('FrozenLake-v1', map_name="8x8")

In [61]:
import numpy as np

class Agent:
    def __init__(self, env, gamma=0.99, theta=1e-10, max_iter=100000):
        self.env = env
        self.gamma = gamma
        self.theta = theta
        self.max_iter = max_iter

        # FrozenLake: Discrete states/actions
        self.nS = env.observation_space.n
        self.nA = env.action_space.n

        # Transition model (FrozenLake exposes env.unwrapped.P)
        self.P = getattr(getattr(env, "unwrapped", env), "P", None)

        # Policy: default random (fallback if P is missing)
        self.policy = np.random.randint(self.nA, size=self.nS)

        # If we have P, compute optimal policy with value iteration
        if self.P is not None:
            self.policy = self._compute_optimal_policy_value_iteration()

    def _compute_optimal_policy_value_iteration(self):
        V = np.zeros(self.nS, dtype=np.float64)

        for _ in range(self.max_iter):
            delta = 0.0
            for s in range(self.nS):
                # Bellman optimality backup
                q_sa = np.zeros(self.nA, dtype=np.float64)
                for a in range(self.nA):
                    for (p, s2, r, done) in self.P[s][a]:
                        # If terminal, no bootstrap after transition
                        q_sa[a] += p * (r + (0.0 if done else self.gamma * V[s2]))

                v_new = np.max(q_sa)
                delta = max(delta, abs(v_new - V[s]))
                V[s] = v_new

            if delta < self.theta:
                break

        # Greedy policy w.r.t V
        policy = np.zeros(self.nS, dtype=np.int64)
        for s in range(self.nS):
            q_sa = np.zeros(self.nA, dtype=np.float64)
            for a in range(self.nA):
                for (p, s2, r, done) in self.P[s][a]:
                    q_sa[a] += p * (r + (0.0 if done else self.gamma * V[s2]))


            best_actions = np.flatnonzero(q_sa == np.max(q_sa))
            policy[s] = int(best_actions[0])  # deterministic

        return policy

    def _obs_to_state(self, observation):
        # Gymnasium can sometimes pass (obs, info)
        if isinstance(observation, tuple):
            observation = observation[0]

        # If it's a numpy scalar / array([k])
        if isinstance(observation, np.ndarray):
            if observation.size == 1:
                observation = observation.item()
            else:
                # If one-hot (rare here), use argmax
                observation = int(np.argmax(observation))

        return int(observation)

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        s = self._obs_to_state(observation)
        return int(self.policy[s])

### Before submit
Test that your agent has the right attributes

In [63]:
env = gym.make('FrozenLake-v1', map_name="8x8")
agent = Agent(env)

observation, _ = env.reset()
reward, terminated, truncated, info = None, False, False, None
rewards = []
while not (terminated or truncated):
    action = agent.choose_action(observation, reward=reward, terminated=terminated, truncated=truncated, info=info)
    observation, reward, terminated, truncated, info = env.step(action)
    rewards.append(reward)
print(f'Cumulative Reward: {sum(rewards)}')

Cumulative Reward: 1.0


In [64]:
import numpy as np

def eval_agent(env, agent, n_episodes=3000, seed=0):
    rng = np.random.default_rng(seed)
    episode_returns = []

    for _ in range(n_episodes):
        obs, info = env.reset(seed=int(rng.integers(0, 1_000_000)))
        terminated = truncated = False
        total = 0.0

        while not (terminated or truncated):
            a = agent.choose_action(obs)
            obs, r, terminated, truncated, info = env.step(a)
            total += r

        episode_returns.append(total)

    episode_returns = np.array(episode_returns, dtype=float)
    mean = episode_returns.mean()
    std = episode_returns.std(ddof=1)
    # CI 95% approx normale (ok si n grand)
    se = np.sqrt(mean * (1 - mean) / n_episodes)  # car 0/1
    ci95 = (mean - 1.96 * se, mean + 1.96 * se)
    return mean, std, ci95, episode_returns

mean, std, ci95, returns = eval_agent(env, agent, n_episodes=5000, seed=42)
print("Mean reward:", mean)
print("Std:", std)
print("95% CI:", ci95)
print("Meets requirement (>=0.4):", mean >= 0.4)

Mean reward: 0.6254
Std: 0.4840678716263539
95% CI: (np.float64(0.6119836649911833), np.float64(0.6388163350088166))
Meets requirement (>=0.4): True


# Optimisation Test 1 : Changer l’objectif : optimiser la probabilité de succès, pas le retour discounté

In [65]:
env = gym.make("FrozenLake-v1", map_name="8x8")

In [66]:
import numpy as np

class Agent_1:
    def __init__(self, env, gamma=0.99, theta=1e-10, max_iter=100000):
        self.env = env
        self.gamma = gamma
        self.theta = theta
        self.max_iter = max_iter

        # FrozenLake: Discrete states/actions
        self.nS = env.observation_space.n
        self.nA = env.action_space.n

        # Transition model (FrozenLake exposes env.unwrapped.P)
        self.P = getattr(getattr(env, "unwrapped", env), "P", None)

        # Policy: default random (fallback if P is missing)
        self.policy = np.random.randint(self.nA, size=self.nS)

        # If we have P, compute optimal policy with value iteration
        if self.P is not None:
            self.policy = self._compute_optimal_policy_reachability()

    def _compute_optimal_policy_reachability(self, theta=1e-12, max_iter=100000):
        V = np.zeros(self.nS, dtype=np.float64)

        # --- trouver goal/holes depuis la grille ---
        desc = self.env.unwrapped.desc  # shape (nrow, ncol), dtype 'S1'
        nrow, ncol = desc.shape

        goal_states = set()
        hole_states = set()
        for r in range(nrow):
            for c in range(ncol):
                s = r * ncol + c
                if desc[r, c] == b'G':
                    goal_states.add(s)
                elif desc[r, c] == b'H':
                    hole_states.add(s)

        terminal_states = goal_states | hole_states

        # Fixer les valeurs terminales
        for s in goal_states:
            V[s] = 1.0
        for s in hole_states:
            V[s] = 0.0

        # --- value iteration "probabilité d'atteindre le goal" ---
        for _ in range(max_iter):
            delta = 0.0
            for s in range(self.nS):
                if s in terminal_states:
                    continue

                q_sa = np.zeros(self.nA, dtype=np.float64)
                for a in range(self.nA):
                    for (p, s2, r, done) in self.P[s][a]:
                        # reachability: on propage V(s2) (goal=1, hole=0)
                        q_sa[a] += p * V[s2]

                v_new = np.max(q_sa)
                delta = max(delta, abs(v_new - V[s]))
                V[s] = v_new

            if delta < theta:
                break

        # --- politique greedy + tie-break anti-hole (optionnel mais utile) ---
        def prob_hole(s, a):
            ph = 0.0
            for (p, s2, r, done) in self.P[s][a]:
                if s2 in hole_states:
                    ph += p
            return ph

        policy = np.zeros(self.nS, dtype=np.int64)
        for s in range(self.nS):
            if s in terminal_states:
                policy[s] = 0
                continue

            q_sa = np.zeros(self.nA, dtype=np.float64)
            for a in range(self.nA):
                for (p, s2, r, done) in self.P[s][a]:
                    q_sa[a] += p * V[s2]

            best = np.flatnonzero(q_sa == np.max(q_sa))
            if len(best) > 1:
                holes = np.array([prob_hole(s, a) for a in best])
                best = best[np.flatnonzero(holes == holes.min())]

            policy[s] = int(np.random.choice(best))

        return policy

    def _obs_to_state(self, observation):
        # Gymnasium can sometimes pass (obs, info)
        if isinstance(observation, tuple):
            observation = observation[0]

        # If it's a numpy scalar / array([k])
        if isinstance(observation, np.ndarray):
            if observation.size == 1:
                observation = observation.item()
            else:
                # If one-hot (rare here), use argmax
                observation = int(np.argmax(observation))

        return int(observation)

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        s = self._obs_to_state(observation)
        return int(self.policy[s])

In [68]:
agent = Agent_1(env)
mean, std, ci95, returns = eval_agent(env, agent, n_episodes=5000, seed=42)
print("Mean reward:", mean)
print("Std:", std)
print("95% CI:", ci95)
print("Meets requirement (>=0.4):", mean >= 0.4)

Mean reward: 0.5168
Std: 0.49976765956062874
95% CI: (np.float64(0.5029485325916999), np.float64(0.5306514674083002))
Meets requirement (>=0.4): True


# Optimisation Test 2 : Vérifier le vrai horizon (TimeLimit) et l'optimiser

In [69]:
H = env.spec.max_episode_steps
print("max_episode_steps =", H)

max_episode_steps = 100


In [73]:
import numpy as np

class Agent_2:
    def __init__(self, env, theta=1e-10, max_iter=100000):
        self.env = env
        self.theta = theta
        self.max_iter = max_iter
        self.nS = env.observation_space.n
        self.nA = env.action_space.n
        self.P = getattr(getattr(env, "unwrapped", env), "P", None)

        desc = self.env.unwrapped.desc
        nrow, ncol = desc.shape

        def cell(i, j):
            v = desc[i, j]
            return v.decode("utf-8") if isinstance(v, (bytes, np.bytes_)) else str(v)

        self.start_state = None
        self.goal_states = set()
        self.hole_states = set()

        for r in range(nrow):
            for c in range(ncol):
                s = r * ncol + c
                ch = cell(r, c)
                if ch == "S":
                    self.start_state = s
                elif ch == "G":
                    self.goal_states.add(s)
                elif ch == "H":
                    self.hole_states.add(s)

        if self.start_state is None:
            self.start_state = 0

        self.t = 0
        self.H = env.spec.max_episode_steps if env.spec is not None else 100

        self.policy_t = None
        self.policy = np.random.randint(self.nA, size=self.nS)

        if self.P is not None:
            self.policy_t = self._compute_optimal_policy_finite_horizon(self.H)
            self.policy = self.policy_t[0]


    def _compute_optimal_policy_finite_horizon(self, H):
        desc = self.env.unwrapped.desc
        nrow, ncol = desc.shape

        goal_states = set()
        hole_states = set()
        for r in range(nrow):
            for c in range(ncol):
                s = r * ncol + c
                if desc[r, c] == b'G':
                    goal_states.add(s)
                elif desc[r, c] == b'H':
                    hole_states.add(s)
        terminal_states = goal_states | hole_states

        V_next = np.zeros(self.nS, dtype=np.float64)
        for s in goal_states:
            V_next[s] = 1.0

        policy_t = np.zeros((H, self.nS), dtype=np.int64)

        for t in reversed(range(H)):
            V = V_next.copy()

            for s in range(self.nS):
                if s in terminal_states:
                    policy_t[t, s] = 0
                    continue

                q_sa = np.zeros(self.nA, dtype=np.float64)
                for a in range(self.nA):
                    for (p, s2, r, done) in self.P[s][a]:
                        q_sa[a] += p * V_next[s2]

                best = np.flatnonzero(q_sa == np.max(q_sa))
                if len(best) > 1:
                    def prob_hole(a):
                        ph = 0.0
                        for (p, s2, r, done) in self.P[s][a]:
                            if s2 in hole_states:
                                ph += p
                        return ph
                    holes = np.array([prob_hole(a) for a in best])
                    best = best[np.flatnonzero(holes == holes.min())]

                policy_t[t, s] = int(best[0])
                V[s] = np.max(q_sa)

            V_next = V

        return policy_t

    def _obs_to_state(self, observation):
        if isinstance(observation, tuple):
            observation = observation[0]

        if isinstance(observation, np.ndarray):
            if observation.size == 1:
                observation = observation.item()
            else:
                observation = int(np.argmax(observation))

        return int(observation)

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        s = self._obs_to_state(observation)

        if terminated or truncated:
            self.t = 0

        if s == self.start_state and self.t > 0:
            self.t = 0

        if self.policy_t is not None:
            tt = self.t if self.t < self.H else self.H - 1
            a = int(self.policy_t[tt, s])
        else:
            a = int(self.policy[s])

        self.t += 1
        return a

In [75]:
agent = Agent_2(env)
mean, std, ci95, returns = eval_agent(env, agent, n_episodes=5000, seed=42)
print("Mean reward:", mean)
print("Std:", std)
print("95% CI:", ci95)
print("Meets requirement (>=0.4):", mean >= 0.4)

Mean reward: 0.6308
Std: 0.4826364548907572
95% CI: (np.float64(0.6174233378709336), np.float64(0.6441766621290664))
Meets requirement (>=0.4): True
