In [13]:
import collections
import gymnasium as gym
import itertools
import numpy as np

# from numpy.typing import NDArray
import pandas as pd
from pathlib import Path
import random

from typing import Callable, cast, List, Tuple, Union

In [14]:
import matplotlib.pyplot as plt

import seaborn as sns
from tqdm.notebook import tqdm

In [15]:
from IPython.display import Video
from ipywidgets import interact

In [16]:
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

In [17]:
sns.set_context("talk")

In [30]:
FIGS_DIR = Path("figs/") / "project"       # Where to save figures (.gif or .mp4 files)
PLOTS_DIR = Path("figs/") / "project"      # Where to save plots (.png or .svg files)
MODELS_DIR = Path("models/") / "project"   # Where to save models (.pth files)

In [31]:
if not FIGS_DIR.exists():
    FIGS_DIR.mkdir(parents=True)
if not PLOTS_DIR.exists():
    PLOTS_DIR.mkdir(parents=True)
if not MODELS_DIR.exists():
    MODELS_DIR.mkdir(parents=True)

In [20]:
def video_selector(file_path: List[Path]):
    return Video(file_path, embed=True, html_attributes="controls autoplay loop")

## Setup the Lunar Lander problem with Gymnasium

For the purpose of focusing on the algorithms, we will use standard environments provided
by the Gymnasium framework.
As a reminder, this environment is described [here](https://gymnasium.org.cn/environments/box2d/lunar_lander/).

The action indices are outlined below:

| Action Index | Action     |
|--------------|------------|
| 0            | nothing  |
| 1            |  left orientation engine  |
| 2            |  main engine |
| 3            |  right orientation engine |


In [21]:
"""
env = gym.make("LunarLander-v3", 
               continuous=True,        #chose between discrete and continuous action space
               gravity=-10.0,           #default is -10
               enable_wind=False,
               wind_power=15.0,         # only used if enable_wind=True
               turbulence_power=1.5,   # wind power changes rate
               render_mode =
               "rgb_array")
"""

'\nenv = gym.make("LunarLander-v3", \n               continuous=True,        #chose between discrete and continuous action space\n               gravity=-10.0,           #default is -10\n               enable_wind=False,\n               wind_power=15.0,         # only used if enable_wind=True\n               turbulence_power=1.5,   # wind power changes rate\n               render_mode =\n               "rgb_array")\n'

In [22]:
env = gym.make("LunarLander-v3", continuous=False)

In [23]:
LL_observation_dim = env.observation_space._shape[0]
action_labels = {0: "Nothing", 1: "Left Engine", 2: "Main Engine", 3: " Right Engine"}

In [24]:
"""
VIDEO_PREFIX_INITIALIZATION = "discrete_init"

(FIGS_DIR / f"{VIDEO_PREFIX_INITIALIZATION}-episode-0.mp4").unlink(missing_ok=True)

env = gym.wrappers.RecordVideo(env, video_folder=str(FIGS_DIR), name_prefix=VIDEO_PREFIX_INITIALIZATION)
# 随机动作 + episode 循环
observation, info = env.reset()
end = False
while not end:
    action = env.action_space.sample()# 随机动作
    observation, reward, terminated, truncated, info = env.step(action)
    end = terminated or truncated

env.close()
"""

'\nVIDEO_PREFIX_INITIALIZATION = "discrete_init"\n\n(FIGS_DIR / f"{VIDEO_PREFIX_INITIALIZATION}-episode-0.mp4").unlink(missing_ok=True)\n\nenv = gym.wrappers.RecordVideo(env, video_folder=str(FIGS_DIR), name_prefix=VIDEO_PREFIX_INITIALIZATION)\n# 随机动作 + episode 循环\nobservation, info = env.reset()\nend = False\nwhile not end:\n    action = env.action_space.sample()# 随机动作\n    observation, reward, terminated, truncated, info = env.step(action)\n    end = terminated or truncated\n\nenv.close()\n'

In [25]:
"""
Video(
    FIGS_DIR / f"{VIDEO_PREFIX_INITIALIZATION}-episode-0.mp4",
    embed=True,
    html_attributes="controls autoplay loop",
)
"""

'\nVideo(\n    FIGS_DIR / f"{VIDEO_PREFIX_INITIALIZATION}-episode-0.mp4",\n    embed=True,\n    html_attributes="controls autoplay loop",\n)\n'

## Discrete Version

### epsilon_greedy_policy (same as Lab5)

In [26]:
def greedy_policy(state: int, q_array: np.ndarray) -> int:
    """
    Determine the action that maximizes the Q-value for a given state.

    Parameters
    ----------
    state : int
        The current state.
    q_array : np.ndarray
        The Q-table.

    Returns
    -------
    int
        The action that maximizes the Q-value for the given state.
    """
    action = np.argmax(q_array[state])
    return action


def epsilon_greedy_policy(state: int, q_array: np.ndarray, epsilon: float) -> int:
    """
    Determine the action to take based on an epsilon-greedy policy.

    Parameters
    ----------
    state : int
        The current state.
    q_array : np.ndarray
        The Q-table.
    epsilon : float
        The probability of choosing a random action.

    Returns
    -------
    int
        The action to take.
    """
    if np.random.rand() < epsilon:
        # epsilon % to choose a random action
        num_actions = q_array.shape[1]  #  q_array: shape=(n_states, n_actions)
        action = np.random.randint(num_actions)
    else:
        # (1 - epsilon) % use greedy policy
        action = np.argmax(q_array[state])
    return action

## Q learning

### Gym 社区关于 LunarLander 的典型范围

#### 位置范围
- **横向位置** `x ∈ [-1.0,1.0]`
- **纵向位置** `y ∈ [0,1.4]` 或者更宽松一点 `y ∈ [-1,1.4]`
#### 速度范围
- **横向速度** `ẋ ∈ [-2,2]`
- **纵向速度** `ẏ ∈ [-2,2]`
#### 角度与角速度
- **角度** `θ ∈ [-π, π]` （可能不会真的到 `±π` 这么极端，但以此作为可参考的界限）
- **角速度** `θ̇ ∈ [-2,2]` 或略大
#### 着地状态
- **两条腿的接触状态** 是二进制 `{0,1}`
#### 具体状态变量
- **state[0]**: 横向位置（归一化） `[-1,+1]`
- **state[1]**: 纵向位置（归一化） `[-1,+1]`
- **state[2]**: 横向速度（缩放后） `[-2,2]`
- **state[3]**: 纵向速度（缩放后） `[-2,2]`
- **state[4]**: 飞船角度 `θ` `[−π, π]` 或 `[−2π, +2π]`
- **state[5]**: 飞船角速度（缩放后） `[-2,2]`
- **state[6]**: 左腿是否着地 `{0,1}`
- **state[7]**: 右腿是否着地 `{0,1}`


In [27]:

state_limits = [
    (-1.0, 1.0),   # 维度0
    (-1.0, 1.0),   # 维度1
    (-2.0, 2.0),   # 维度2
    (-2.0, 2.0),   # 维度3
    (-np.pi, np.pi),  # 维度4
    (-2.0, 2.0),   # 维度5 
    (0, 1),        # 维度6 
    (0, 1)         # 维度7 
]

num_bins = [8, 8, 8, 8, 8, 8, 2, 2]  # 每一维切分多少段，仅作举例
bins_list = []

for (low, high), nb in zip(state_limits, num_bins):
    # 生成 nb-1 个切分点，相当于把区间切成 nb 段
    # np.linspace 会返回 nb+1 个点, 我们一般取这些点的中间  nb-1 个 也可以
    edges = np.linspace(low, high, nb+1)[1:-1]  # 这里就保留 (nb - 1) 个分割点
    bins_list.append(edges)

def discretize_state(state):
    """
    将 8 维连续状态离散化为一个 tuple 索引.
    state: np.ndarray, shape=(8,)
    bins_list: list of 8 arrays (each is the bin edges).
    """
    indices = []
    for i in range(len(state)):
        # digitize 根据 bins 把 x 映射到区间索引
        idx = np.digitize(state[i], bins_list[i])
        indices.append(idx)
    return tuple(indices)


In [28]:
import collections

# 动作空间大小
n_actions = env.action_space.n

# 字典 Q-table
Q_dict = collections.defaultdict(lambda: np.zeros(n_actions))

alpha = 0.01 # 学习率 0.1-0.01
gamma = 0.99
epsilon = 1.0
epsilon_decay = 0.99
epsilon_min = 0.01

num_episodes = 40000
max_steps = 500

# 8维 state 的 min/max 测定
mins = np.full(8, +np.inf, dtype=np.float32)
maxs = np.full(8, -np.inf, dtype=np.float32)

for ep in range(num_episodes):
    obs, _ = env.reset()
    # 将连续状态离散化
    state_disc = discretize_state(obs)
    done = False
    
    for step in range(max_steps):
        # obs 是一个 shape=(8,) 的 array
        # 更新 min/max
        mins = np.minimum(mins, obs)
        maxs = np.maximum(maxs, obs)
        # epsilon-greedy
        if np.random.rand() < epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(Q_dict[state_disc])

        next_obs, reward, terminated, truncated, info = env.step(int(action))
        done = terminated or truncated

        # 离散化下一状态
        next_state_disc = discretize_state(next_obs)

        # Q-learning 更新
        best_next_action = np.argmax(Q_dict[next_state_disc])
        Q_dict[state_disc][action] += alpha * (
            reward + gamma * Q_dict[next_state_disc][best_next_action] 
            - Q_dict[state_disc][action]
        )

        state_disc = next_state_disc
        if done:
            break

    # 衰减 epsilon
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
# 在训练结束后，看一下全程实际观测到的 min, max
print("Observed dimension-wise min:", mins)
print("Observed dimension-wise max:", maxs)
env.close()
print("done training！")



Observed dimension-wise min: [-0.00798225  1.398221   -0.80853826 -0.5644135  -0.0092428  -0.18314047
  0.          0.        ]
Observed dimension-wise max: [0.00798235 1.4224188  0.80851346 0.5110565  0.00925628 0.18314597
 0.         0.        ]
done training！


训练分数较低、经常负分，说明落月失败或中途坠毁占大多数。
在像 LunarLander 这样 8 维连续空间里，表格 Q-learning + 离散化往往难度大，需要非常多的训练或非常精心的分箱+超参数调参才能成功。

In [32]:
from IPython.display import Video


# 1. 重新创建一个环境，用于测试和录制视频
test_env = gym.make("LunarLander-v3", continuous=False, render_mode="rgb_array")

# 2. 用 RecordVideo 包装环境，指定视频存储路径
VIDEO_PREFIX = "discrete_test"
(test_env) = gym.wrappers.RecordVideo(
    test_env,
    video_folder=str(FIGS_DIR),       # 录制保存的文件夹
    name_prefix=VIDEO_PREFIX,         # 录制文件名前缀
    episode_trigger=lambda episode_id: True,  # 每个 episode 都保存视频
)

# 3. 运行若干个 episode，让智能体用训练好的 Q_dict 策略来选动作
n_test_episodes = 5

for ep in range(n_test_episodes):
    obs, _ = test_env.reset()
    state_disc = discretize_state(obs)      # 将连续状态离散化
    done = False
    total_reward = 0

    while not done:
        # 选取离散化状态下 Q 值最大的动作
        action = np.argmax(Q_dict[state_disc])

        next_obs, reward, terminated, truncated, info = test_env.step(action)
        total_reward += reward

        done = terminated or truncated
        state_disc = discretize_state(next_obs)
    
    print(f"Episode {ep} finished with total_reward={total_reward:.2f}")

test_env.close()
print("5 test episode done and recorded!")

# 4. 在 notebook 中展示刚录制的第一个视频
#   由于 RecordVideo 默认生成类似 "discrete_test-episode-0.mp4"、"discrete_test-episode-1.mp4" 等。
#   这里演示查看第0号 episode。
video_path = FIGS_DIR / f"{VIDEO_PREFIX}-episode-0.mp4"

Video(
    video_path,
    embed=True,
    html_attributes="controls autoplay loop",
)


Episode 0 finished with total_reward=-116.71
Episode 1 finished with total_reward=-179.23
Episode 2 finished with total_reward=-110.09
Episode 3 finished with total_reward=-116.06
Episode 4 finished with total_reward=-111.95
5 test episode done and recorded!
