In [1]:
from tqdm import tqdm
import numpy as np
from pathlib import Path
import shutil
import imageio
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import gymnasium as gym

camera = dict(
    up=dict(x=0, y=0, z=1),
    center=dict(x=0, y=0, z=0),
    eye=dict(x=0.25, y=-2.0, z=2.0)
)
env = gym.make('MountainCar-v0')
obs_space: gym.spaces.Box = env.observation_space

discrete_positions = np.linspace(obs_space.low[0], obs_space.high[0], num=40)
discrete_vels = np.linspace(obs_space.low[1], obs_space.high[1], num=40)
actions = [0, 1, 2]

def draw_v_function_and_policy(title: str, q_array: np.ndarray) -> go.Figure:
    fig = make_subplots(
        rows=1, cols=2,
        shared_xaxes=False,
        specs=[[{'type': 'surface'}, {'type': 'surface'}]],
        subplot_titles=("Value function", "Policy"))
    
    v_data = q_array.max(axis=2)
    policy_data = q_array.argmax(axis=2)

    fig.add_trace(go.Surface(x=discrete_positions, y=discrete_vels, z=v_data, colorscale='YlGnBu'), col=1, row=1)
    fig.add_trace(go.Surface(x=discrete_positions, y=discrete_vels, z=policy_data, colorscale='YlGnBu'), col=2, row=1)
    fig.layout.scene1.camera=camera
    fig.layout.scene2.camera=camera
    fig.update_layout(scene_camera=camera, title=title,
                      margin=dict(r=25, l=25, b=10, t=80),
                      width=750,
                      showlegend=False)
    fig.update_scenes(xaxis_title_text='position ',  
                      yaxis_title_text='velocity')
    return fig

In [2]:
import math
import numpy as np
from collections import deque
import gymnasium as gym
from tqdm import tqdm

env = gym.make('MountainCar-v0')
obs_space: gym.spaces.Box = env.observation_space

Q = np.zeros((discrete_positions.shape[0], discrete_vels.shape[0], 3), dtype=float)
Q_size = np.array(list(Q.shape[:2]), dtype=float)

n_episodes = 200000
eps_from = 0.6
eps_to = 1e-2
lr = 2e-2
discount_rate = 0.99

temp_dir = Path('tmp')
if temp_dir.exists():
    shutil.rmtree(temp_dir)
temp_dir.mkdir()
images = []

last_rewards = deque(maxlen=100)
best_reward = -1e10
best_Q = None

fig = draw_v_function_and_policy('Start', Q)
image_path = temp_dir / 'begin.png'
fig.write_image(image_path)
images.append(imageio.imread(image_path))

ep_reward = 0
progress_bar = tqdm(range(n_episodes))
for ep_idx in progress_bar:
    t_eps_greedy = min(max((ep_idx - 10000) / n_episodes, 0.0), 1.0)
    eps_greedy = eps_from * math.exp(math.log(eps_to / eps_from) * t_eps_greedy)
    # eps_greedy = eps_from + (eps_to - eps_from) * (ep_idx / n_episodes)
    # eps_greedy = 0.5
    
    done = False
    ep_reward = 0
    state, info = env.reset()
    discrete_state = np.round((state - obs_space.low) * (Q_size / (obs_space.high - obs_space.low)))
    discrete_state = discrete_state.astype(int)
    
    while not done:
        if np.random.uniform(0, 1) < eps_greedy:
            action = np.random.choice(actions)
        else:
            action = Q[discrete_state[0], discrete_state[1]].argmax()
            
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        
        next_discrete_state = np.round((next_state - obs_space.low) * (Q_size / (obs_space.high - obs_space.low)))
        next_discrete_state = next_discrete_state.astype(int)
        
        if terminated:
            td_target = reward
        else:
            td_target = reward + Q[next_discrete_state[0], next_discrete_state[1]].max() * discount_rate
        td_error = td_target - Q[discrete_state[0], discrete_state[1], action]
        Q[discrete_state[0], discrete_state[1], action] += td_error * lr

        ep_reward += reward
        
        state = next_state
        discrete_state = next_discrete_state
    
    last_rewards.append(ep_reward)
    
    mean_reward = sum(last_rewards) / len(last_rewards)
    if mean_reward > best_reward:
        best_reward = mean_reward
        best_Q = Q.copy()
    if ep_idx % 500 == 0:
        ep_name = str(ep_idx) if ep_idx > 0 else '1'
        title = f'Episode {ep_name}, eps: {eps_greedy:.2f}, mean reward: {mean_reward:.2f}, '\
            f'best reward: {best_reward:.2f}'
        fig = draw_v_function_and_policy(title, Q)
        image_path = temp_dir / f'episode_{ep_name}.png'
        fig.write_image(image_path)
        images.append(imageio.imread(image_path))
        
    progress_bar.set_description(f'Reward: {mean_reward:.2f}, best reward: {best_reward:.2f}, eps: {eps_greedy:.2f}')

env.close()
imageio.mimsave('MountainCar_sarsa.mp4', images, fps=10)    
    





Reward: -119.32, best reward: -107.98, eps: 0.01: 100%|██████████| 200000/200000 [45:51<00:00, 72.69it/s]  


In [4]:
last_rewards.clear()

for ep_idx in range(100):
    done = False
    ep_reward = 0
    state, info = env.reset()
    discrete_state = np.round((state - obs_space.low) * (Q_size / (obs_space.high - obs_space.low)))
    discrete_state = discrete_state.astype(int)
    
    while not done:
        action = best_Q[discrete_state[0], discrete_state[1]].argmax()
            
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        
        next_discrete_state = np.round((next_state - obs_space.low) * (Q_size / (obs_space.high - obs_space.low)))
        next_discrete_state = next_discrete_state.astype(int)
        
        state = next_state
        discrete_state = next_discrete_state
    
    last_rewards.append(ep_reward)
print(f'Mean reward: {best_reward}')

np.save('best_Q.npy', best_Q)

Mean reward: -107.98


In [2]:
from pathlib import Path
from collections import deque
import shutil
import numpy as np
import gymnasium as gym
import imageio

env = gym.make('MountainCar-v0', render_mode='rgb_array')
obs_space: gym.spaces.Box = env.observation_space

Q: np.ndarray = np.load('best_Q.npy')
Q_size = np.array(list(Q.shape[:2]), dtype=float)

temp_dir = Path('tmp')
if temp_dir.exists():
    shutil.rmtree(temp_dir)
temp_dir.mkdir()
images = []

last_rewards = deque(maxlen=100)
best_reward = -1e10

for _ in range(5):
    done = False
    ep_reward = 0
    state, info = env.reset()
    discrete_state = np.round((state - obs_space.low) * (Q_size / (obs_space.high - obs_space.low)))
    discrete_state = discrete_state.astype(int)
    
    rgb_frame = env.render()
    images.append(rgb_frame)
    while not done:
        action = Q[discrete_state[0], discrete_state[1]].argmax()
            
        next_state, reward, terminated, truncated, info = env.step(action)
        rgb_frame = env.render()
        images.append(rgb_frame)
        
        done = terminated or truncated
        
        next_discrete_state = np.round((next_state - obs_space.low) * (Q_size / (obs_space.high - obs_space.low)))
        next_discrete_state = next_discrete_state.astype(int)
        
        state = next_state
        discrete_state = next_discrete_state
        
        ep_reward += reward
    
    last_rewards.append(ep_reward)
    mean_reward = sum(last_rewards) / len(last_rewards)
    print(f'Mean reward: {mean_reward:.2f}')
    
env.close()
imageio.mimsave('MountainCar_best_policy.mp4', images, fps=30)    
    

Mean reward: -96.00
Mean reward: -102.50
Mean reward: -106.33
Mean reward: -107.75




Mean reward: -108.20
