# Mountain Car Miniproject Tutorial Notebook

This notebook is here to guide you through the basics of the frameworks necessary for you to do well on your CS456-Miniproject 🤓

In [1]:
import gymnasium as gym

## Gymnasium environments

[Gymnasium](https://gymnasium.farama.org/) is one of the most widely used sources of environments in RL research. It provides standardized environments covering a wide range of difficulties and setups, designed for benchmarking RL and deep RL algorithms.

The core structure is simple. First, we need to instantiate an environment. We will use an existing one, but you could also build your own environment on top of the same interface.

Let's directly work with the Mountain Car environment that will be used in the project. 

_PS: If you're more curious, feel free to browse the large list available on their website!_

In [2]:
env = gym.make('MountainCar-v0')

The environment contains an action space and an observation (state) space. Let's see what these look like.

In [3]:
print(f"Action space: {env.action_space}")
print(f"Observation space: {env.observation_space}")

Action space: Discrete(3)
Observation space: Box([-1.2  -0.07], [0.6  0.07], (2,), float32)


In [4]:
print(f"Number of actions available: {env.action_space.n}")
print(f"Observation shape: {env.observation_space.shape}")

Number of actions available: 3
Observation shape: (2,)


As we can see, the action space of this environment is discrete and contains 3 possible actions: accelerate to the left (`0`), don't accelerate (`1`) and accelerate to the right (`2`).

The observation space has a dimension of 2, and you can find what each part represents [here](https://gymnasium.farama.org/environments/classic_control/mountain_car/#observation-space).
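According to the linked page, the first component is the car's position and the second is its velocity. As a small sketch of how the two can be labeled and rescaled (the names `CarState` and `normalize` are hypothetical helpers, not part of Gymnasium, and the bounds are simply copied from the output printed above):

```python
from typing import NamedTuple


class CarState(NamedTuple):
    position: float  # first observation component
    velocity: float  # second observation component


# Bounds copied from the observation space printed above.
LOW = CarState(position=-1.2, velocity=-0.07)
HIGH = CarState(position=0.6, velocity=0.07)


def normalize(obs):
    """Rescale a raw observation so each component lies in [0, 1]."""
    state = CarState(*obs)
    return CarState(
        (state.position - LOW.position) / (HIGH.position - LOW.position),
        (state.velocity - LOW.velocity) / (HIGH.velocity - LOW.velocity),
    )


start = normalize((-0.534, 0.0))
print(start)
```

Rescaling like this is optional for tabular methods, but it can be convenient if you later feed observations to a neural network.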

Before taking actions, the environment must be reset (or bootstrapped). **Note: this should be done every time the environment has to be restarted, i.e., at the end of every episode.**

In [5]:
# the second return value is an info dictionary, but it doesn't contain anything in this environment
starting_state, _ = env.reset() 

print(f"Starting state: {starting_state}")

Starting state: [-0.53400785  0.        ]


Now that we know what the actions look like and the environment is ready, we can take actions inside it. This is done using the `env.step` function, which takes an action as input and returns multiple values. More details on each of them can be found [here](https://gymnasium.farama.org/api/env/#gymnasium.Env.step).

In the project, you will have an agent that will choose an action (based on the policy learned) given the current state. However, for now, we can simply sample actions at random using `action_space.sample()`.

In [6]:
action = env.action_space.sample()
print(f"Sampled action: {action}")
next_state, reward, terminated, truncated, _ = env.step(action) # again, the last return value is an empty info object

print(f"Next state: {next_state}")
print(f"Reward: {reward}")
print(f"Terminated: {terminated}")
print(f"Truncated: {truncated}")

Sampled action: 2
Next state: [-0.5329298   0.00107806]
Reward: -1.0
Terminated: False
Truncated: False


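The `terminated` and `truncated` variables represent the two ways an episode can end: `terminated` means a terminal state was reached (here, the goal), while `truncated` means the episode was cut off by the time limit (200 steps for `MountainCar-v0`). Thus, it might be handy to use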
```
done = terminated or truncated
```
in your code. 💡
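Inside a learning update, it can still be worth treating the two flags differently. A common convention (an assumption here, not something this notebook prescribes) is to stop bootstrapping only when the episode `terminated`, because a `truncated` episode was merely cut short. A minimal sketch, where `td_target` is a hypothetical helper and `next_value` stands for the estimated value of the next state:

```python
def td_target(reward, next_value, terminated, gamma=0.99):
    """One-step TD target for a single transition.

    Hypothetical helper for illustration; not part of Gymnasium.
    """
    if terminated:
        # A terminal state has no future return to bootstrap from.
        return reward
    # Otherwise (including truncation) keep bootstrapping from the next state.
    return reward + gamma * next_value


goal_reached = td_target(-1.0, -10.0, terminated=True)
still_running = td_target(-1.0, -10.0, terminated=False)
print(goal_reached, still_running)
```

The loop-control variable `done` stays the same either way; only the target used for learning changes.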

We now have all the pieces necessary to run a full episode!

In [7]:
done = False
state, _ = env.reset()
episode_reward = 0

while not done:
    action = env.action_space.sample()
    next_state, reward, terminated, truncated, _ = env.step(action)

    episode_reward += reward

    state = next_state
    done = terminated or truncated

print(f"Episode reward after taking random actions: {episode_reward}")

Episode reward after taking random actions: -200.0


Now your goal in the project will be to code an agent that can beat that 🙃
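Before writing a learning agent, it can help to have a hand-written baseline to compare against. The sketch below is one possible heuristic (an assumption for illustration, not the method expected in the project): always push in the direction the car is already moving. `momentum_policy` is a hypothetical name, and the action indices follow the environment's convention (`0` = left, `2` = right).

```python
def momentum_policy(state):
    """Push in the direction of the current velocity.

    Hypothetical baseline for illustration; `state` is (position, velocity).
    """
    _, velocity = state
    # 2 accelerates to the right, 0 accelerates to the left.
    return 2 if velocity >= 0 else 0


moving_right = momentum_policy((-0.5, 0.01))
moving_left = momentum_policy((-0.5, -0.01))
print(moving_right, moving_left)
```

To try it, replace `env.action_space.sample()` in the episode loop above with `momentum_policy(state)` and compare the episode reward.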

In [27]:
import numpy as np
import gymnasium as gym
from collections import defaultdict

class DynaAgent:
    def __init__(self, env, discr_step=(0.025, 0.005), gamma=0.99, alpha=0.1, epsilon=1.0, epsilon_min=0.05, epsilon_decay=0.995, k=10):
        self.env = env
        self.gamma = gamma  # discount factor
        self.alpha = alpha  # learning rate
        self.epsilon = epsilon  # initial exploration rate
        self.epsilon_min = epsilon_min  # minimum exploration rate
        self.epsilon_decay = epsilon_decay  # multiplicative decay applied to the exploration rate
        self.k = k  # number of simulated (planning) updates per real update

        self.discr_step = discr_step  # discretization step for (position, velocity)
        # Size of the discretized state space along each dimension
        self.num_states = (int((env.observation_space.high[0] - env.observation_space.low[0]) / discr_step[0]) + 1,
                           int((env.observation_space.high[1] - env.observation_space.low[1]) / discr_step[1]) + 1)
        self.num_actions = env.action_space.n  # size of the action space

        # Initialize the Q-table, the visit counts, and the reward model
        self.Q = np.zeros((*self.num_states, self.num_actions))  # Q-value table
        self.state_action_counts = np.zeros((*self.num_states, self.num_actions))  # state-action counts
        self.state_action_next_state_counts = np.zeros((*self.num_states, self.num_actions, *self.num_states))  # state-action-next-state counts
        self.R = np.zeros((*self.num_states, self.num_actions))  # running mean reward per state-action pair

        # Initialize the transition probabilities P
        self.P = np.full((*self.num_states, self.num_actions, *self.num_states), 1.0 / np.prod(self.num_states))  # start from a uniform distribution

        # Print state- and action-space information for debugging
        print(f"State space size: {self.num_states}")
        print(f"State space low: {env.observation_space.low}, high: {env.observation_space.high}")
        print(f"Number of actions: {self.num_actions}")

    def discretize(self, state):
        # Map a continuous state to discrete bin indices
        low = self.env.observation_space.low
        high = self.env.observation_space.high
        # Compute the bin index along each of the two dimensions
        discr_state = (
            int((state[0] - low[0]) / self.discr_step[0]),
            int((state[1] - low[1]) / self.discr_step[1])
        )
        # np.clip keeps the indices inside the valid range
        discr_state = np.clip(discr_state, (0, 0), (self.num_states[0] - 1, self.num_states[1] - 1))
        return discr_state

    def observe(self, state, action, next_state, reward):
        # Discretize the current state and the next state
        discr_state = self.discretize(state)
        discr_next_state = self.discretize(next_state)

        # Update the state-action and state-action-next-state counts
        self.state_action_counts[discr_state[0], discr_state[1], action] += 1
        self.state_action_next_state_counts[discr_state[0], discr_state[1], action, discr_next_state[0], discr_next_state[1]] += 1
        # Update the reward model as a running mean of the observed rewards
        self.R[discr_state[0], discr_state[1], action] = ((self.R[discr_state[0], discr_state[1], action] * (self.state_action_counts[discr_state[0], discr_state[1], action] - 1)) + reward) / self.state_action_counts[discr_state[0], discr_state[1], action]

        # Update the transition probabilities P from the empirical counts
        total_next_state_counts = np.sum(self.state_action_next_state_counts[discr_state[0], discr_state[1], action])
        if total_next_state_counts > 0:
            self.P[discr_state[0], discr_state[1], action] = self.state_action_next_state_counts[discr_state[0], discr_state[1], action] / total_next_state_counts

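    def select_action(self, state):
        # Choose an action with the current epsilon-greedy policy
        discr_state = self.discretize(state)
        if np.random.rand() < self.epsilon:
            # With probability epsilon, pick a random action (exploration)
            return self.env.action_space.sample()
        else:
            # With probability 1 - epsilon, pick the action with the highest Q-value (exploitation)
            return np.argmax(self.Q[discr_state[0], discr_state[1]])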
        
    def update(self, state, action, reward, next_state):
        # Update the Q-value with the real transition
        discr_state = self.discretize(state)
        discr_next_state = self.discretize(next_state)

        # Q-learning update
        self.Q[discr_state[0], discr_state[1], action] += self.alpha * (
            reward + self.gamma * np.max(self.Q[discr_next_state[0], discr_next_state[1]]) - 
            self.Q[discr_state[0], discr_state[1], action])

        # Perform k simulated (planning) updates using the learned model
        for _ in range(self.k):
            rand_state = (np.random.randint(0, self.num_states[0]), np.random.randint(0, self.num_states[1]))
            rand_action = np.random.randint(0, self.num_actions)
            # Skip state-action pairs that have never been observed
            if self.state_action_counts[rand_state[0], rand_state[1], rand_action] == 0:
                continue
            next_state_prob = self.P[rand_state[0], rand_state[1], rand_action]
            # Use the most likely next state under the model (flat index, then unraveled to 2D)
            next_state_index = np.argmax(next_state_prob)
            next_state = np.unravel_index(next_state_index, self.num_states)
            reward = self.R[rand_state[0], rand_state[1], rand_action]
            self.Q[rand_state[0], rand_state[1], rand_action] += self.alpha * (
                reward + self.gamma * np.max(self.Q[next_state[0], next_state[1]]) - 
                self.Q[rand_state[0], rand_state[1], rand_action])

        # Decay epsilon to gradually reduce exploration (applied on every call, i.e. once per environment step)
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
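To make the discretization concrete, here is a standalone sketch of the same binning logic in plain Python. It assumes the default `discr_step=(0.025, 0.005)`, the bounds printed earlier in this notebook, and the grid size `(73, 29)` that the agent prints when it is created; `LOW`, `STEP`, `NUM_BINS` and the module-level `discretize` are illustrative names, not part of the class above.

```python
LOW = (-1.2, -0.07)      # lower bounds of (position, velocity)
STEP = (0.025, 0.005)    # default discr_step of the agent
NUM_BINS = (73, 29)      # grid size printed by the agent


def discretize(state):
    """Return the (position_bin, velocity_bin) indices of a continuous state."""
    indices = []
    for value, low, step, bins in zip(state, LOW, STEP, NUM_BINS):
        index = int((value - low) / step)              # truncate to a bin index
        indices.append(min(max(index, 0), bins - 1))   # clamp into the grid
    return tuple(indices)


inside = discretize((-0.534, 0.012))   # (26, 16)
too_high = discretize((5.0, 1.0))      # clamped to the last bins: (72, 28)
too_low = discretize((-2.0, -1.0))     # clamped to the first bins: (0, 0)
print(inside, too_high, too_low)
```

With this grid, the Q-table has 73 × 29 × 3 entries, while the transition table `P` has one entry per (state, action, next state) triple and is therefore much larger.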

In [28]:
# Create the Mountain Car environment
env = gym.make('MountainCar-v0')
# Instantiate the Dyna agent
agent = DynaAgent(env)

num_episodes = 3000  # number of training episodes
rewards = []

for episode in range(num_episodes):
    state, _ = env.reset()  # reset the environment
    total_reward = 0
    
    done = False
    while not done:
        action = agent.select_action(state)  # choose an action
        # Take the action and get the next state and the reward.
        # NOTE: only `terminated` is kept as `done`; the `truncated` flag is discarded,
        # so episodes can run past the 200-step time limit (hence rewards below -200 in the log).
        next_state, reward, done, _, _ = env.step(action)
        
        agent.observe(state, action, next_state, reward)  # record the observed transition
        agent.update(state, action, reward, next_state)  # update the Q-values
        
        state = next_state
        total_reward += reward
    
    rewards.append(total_reward)
    if (episode + 1) % 100 == 0:
        print(f"Episode: {episode + 1}, Total Reward: {total_reward}, Epsilon: {agent.epsilon}")



State space size: (73, 29)
State space low: [-1.2  -0.07], high: [0.6  0.07]
Number of actions: 3
Episode: 100, Total Reward: -458.0, Epsilon: 0.05
Episode: 200, Total Reward: -407.0, Epsilon: 0.05
Episode: 300, Total Reward: -347.0, Epsilon: 0.05
Episode: 400, Total Reward: -382.0, Epsilon: 0.05
Episode: 500, Total Reward: -320.0, Epsilon: 0.05
Episode: 600, Total Reward: -253.0, Epsilon: 0.05
Episode: 700, Total Reward: -340.0, Epsilon: 0.05
Episode: 800, Total Reward: -260.0, Epsilon: 0.05
Episode: 900, Total Reward: -206.0, Epsilon: 0.05
Episode: 1000, Total Reward: -319.0, Epsilon: 0.05
Episode: 1100, Total Reward: -233.0, Epsilon: 0.05
Episode: 1200, Total Reward: -229.0, Epsilon: 0.05
Episode: 1300, Total Reward: -190.0, Epsilon: 0.05
Episode: 1400, Total Reward: -239.0, Epsilon: 0.05
Episode: 1500, Total Reward: -167.0, Epsilon: 0.05
Episode: 1600, Total Reward: -174.0, Epsilon: 0.05
Episode: 1700, Total Reward: -172.0, Epsilon: 0.05
Episode: 1800, Total Reward: -161.0, Epsilon
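The log shows epsilon already at its floor of 0.05 by episode 100. That follows from the decay being applied on every call to `update`, i.e. once per environment step. A quick check with the agent's default values (this snippet only replays the decay rule and does not touch the environment):

```python
epsilon, epsilon_min, epsilon_decay = 1.0, 0.05, 0.995

steps = 0
while epsilon > epsilon_min:
    # Same rule as at the end of DynaAgent.update
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    steps += 1

print(f"Epsilon reaches its floor after {steps} updates")  # 598
```

Since one episode here lasts hundreds of steps, 598 updates are used up within the first few episodes. If slower decay is wanted, one option (a design choice, not something the notebook prescribes) is to decay once per episode instead, which would take 598 episodes with the same factor.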

In [None]:
# Plot the cumulative reward of each episode
import matplotlib.pyplot as plt
plt.plot(rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Dyna Agent Training')
plt.show()
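Per-episode rewards are noisy, so the trend can be easier to read after smoothing. A minimal sketch of a trailing moving average in plain Python (`moving_average` and the window size are illustrative choices, not part of the original notebook):

```python
def moving_average(values, window=100):
    """Trailing mean over the last `window` values (fewer at the start)."""
    smoothed = []
    running_sum = 0.0
    for i, value in enumerate(values):
        running_sum += value
        if i >= window:
            # Drop the value that just left the window.
            running_sum -= values[i - window]
        smoothed.append(running_sum / min(i + 1, window))
    return smoothed


example = moving_average([-200.0, -180.0, -160.0, -140.0], window=2)
print(example)  # [-200.0, -190.0, -170.0, -150.0]
```

In the plotting cell above, `plt.plot(moving_average(rewards))` could then be drawn on top of the raw curve.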