In [38]:
!apt-get update -qq
!apt-get install -y swig
!pip install Box2D-kengz --quiet
!pip uninstall -y box2d-py
!pip install gymnasium[box2d] --quiet
!pip install imageio --quiet


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 141 not upgraded.
Found existing installation: box2d-py 2.3.5
Uninstalling box2d-py-2.3.5:
  Successfully uninstalled box2d-py-2.3.5


In [39]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import imageio
from IPython.display import HTML
from collections import deque
from torch.distributions import Normal

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)


Using device: cuda


In [40]:
class FrameStackWrapper(gym.Wrapper):
    def __init__(self, env, num_stack=4):
        super().__init__(env)
        self.num_stack = num_stack
        self.frames = deque([], maxlen=num_stack)
        orig_shape = self.env.observation_space.shape  # e.g. (96, 96, 3)
        self.H, self.W, self.C = orig_shape
        # new shape => (H, W, C * num_stack)
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(self.H, self.W, self.C * self.num_stack),
            dtype=env.observation_space.dtype
        )
    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        self.frames.clear()
        for _ in range(self.num_stack):
            self.frames.append(obs)
        return self._get_obs(), info
    def step(self, action):
        obs, reward, done, truncated, info = self.env.step(action)
        self.frames.append(obs)
        return self._get_obs(), reward, done, truncated, info
    def _get_obs(self):
        return np.concatenate(list(self.frames), axis=-1)


In [41]:
num_stack = 4
env_id = "CarRacing-v2"
env = gym.make(env_id, render_mode="rgb_array")
env = FrameStackWrapper(env, num_stack=num_stack)

test_obs, _ = env.reset()
print("Test Obs Shape:", test_obs.shape)  # e.g. (96, 96, 12)


Test Obs Shape: (96, 96, 12)


In [42]:
class Critic(nn.Module):
    def __init__(self, in_channels=12):
        super(Critic, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        self.fc = nn.Sequential(
            nn.Linear(64 * 8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 1)
        )
    def forward(self, x):
        # x shape => (N, 12, 96, 96)
        x = x.contiguous()  # fix for possible non-contiguous memory
        x = x / 255.0
        x = self.conv(x)
        x = x.contiguous().view(x.size(0), -1)
        return self.fc(x)

class Actor(nn.Module):
    def __init__(self, in_channels=12, action_dim=3):
        super(Actor, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        self.fc = nn.Sequential(
            nn.Linear(64 * 8 * 8, 512),
            nn.ReLU()
        )
        self.mean_head = nn.Linear(512, action_dim)
        self.log_std = nn.Parameter(torch.zeros(action_dim))
    def forward(self, x):
        x = x.contiguous()  # ensure contiguous memory
        x = x / 255.0
        x = self.conv(x)
        x = x.contiguous().view(x.size(0), -1)
        x = self.fc(x)
        mean = self.mean_head(x)
        std = torch.exp(self.log_std)
        return mean, std


In [43]:
class PPOAgent:
    def __init__(
        self,
        in_channels=12,
        action_dim=3,
        lr=1e-4,
        gamma=0.99,
        lam=0.95,
        clip_epsilon=0.3,
        update_epochs=5,
        batch_size=32
    ):
        self.gamma = gamma
        self.lam = lam
        self.clip_epsilon = clip_epsilon
        self.update_epochs = update_epochs
        self.batch_size = batch_size

        self.actor = Actor(in_channels=in_channels, action_dim=action_dim).to(device)
        self.critic = Critic(in_channels=in_channels).to(device)

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)

    def select_action(self, obs):
        # obs shape => (96, 96, 12)
        # We'll permute => (N, C, H, W)
        obs_tensor = torch.FloatTensor(obs).permute(2, 0, 1).unsqueeze(0).to(device)
        mean, std = self.actor(obs_tensor)
        dist = Normal(mean, std)
        sample = dist.sample()
        action = torch.clamp(sample, -1.0, 1.0)
        log_prob = dist.log_prob(sample).sum(dim=-1)
        value = self.critic(obs_tensor)
        return (
            action.detach().cpu().numpy().flatten(),
            log_prob.detach().cpu().numpy().flatten(),
            value.detach().cpu().numpy().flatten()
        )

    def compute_gae(self, rewards, dones, values):
        advantages = []
        gae = 0
        next_value = 0
        for i in reversed(range(len(rewards))):
            delta = rewards[i] + self.gamma * next_value * (1 - dones[i]) - values[i]
            gae = delta + self.gamma * self.lam * gae * (1 - dones[i])
            next_value = values[i]
            advantages.insert(0, gae)
        return advantages

    def evaluate_actions(self, obs_batch, act_batch):
        # obs_batch => (N, 96, 96, 12)
        # permute => (N, 12, 96, 96)
        obs_tensor = torch.FloatTensor(obs_batch).permute(0, 3, 1, 2).to(device)
        act_tensor = torch.FloatTensor(act_batch).to(device)

        mean, std = self.actor(obs_tensor)
        dist = Normal(mean, std)
        log_probs = dist.log_prob(act_tensor).sum(dim=-1)
        entropy = dist.entropy().sum(dim=-1)
        values = self.critic(obs_tensor)
        return log_probs, entropy, values

    def update(self, trajectories):
        obs_list = np.array([t[0] for t in trajectories])
        act_list = np.array([t[1] for t in trajectories])
        rew_list = np.array([t[2] for t in trajectories])
        old_logp_list = np.array([t[3] for t in trajectories])
        val_list = np.array([t[4] for t in trajectories])
        done_list = np.array([t[5] for t in trajectories])

        advantages = self.compute_gae(rew_list, done_list, val_list)
        advantages = np.array(advantages)
        returns = advantages + val_list

        # advantage normalization
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        dataset_size = len(trajectories)
        indices = np.arange(dataset_size)

        for _ in range(self.update_epochs):
            np.random.shuffle(indices)
            start_idx = 0
            while start_idx < dataset_size:
                end_idx = min(start_idx + self.batch_size, dataset_size)
                batch_idx = indices[start_idx:end_idx]
                start_idx = end_idx

                mb_obs = obs_list[batch_idx]
                mb_act = act_list[batch_idx]
                mb_old_logp = torch.FloatTensor(old_logp_list[batch_idx]).to(device)
                mb_returns = torch.FloatTensor(returns[batch_idx]).to(device)
                mb_adv = torch.FloatTensor(advantages[batch_idx]).to(device)

                log_probs, entropy, values = self.evaluate_actions(mb_obs, mb_act)

                ratio = torch.exp(log_probs - mb_old_logp)
                surr1 = ratio * mb_adv
                surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * mb_adv

                # Actor
                actor_loss = -torch.min(surr1, surr2).mean() - 0.01 * entropy.mean()
                self.actor_optimizer.zero_grad()
                actor_loss.backward()
                self.actor_optimizer.step()

                # Critic
                critic_loss = nn.MSELoss()(values.squeeze(1), mb_returns)
                self.critic_optimizer.zero_grad()
                critic_loss.backward()
                self.critic_optimizer.step()


In [44]:
def ultra_reward_shaping(obs, original_reward, done, truncated):
    """
    1) A small living reward (0.1).
    2) If original_reward > 0 => +0.2 to boost good driving steps.
    3) Check a center row for 'gray' track color => if ratio > 0.5 => +0.3, else -0.5
    4) Episode end => -10
    """
    shaped = original_reward + 0.1
    if original_reward > 0:
        shaped += 0.2
    h, w, c = obs.shape
    center_row = obs[h//2, :, :]
    gray_approx = np.mean(center_row, axis=1)
    gray_pixels = ((gray_approx >= 110) & (gray_approx <= 140)).sum()
    ratio_gray = gray_pixels / w
    if ratio_gray > 0.5:
        shaped += 0.3
    else:
        shaped -= 0.5
    if done or truncated:
        shaped -= 10.0
    return shaped


In [None]:
agent = PPOAgent(in_channels=num_stack*3, action_dim=3, lr=1e-4, clip_epsilon=0.3, update_epochs=5, batch_size=32)
num_episodes = 300
max_timesteps = 1000
all_rewards = []

for episode in range(num_episodes):
    obs, _ = env.reset()
    ep_reward = 0
    trajectories = []
    for t in range(max_timesteps):
        action, logp, val = agent.select_action(obs)
        next_obs, reward, done, truncated, _ = env.step(action)
        shaped_reward = ultra_reward_shaping(obs, reward, done, truncated)
        trajectories.append((obs, action, shaped_reward, logp, val, float(done or truncated)))
        obs = next_obs
        ep_reward += shaped_reward
        if done or truncated:
            break
    all_rewards.append(ep_reward)
    agent.update(trajectories)
    print(f"Episode {episode}, Shaped Reward: {ep_reward}")

plt.figure(figsize=(10,6))
plt.plot(all_rewards)
plt.xlabel("Episode")
plt.ylabel("Shaped Reward")
plt.title("CarRacing-v2 PPO with FrameStack & Contiguous Fix")
plt.show()


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Episode 0, Shaped Reward: 218.014035087724
Episode 1, Shaped Reward: 302.4680134680195
Episode 2, Shaped Reward: 295.9146579804619
Episode 3, Shaped Reward: 296.77894736842694
Episode 4, Shaped Reward: 296.2493150684991
Episode 5, Shaped Reward: 294.2727272727331
Episode 6, Shaped Reward: 296.14716981132665
Episode 7, Shaped Reward: 293.36942675159816
Episode 8, Shaped Reward: 293.849315068499
Episode 9, Shaped Reward: 293.43086816720836
Episode 10, Shaped Reward: 292.7529411764763
Episode 11, Shaped Reward: 294.06713780919307
Episode 12, Shaped Reward: 293.4724919093909
Episode 13, Shaped Reward: 296.0666666666726
Episode 14, Shaped Reward: 296.5684587813679
Episode 15, Shaped Reward: 295.40336134454367
Episode 16, Shaped Reward: 296.9567567567627
Episode 17, Shaped Reward: 296.5174377224258
Episode 18, Shaped Reward: 296.8666666666726
Episode 19, Shaped Reward: 296.3204152249194
Episode 20, Shaped Reward: 296.0006600660125
Episode 21, Shaped Reward: 297.4992700729986
Episode 22, Shap

In [None]:
env_vis = gym.make("CarRacing-v2", render_mode="rgb_array")
env_vis = FrameStackWrapper(env_vis, num_stack=num_stack)
s, _ = env_vis.reset()
frames = []
for t in range(1200):
    frame = env_vis.render()
    frames.append(frame)
    a, _, _ = agent.select_action(s)
    s_next, r, d, trunc, _ = env_vis.step(a)
    s = s_next
    if d or trunc:
        break
env_vis.close()
imageio.mimsave('carracing_ppo.gif', frames, fps=30)
HTML('<img src="carracing_ppo.gif">')
