In [78]:
import gymnasium as gym
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
from abc import ABC, abstractmethod
import imageio
import glob
import pickle
import IPython
import os
from typing import Tuple, List
from collections import deque, namedtuple

Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])


In [None]:
import gymnasium

env = gymnasium.make('MountainCar-v0', max_episode_steps = 1000, render_mode="rgb_array")
env.action_space.seed(42)

In [None]:
print(env.action_space)
print(env.observation_space)
#obs : position, velocity
obs, info = env.reset() # initial observation
print(obs)  
print(info)  
# take a step
action = 2  # Push right
obs, reward, terminated, truncated, info = env.step(action)
print(obs, reward, terminated, truncated, info)
#rendering
frame = env.render()
print(type(frame))  # <class 'numpy.ndarray'>
plt.imshow(frame)
plt.show()



In [81]:
# state space ranges from -1.2 to 0.6 for position and -0.07 to 0.07 for velocity
# action space is discrete with 3 possible actions
# reward is -1 for each time step until the goal position of 0.5 is reached

In [82]:
def discretize_state(state, agent):
    position, velocity = state
    
    # Create bins dynamically based on agent's n_bins
    position_bins = np.linspace(-1.2, 0.6, agent.n_bins_position)
    velocity_bins = np.linspace(-0.07, 0.07, agent.n_bins_velocity)

    position_index = np.digitize(position, position_bins) - 1
    velocity_index = np.digitize(velocity, velocity_bins) - 1

    return (position_index, velocity_index)


In [None]:
# initialize Q table
def initialize_q(n_bins_position, n_bins_velocity, n_actions):
    n_actions = env.action_space.n
    n_states = (n_bins_position, n_bins_velocity)
    Q = np.zeros(n_states + (n_actions,))
    return Q

In [84]:
# define all the agents
class BaseAgent(ABC):
    def __init__(self, action_dim:int, epsilon: float, gamma: float = 1.0,n_bins_position:int=20,n_bins_velocity:int=20):
        self.action_dim = action_dim
        self.epsilon = self.initial_epsilon = epsilon
        self.gamma = gamma
        self.n_bins_position = n_bins_position
        self.n_bins_velocity = n_bins_velocity
        self.Q = np.zeros((self.n_bins_position, self.n_bins_velocity, self.action_dim))  
        

    @abstractmethod
    def select_action(self, state) -> int:
        """Select an action given the state"""
        pass

    @abstractmethod
    def learn(self, *args, **kwargs) -> None:
        """Update Q-values based on learning method"""
        pass
    

    def select_action(self, state) -> int:
        if np.random.uniform() < self.epsilon:
            return np.random.choice(self.action_dim)
        else:
            return self.greedy_action(state)

    def greedy_action(self, state) -> int:
        action_values = self.Q[state[0], state[1], :]
        return np.argmax(action_values)  # Choose the best action

    def reset(self) -> None:
        self.Q = np.zeros((31, 100, self.action_dim))  # Reset Q-table
        self.epsilon = self.initial_epsilon

    def __getstate__(self):
        return self.__dict__.copy()

    def __setstate__(self, state):
        self.__dict__.update(state)


class TD0Agent(BaseAgent):  
    def __init__(self, action_dim, epsilon, alpha, gamma=1, offpolicy=False, n_bins_position=20, n_bins_velocity=20):
        super().__init__(action_dim, epsilon, gamma, n_bins_position, n_bins_velocity)
        self.offpolicy = offpolicy
        self.alpha = self.initial_alpha = alpha

    def reset(self):
        super().reset()
        self.alpha = self.initial_alpha

        
    def learn(self, experience:Experience)->None:
        state, action, reward, next_state, done = experience
        if self.offpolicy:
            next_action = self.greedy_action(next_state) # q learning
        else:
            next_action = self.select_action(next_state) # sarsa

        next_max = self.Q[next_state[0],next_state[1], next_action] 
    
        self.Q[state][action] = self.Q[state][action] + self.alpha*(reward+  self.gamma*next_max - self.Q[state][action] )


class NstepSarsaAgent(BaseAgent):
    def __init__(self, action_dim, epsilon, alpha, gamma=1, n=3, n_bins_position=20, n_bins_velocity=20):
        super().__init__(action_dim, epsilon, gamma, n_bins_position, n_bins_velocity)
        self.alpha = self.initial_alpha = alpha
        self.n = n  # Number of steps
        self.memory = deque(maxlen=n+1)  # Store (state, action, reward)

    def reset(self):
        super().reset()
        self.alpha = self.initial_alpha
        self.memory.clear()

    def learn(self, experience:Experience):
        """
        Stores the current experience and updates Q-values when enough steps are collected.
        """
        _, _, _, _, done = experience
        self.memory.append(experience)

       

        if len(self.memory) < (self.n+1) and not done:  # if not reach terminal state: wait until enough steps are collected'
            return

        experiences = self.memory.copy()
        while len(experiences) > 0:
            G = 0 # Return
            for i, (_, _, r, _,_) in enumerate(experiences):
                G += (self.gamma ** (i+1)) * r  # Compute n-step return
           
            last_experience = experiences[-1]  # Last step
            next_state = last_experience.next_state
            next_action = self.select_action(next_state)
            Q_n = self.Q[next_state[0], next_state[1], next_action]
            G += (self.gamma ** self.n) * Q_n
           
            state, action, _, _, _ = experiences[0]
            td_error = G - self.Q[state[0], state[1], action] # Temporal difference error r_1*gamma^0 + r_2*gamma^1 + ... + r_n*gamma^(n-1) + Q(S_n, A_n) - Q(S_0, A_0)
            self.Q[state[0], state[1], action] += self.alpha * td_error
            experiences.popleft()
        
        if len(self.memory) >= self.n + 1:
            self.memory.popleft()
        if done:
            self.memory.clear()          

In [91]:
# Define the training loop
def train_agent(agent, env, total_episodes=10000, decay_fn=None):
    """
    Trains an agent (either MC or TD0) in MountainCar-v0.
    """
    time_outs = 0
    env.action_space.seed(42)

    with tqdm(total=total_episodes, desc="Training Progress") as pbar:
        for episode in range(total_episodes):
            state, _ = env.reset()
            state = discretize_state(state, agent)  # Ensure it's discrete

            done = False
            step = 0
            running_reward = 0

            while not done:
                step += 1
                action = agent.select_action(state)

                # Take action
                next_state, reward, terminated, truncated, _ = env.step(action)
                next_state = discretize_state(next_state,agent)
                done = terminated or truncated

                running_reward += reward
                experience = Experience(state, action, reward, next_state, done)
                agent.learn(experience)

                state = next_state

            # Apply decay to epsilon & alpha
            if decay_fn:
                agent.epsilon = decay_fn(episode, agent.initial_epsilon, total_episodes, A=0.5, B=0.2, C=0.001)
                if isinstance(agent, (TD0Agent, NstepSarsaAgent)):
                    agent.alpha = decay_fn(episode, agent.initial_alpha, total_episodes, A=0.5, B=0.2, C=0.00001)

            # Logging
            pbar.set_postfix({"steps": step, "epsilon": agent.epsilon, "Running Return": running_reward})
            pbar.update(1)



def get_trajectory(agent:BaseAgent,env:gym.Env, filename:str):
    print("Creating video...")
    video_folder = "videos"
    if not os.path.exists(video_folder):
        os.makedirs(video_folder)
    trajectory = []
    env = gym.wrappers.RecordVideo(env, video_folder=video_folder, name_prefix=filename)
    observation, info = env.reset()
    observation = discretize_state(observation,agent)


    while True:
        env.render()
        state = discretize_state(observation,agent)
        trajectory.append(state)
        action = agent.greedy_action(state)

        observation, reward, terminated, truncated, info = env.step(action)
        observation = discretize_state(observation,agent)
        if terminated or truncated:
            env.render()
            break
    
    state = discretize_state(observation,agent)
    trajectory.append(state)
    env.close()
    print("Video saved.")
    video_files = sorted(
        [f for f in os.listdir(video_folder) if f.endswith(".mp4") and f.startswith(filename)],
        key=lambda f: os.path.getmtime(os.path.join(video_folder, f)),  # Sort by creation time
        reverse=True
    )
    video_path = os.path.join(video_folder, video_files[0])
    IPython.display.display(IPython.display.Video(video_path))
    return trajectory



def visualize_policy(agent, env, trajectory=None):
    """
    Visualizes the learned policy of an agent in a given environment.

    Parameters:
    - agent: The trained RL agent with Q-values and greedy policy.
    - env: The environment object, which should contain a MAP attribute and action mapping.
    - trajectory: A list of visited (state_x, state_y) pairs, showing an example path (optional).
    """
    fig, ax = plt.subplots(figsize=(20, 16), dpi=300)

    # Display environment map
    ax.imshow(env.unwrapped.MAP, cmap="gray")

    # Draw finish line (goal area)
    plt.plot(np.full(fill_value=99, shape=31), np.arange(31), color="#32CD32", linewidth=10)

    # Print max Q-value for a sample state
    print(f"Max Q-value at (15,0): {np.max(agent.Q[15, 0, :])}")

    # If a trajectory is provided, plot the agent's path
    if trajectory:
        path_x = [x for _, x in trajectory]
        path_y = [y for y, _ in trajectory]
        ax.plot(path_x, path_y, color="blue", linewidth=2)

    # Draw policy arrows (greedy actions)
    for i in range(31):
        for j in range(99):
            action = agent.greedy_action((i, j))  # Get best action
            dy, dx = env.unwrapped.action_to_direction[action]  # Convert action to movement direction
            ax.arrow(j - dx * 0.25, i - dy * 0.25, dx * 0.25, dy * 0.25, 
                     head_width=0.25, head_length=0.25, color="red")

    plt.title("Learned Policy Visualization")
    plt.show()


In [86]:
def decay(time:int, start_value:float, n_episodes:int,A:float=0.8,B:float=0.3,C:float=0.2, lower_bound:float = 0.01):
        standardized_time=(time-A*n_episodes)/(B*n_episodes)
        csh = np.cosh(np.exp(-standardized_time))
        epsilon=1.1-(1/csh+(time*C/n_episodes))
        return max(epsilon*start_value, lower_bound)

## 20 bins for position and velocity

In [89]:
# Create Q-learning agent (off-policy)
agent_q = TD0Agent(
    action_dim=3,          
    epsilon=1.0,           
    alpha=0.1,             
    gamma=0.99,            
    offpolicy=True,        
    n_bins_position=20,    
    n_bins_velocity=20     
)

# Train the Q-learning agent
train_agent(agent_q, env, total_episodes=5000)


Training Progress: 100%|██████████| 5000/5000 [02:44<00:00, 30.35it/s, steps=1000, epsilon=1, Running Return=-1e+3]


In [92]:
trajectory_agent_q = get_trajectory(agent_q, env, "agent_q")

Creating video...


  logger.warn(


Video saved.


In [94]:
# Create SARSA agent (on-policy)
agent_sarsa = TD0Agent(
    action_dim=3,          
    epsilon=1.0,           
    alpha=0.1,             
    gamma=0.99,            
    offpolicy=False,       
    n_bins_position=30,    
    n_bins_velocity=30     
)

# Train the SARSA agent
train_agent(agent_sarsa, env, total_episodes=5000)


Training Progress: 100%|██████████| 5000/5000 [03:15<00:00, 25.61it/s, steps=1000, epsilon=1, Running Return=-1e+3]


In [95]:
trajectory_sarsa = get_trajectory(agent_sarsa, env, "agent_sarsa")

  logger.warn(


Creating video...
Video saved.


In [None]:
# Create N-Step SARSA agent
agent_nstep_sarsa = NstepSarsaAgent(
    action_dim=3,          
    epsilon=1.0,           
    alpha=0.1,             
    gamma=0.99,            
    n=3,                   
    n_bins_position=20,    
    n_bins_velocity=20     
)

# Train the N-step SARSA agent
train_agent(agent_nstep_sarsa, env, total_episodes=10_000)


Training Progress:  18%|█▊        | 1811/10000 [02:04<09:03, 15.06it/s, steps=1000, epsilon=1, Running Return=-1e+3]

In [None]:
trajectory_nstep_sarsa = get_trajectory(agent_nstep_sarsa, env, "agent_nstep_sarsa")

## 30 bins for position and 30 bins for velocity

In [None]:
# Create Q-learning agent (off-policy)
agent_q2 = TD0Agent(
    action_dim=3,          
    epsilon=1.0,           
    alpha=0.1,             
    gamma=0.99,            
    offpolicy=True,        
    n_bins_position=30,    
    n_bins_velocity=30     
)

# Train the Q-learning agent
train_agent(agent_q2, env, total_episodes=5000)


In [None]:
trajectory_agent_q2 = get_trajectory(agent_q2, env, "agent_q2")

In [None]:
# Create SARSA agent (on-policy)
agent_sarsa2 = TD0Agent(
    action_dim=3,          
    epsilon=1.0,           
    alpha=0.1,             
    gamma=0.99,            
    offpolicy=False,       
    n_bins_position=30,    
    n_bins_velocity=30     
)

# Train the SARSA agent
train_agent(agent_sarsa, env, total_episodes=5000)


In [None]:
trajectory_sarsa2 = get_trajectory(agent_sarsa2, env, "agent_sarsa2")

In [None]:
# Create N-Step SARSA agent
agent_nstep_sarsa2 = NstepSarsaAgent(
    action_dim=3,          
    epsilon=1.0,           
    alpha=0.1,             
    gamma=0.99,            
    n=3,                   
    n_bins_position=30,    
    n_bins_velocity=30     
)

# Train the N-step SARSA agent
train_agent(agent_nstep_sarsa, env, total_episodes=10_000)


In [None]:
trajectory_nstep_sarsa2 = get_trajectory(agent_nstep_sarsa2, env, "agent_nstep_sarsa2")