In [15]:
!pip install gymnasium -q

# **Mountain Car RL**

In [16]:
import gymnasium as gym 
import numpy as np

env = gym.make('MountainCar-v0')

# Discretize 
pos_space = np.linspace(env.observation_space.low[0], env.observation_space.high[0], 20)
vel_space = np.linspace(env.observation_space.low[1], env.observation_space.high[1], 20)

q_table = np.zeros((len(pos_space), len(vel_space), env.action_space.n))

learning_rate = 0.1
discount = 0.95
epochs = 2000
epsilon = 0.5
epsilon_decay = 0.998

def get_discrete_state(state):
    pos, vel = state
    pos_bin = np.digitize(pos, pos_space)
    vel_bin = np.digitize(vel, vel_space)
    return (pos_bin, vel_bin)

# Training loop
for epoch in range(epochs):
    state = get_discrete_state(env.reset()[0]) 
    terminated = False
    truncated = False
    
    if epoch % 100 == 0:
        print(f"Epoch: {epoch}")

    while not terminated and not truncated:
        # Epsilon-greedy action selection
        if np.random.random() < epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(q_table[state])
            
        new_state_continuous, reward, terminated, truncated, _ = env.step(action) 
        new_state = get_discrete_state(new_state_continuous)
        
        done = terminated or truncated 

        if not done:
            max_future_q = np.max(q_table[new_state])
            current_q = q_table[state + (action,)]
            
            new_q = current_q + learning_rate * (reward + discount * max_future_q - current_q)
            q_table[state + (action,)] = new_q
        
        elif terminated:
            q_table[state + (action,)] = 0  

        state = new_state
        
    if epsilon > 0.05:
        epsilon *= epsilon_decay

print("Training finished!")
env.close()

Epoch: 0
Epoch: 100
Epoch: 200
Epoch: 300
Epoch: 400
Epoch: 500
Epoch: 600
Epoch: 700
Epoch: 800
Epoch: 900
Epoch: 1000
Epoch: 1100
Epoch: 1200
Epoch: 1300
Epoch: 1400
Epoch: 1500
Epoch: 1600
Epoch: 1700
Epoch: 1800
Epoch: 1900
Training finished!


In [17]:
import imageio
import numpy as np
import gymnasium as gym

video_env = gym.make('MountainCar-v0', render_mode='rgb_array')

frames = []
state = get_discrete_state(video_env.reset()[0])
done = False

while not done:
    action = np.argmax(q_table[state])
    new_state_continuous, _, terminated, truncated, _ = video_env.step(action)
    state = get_discrete_state(new_state_continuous)
    done = terminated or truncated
    frame = video_env.render()
    frame = np.array(frame)
    if frame.ndim == 4 and frame.shape[0] == 2:
        frame = frame[0]
    frames.append(frame)

video_env.close()

h, w = frames[0].shape[:2]
frames = [f[:h, :w, :3] for f in frames]

# Save video
video_path = "mountaincar_RL.mp4"
imageio.mimsave(video_path, frames, fps=30, macro_block_size=None)
print(f"Video saved as: {video_path}")

Video saved as: mountaincar_RL.mp4


# **Mountain Car DRL**

In [27]:
import gymnasium as gym 
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

# Q-Network 
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)
    
    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Replay Buffer
class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
    
    def add(self, state, action, reward, next_state, done): # 'done' here means (terminated or truncated)
        self.memory.append((state, action, reward, next_state, done))
    
    def sample(self):
        experiences = random.sample(self.memory, self.batch_size)
        
        states = torch.from_numpy(np.vstack([e[0] for e in experiences if e is not None])).float()
        actions = torch.from_numpy(np.vstack([e[1] for e in experiences if e is not None])).long()
        rewards = torch.from_numpy(np.vstack([e[2] for e in experiences if e is not None])).float()
        next_states = torch.from_numpy(np.vstack([e[3] for e in experiences if e is not None])).float()
        dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8)).float()
        
        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        return len(self.memory)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

BUFFER_SIZE = 100000
BATCH_SIZE = 64
GAMMA = 0.99
LR = 0.0005
UPDATE_EVERY = 4

env = gym.make('MountainCar-v0')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

q_network_local = QNetwork(state_size, action_size).to(device)
q_network_target = QNetwork(state_size, action_size).to(device)
optimizer = optim.Adam(q_network_local.parameters(), lr=LR)
memory = ReplayBuffer(BUFFER_SIZE, BATCH_SIZE)

q_network_target.load_state_dict(q_network_local.state_dict())

def learn(experiences, gamma):
    states, actions, rewards, next_states, dones = experiences

    states = states.to(device)
    actions = actions.to(device)
    rewards = rewards.to(device)
    next_states = next_states.to(device)
    dones = dones.to(device)

    best_actions = q_network_local(next_states).detach().argmax(1).unsqueeze(1)
    q_targets_next = q_network_target(next_states).detach().gather(1, best_actions)
    q_targets = rewards + (gamma * q_targets_next * (1 - dones))
    q_expected = q_network_local(states).gather(1, actions)

    loss = nn.MSELoss()(q_expected, q_targets)

    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(q_network_local.parameters(), 1)
    optimizer.step()

    # Soft update
    for target_param, local_param in zip(q_network_target.parameters(), q_network_local.parameters()):
        target_param.data.copy_(0.01 * local_param.data + (1.0 - 0.01) * target_param.data)

# Training Loop
episodes = 1200
max_t = 1000 
epsilon = 1.0
epsilon_decay = 0.997
epsilon_min = 0.01

scores = []
scores_window = deque(maxlen=100)

for i_episode in range(1, episodes + 1):
    state = env.reset()[0] 
    score = 0
    
    for t in range(max_t):
        state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(device)
        q_network_local.eval()
        with torch.no_grad():
            action_values = q_network_local(state_tensor)
        q_network_local.train()
        
        if random.random() > epsilon:
            action = np.argmax(action_values.cpu().data.numpy())
        else:
            action = random.choice(np.arange(action_size))
            
        next_state, reward, terminated, truncated, _ = env.step(action)

        # Reward shaping
        position, velocity = next_state
        reward = reward + 0.5 * abs(velocity) + (position + 0.5)
        done = terminated or truncated 
        
        memory.add(state, action, reward, next_state, done)
        
        if len(memory) > BATCH_SIZE and t % UPDATE_EVERY == 0:
            experiences = memory.sample()
            learn(experiences, GAMMA)
            
        state = next_state
        score += reward
        if done:
            break
            
    scores_window.append(score)
    scores.append(score)
    
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    
    if i_episode % 100 == 0:
        print(f'\rEpisode {i_episode}\tAverage Score: {np.mean(scores_window):.2f}')

print('Training finished!')
env.close()

Using device: cuda
Episode 100	Average Score: -201.43
Episode 200	Average Score: -194.04
Episode 300	Average Score: -191.25
Episode 400	Average Score: -191.03
Episode 500	Average Score: -183.16
Episode 600	Average Score: -181.69
Episode 700	Average Score: -180.63
Episode 800	Average Score: -165.29
Episode 900	Average Score: -157.39
Episode 1000	Average Score: -139.33
Episode 1100	Average Score: -134.86
Episode 1200	Average Score: -126.49
Training finished!


In [29]:
import imageio

render_env = gym.make('MountainCar-v0', render_mode='rgb_array')

frames = []
state = render_env.reset()[0]
done = False

while not done:
    state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(device)
    
    q_network_local.eval()
    with torch.no_grad():
        action_values = q_network_local(state_tensor)
    action = np.argmax(action_values.cpu().data.numpy())
    
    next_state, reward, terminated, truncated, _ = render_env.step(action)
    done = terminated or truncated
    
    frame = render_env.render()
    frames.append(frame)
    
    state = next_state

render_env.close()

# Save the video
video_path = "mountaincar_DRL.mp4"
imageio.mimsave(video_path, frames, fps=30, macro_block_size=None)
print(f"Video saved as: {video_path}")


error: XDG_RUNTIME_DIR not set in the environment.


Video saved as: mountaincar_DRL.mp4
