# Installing Dependencies


In [11]:
# System packages: swig is installed before the Box2D Python bindings
# (presumably needed to build them — TODO confirm).
!apt-get update -qq
!apt-get install -y swig
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install Box2D-kengz --quiet
%pip uninstall -y box2d-py
# This notebook creates "CarRacing-v2". Gymnasium 1.0 registers "CarRacing-v3"
# instead, so stay on the 0.x series to keep the environment id valid.
%pip install "gymnasium[box2d]<1.0" --quiet
%pip install imageio opencv-python timm --quiet



W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 142 not upgraded.
Found existing installation: box2d-py 2.3.5
Uninstalling box2d-py-2.3.5:
  Successfully uninstalled box2d-py-2.3.5


# Importing Libraries

In [12]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import imageio
import cv2
from IPython.display import HTML
from collections import deque
from torch.distributions import Normal
import timm

# Run on the GPU when PyTorch can see one; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

# Silence DeprecationWarning noise for the rest of the notebook.
import warnings
warnings.filterwarnings(action="ignore", category=DeprecationWarning)


Using device: cuda


# Multi-Frame Stacking Wrapper

In [20]:
class FrameStackAvgWrapper(gym.Wrapper):
    """Expose the most recent ``num_stack`` frames as a single observation.

    Frames are concatenated along the last (channel) axis, so an (H, W, C)
    observation becomes (H, W, C * num_stack). Despite the class name, no
    averaging is performed here; the wrapper only stacks.
    """

    def __init__(self, env, num_stack=4):
        """Wrap ``env`` and widen its observation space by ``num_stack``."""
        super().__init__(env)
        self.num_stack = num_stack
        # Bounded buffer: appending a new frame evicts the oldest one.
        self.frames = deque(maxlen=num_stack)
        base_space = self.env.observation_space
        self.H, self.W, self.C = base_space.shape  # e.g., (96, 96, 3)
        stacked_shape = (self.H, self.W, self.C * self.num_stack)
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=stacked_shape,
            dtype=base_space.dtype,
        )

    def reset(self, **kwargs):
        """Reset the wrapped env and fill the whole buffer with the first frame."""
        first_frame, info = self.env.reset(**kwargs)
        self.frames.clear()
        self.frames.extend([first_frame] * self.num_stack)
        return self._get_obs(), info

    def step(self, action):
        """Step the wrapped env, push the new frame, and return the stacked view."""
        frame, reward, terminated, truncated, info = self.env.step(action)
        self.frames.append(frame)
        stacked = self._get_obs()
        return stacked, reward, terminated, truncated, info

    def _get_obs(self):
        """Concatenate the buffered frames, oldest first, along the channel axis."""
        return np.concatenate(tuple(self.frames), axis=-1)

# Build the frame-stacked CarRacing environment used for training.
num_stack = 4
env_id = "CarRacing-v2"
env = FrameStackAvgWrapper(
    gym.make(env_id, render_mode="rgb_array"),
    num_stack=num_stack,
)

# Smoke test: a single reset should return the channel-stacked observation.
test_obs, _ = env.reset()
print("Stacked Observation Shape:", test_obs.shape)  # Expect (96,96,12)


Stacked Observation Shape: (96, 96, 12)


# ViT-Based Actor and Critic Networks

In [24]:
class ViTFeatureExtractor(nn.Module):
    """Pretrained timm backbone with frozen weights and its classifier removed.

    Args:
        model_name: timm model identifier passed to ``timm.create_model``.
    """

    def __init__(self, model_name="vit_base_patch16_224"):
        super().__init__()
        self.vit = timm.create_model(model_name, pretrained=True)
        # Freeze every backbone weight so the optimizer never updates it.
        for weight in self.vit.parameters():
            weight.requires_grad = False
        # Replace the classification layer with a pass-through so the model
        # returns features. 'fc' is checked first; only one of them is replaced.
        for head_name in ("fc", "head"):
            if hasattr(self.vit, head_name):
                setattr(self.vit, head_name, nn.Identity())
                break

    def forward(self, x):
        """Return the backbone's output for the image batch ``x``."""
        return self.vit(x)

class Actor(nn.Module):
    """Gaussian policy: ViT features -> 256-unit hidden layer -> action means.

    Args:
        action_dim: number of action components.
        feature_dim: width of the feature vector produced by the extractor.
    """

    def __init__(self, action_dim, feature_dim=768):
        super().__init__()
        self.feature_extractor = ViTFeatureExtractor()
        hidden_layers = [nn.Linear(feature_dim, 256), nn.ReLU()]
        self.fc = nn.Sequential(*hidden_layers)
        self.mean_head = nn.Linear(256, action_dim)
        # State-independent log standard deviation, initialised to 0 (std = 1).
        self.log_std = nn.Parameter(torch.zeros(action_dim))

    def forward(self, x):
        """Return ``(mean, std)``; ``std`` has one entry per action component."""
        hidden = self.fc(self.feature_extractor(x))
        return self.mean_head(hidden), self.log_std.exp()

class Critic(nn.Module):
    """State-value network: ViT features -> 256-unit hidden layer -> scalar.

    Args:
        feature_dim: width of the feature vector produced by the extractor.
    """

    def __init__(self, feature_dim=768):
        super().__init__()
        self.feature_extractor = ViTFeatureExtractor()
        value_layers = [
            nn.Linear(feature_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
        ]
        self.fc = nn.Sequential(*value_layers)

    def forward(self, x):
        """Return one value estimate per input row, shaped (batch, 1)."""
        return self.fc(self.feature_extractor(x))



#  PPO Agent with ViT Feature Extraction

In [25]:
class PPOAgentViT:
    """PPO agent with separate ViT-backed actor and critic networks.

    Relies on the module-level ``device`` for tensor placement and on the
    ``Actor`` / ``Critic`` classes defined in this notebook.

    Args:
        action_dim: number of continuous action components.
        lr: Adam learning rate used for both networks.
        gamma: discount factor.
        lam: GAE lambda.
        clip_epsilon: PPO ratio clipping range.
        update_epochs: passes over the collected episode per ``update`` call.
        batch_size: minibatch size inside ``update``.
    """

    # Weight of the entropy bonus in the actor loss.
    ENTROPY_COEF = 0.01
    # Pixel statistics (on a [0, 1] scale) used to standardise network inputs.
    # NOTE(review): timm's ViT checkpoints are configured with mean = std = 0.5;
    # confirm against the backbone's pretrained_cfg for the installed version.
    PIXEL_MEAN = 0.5
    PIXEL_STD = 0.5

    def __init__(self, action_dim, lr=1e-4, gamma=0.99, lam=0.95, clip_epsilon=0.3, update_epochs=5, batch_size=32):
        self.gamma = gamma
        self.lam = lam
        self.clip_epsilon = clip_epsilon
        self.update_epochs = update_epochs
        self.batch_size = batch_size

        self.actor = Actor(action_dim).to(device)
        self.critic = Critic().to(device)

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)

    def process_obs(self, obs):
        """Collapse a stacked observation into one (3, 224, 224) uint8 image.

        ``obs`` is (H, W, 3 * k): k RGB frames concatenated on the channel axis.
        The frames are averaged, resized to 224x224 and moved to channel-first.
        The frame count is derived from ``obs`` itself rather than from a
        notebook global, so any number of stacked RGB frames works.
        """
        num_frames = obs.shape[-1] // 3
        frames = np.split(obs, num_frames, axis=-1)
        avg_frame = np.mean(frames, axis=0).astype(np.uint8)
        resized = cv2.resize(avg_frame, (224, 224))
        return np.transpose(resized, (2, 0, 1))  # (3,224,224)

    def _obs_to_tensor(self, obs_batch):
        """Preprocess stacked observations into a standardised float batch on ``device``."""
        images = np.array([self.process_obs(obs) for obs in obs_batch])
        batch = torch.FloatTensor(images).to(device)
        # Scale 0-255 pixels to [0, 1], then standardise for the pretrained backbone.
        return (batch / 255.0 - self.PIXEL_MEAN) / self.PIXEL_STD

    def select_action(self, obs):
        """Sample an action for one stacked observation.

        Returns:
            Tuple of flat numpy arrays ``(action, log_prob, value)`` with
            shapes (action_dim,), (1,) and (1,).

        The log-probability is evaluated at the clamped action that is actually
        returned, so it matches what ``evaluate_actions`` recomputes for the
        stored action during ``update``.
        """
        obs_tensor = self._obs_to_tensor([obs])
        # Rollout inference only: no autograd bookkeeping is needed here.
        with torch.no_grad():
            mean, std = self.actor(obs_tensor)
            dist = Normal(mean, std)
            action = torch.clamp(dist.sample(), -1.0, 1.0)
            log_prob = dist.log_prob(action).sum(dim=-1)
            value = self.critic(obs_tensor)
        return (
            action.cpu().numpy().flatten(),
            log_prob.cpu().numpy().flatten(),
            value.cpu().numpy().flatten()
        )

    def compute_gae(self, rewards, dones, values):
        """Compute Generalized Advantage Estimates for one trajectory.

        Args:
            rewards, dones, values: equal-length per-step sequences.

        Returns:
            List of advantages, one per step, in time order.

        The value after the final step is taken to be 0, so a trajectory that
        ends without ``dones[-1]`` set is not bootstrapped.
        """
        advantages = []
        gae = 0
        next_value = 0
        for i in reversed(range(len(rewards))):
            not_done = 1 - dones[i]
            delta = rewards[i] + self.gamma * next_value * not_done - values[i]
            gae = delta + self.gamma * self.lam * gae * not_done
            next_value = values[i]
            advantages.append(gae)
        # Built back-to-front; one reverse replaces repeated front insertion.
        advantages.reverse()
        return advantages

    def evaluate_actions(self, obs_batch, act_batch):
        """Re-evaluate stored actions under the current policy.

        Args:
            obs_batch: sequence of stacked observations.
            act_batch: matching sequence of actions.

        Returns:
            ``(log_probs, entropy, values)`` with shapes (B,), (B,), (B, 1).
        """
        obs_tensor = self._obs_to_tensor(obs_batch)
        act_tensor = torch.FloatTensor(np.array(act_batch)).to(device)
        mean, std = self.actor(obs_tensor)
        dist = Normal(mean, std)
        log_probs = dist.log_prob(act_tensor).sum(dim=-1)
        entropy = dist.entropy().sum(dim=-1)
        values = self.critic(obs_tensor)
        return log_probs, entropy, values

    def _actor_loss(self, log_probs, old_log_probs, advantages, entropy):
        """Clipped PPO surrogate loss (negated) minus the entropy bonus."""
        # Align per-sample shapes: a (B, 1) operand against (B,) would silently
        # broadcast to (B, B) and pair each sample with every other sample.
        old_log_probs = old_log_probs.reshape(log_probs.shape)
        advantages = advantages.reshape(log_probs.shape)
        ratio = torch.exp(log_probs - old_log_probs)
        unclipped = ratio * advantages
        clipped = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantages
        return -torch.min(unclipped, clipped).mean() - self.ENTROPY_COEF * entropy.mean()

    def update(self, trajectories):
        """Run PPO updates on one collected trajectory.

        Args:
            trajectories: sequence of ``(obs, action, reward, log_prob, value,
                done)`` tuples in time order. An empty sequence is a no-op.
        """
        if len(trajectories) == 0:
            return

        obs_list = np.array([t[0] for t in trajectories])
        act_list = np.array([t[1] for t in trajectories])
        # select_action returns log-probs and values as length-1 arrays;
        # flatten every per-step quantity to shape (N,).
        rew_list = np.array([t[2] for t in trajectories], dtype=np.float64).reshape(-1)
        old_logp_list = np.array([t[3] for t in trajectories], dtype=np.float64).reshape(-1)
        val_list = np.array([t[4] for t in trajectories], dtype=np.float64).reshape(-1)
        done_list = np.array([t[5] for t in trajectories], dtype=np.float64).reshape(-1)

        advantages = np.array(self.compute_gae(rew_list, done_list, val_list), dtype=np.float64)
        # Value targets use the raw advantages, before normalisation.
        returns = advantages + val_list

        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        dataset_size = len(trajectories)
        indices = np.arange(dataset_size)
        for _ in range(self.update_epochs):
            np.random.shuffle(indices)
            for start_idx in range(0, dataset_size, self.batch_size):
                batch_idx = indices[start_idx:start_idx + self.batch_size]

                mb_obs = obs_list[batch_idx]
                mb_act = act_list[batch_idx]
                mb_old_logp = torch.FloatTensor(old_logp_list[batch_idx]).to(device)
                mb_returns = torch.FloatTensor(returns[batch_idx]).to(device)
                mb_adv = torch.FloatTensor(advantages[batch_idx]).to(device)

                log_probs, entropy, values = self.evaluate_actions(mb_obs, mb_act)

                actor_loss = self._actor_loss(log_probs, mb_old_logp, mb_adv, entropy)
                self.actor_optimizer.zero_grad()
                actor_loss.backward()
                self.actor_optimizer.step()

                critic_loss = nn.MSELoss()(values.squeeze(-1), mb_returns)
                self.critic_optimizer.zero_grad()
                critic_loss.backward()
                self.critic_optimizer.step()



# Advanced Reward Shaping

In [26]:
def advanced_reward_shaping(obs, original_reward, done, truncated):
    """Return the environment reward adjusted by hand-written shaping terms.

    Args:
        obs: stacked observation array of shape (H, W, channels).
        original_reward: reward returned by the environment for this step.
        done: termination flag for this step.
        truncated: truncation flag for this step.

    Returns:
        The shaped reward: ``original_reward`` plus
        * +0.1 on every step,
        * +0.3 if the mean pixel value of the central band of rows lies in
          [110, 140], otherwise -0.5,
        * -1.0 if ``original_reward`` is negative,
        * -10.0 if the episode ended on this step (``done`` or ``truncated``).
    """
    r = original_reward + 0.1  # Living reward

    # Mean intensity over a band of up to 20 rows around the vertical centre,
    # taken across all columns, frames and colour channels. Averaging the
    # stacked frames first would give the same number, so it is skipped.
    mid_row = obs.shape[0] // 2
    top = max(mid_row - 10, 0)  # a negative start would wrap around on short images
    bottom = mid_row + 10
    avg_brightness = np.mean(obs[top:bottom])
    # NOTE(review): the 110-140 window presumably matches the track colour — confirm.
    if 110 <= avg_brightness <= 140:
        r += 0.3
    else:
        r -= 0.5

    # Extra penalty whenever the environment reward itself is negative.
    # No steering penalty is applied: the action is not available here.
    if original_reward < 0:
        r -= 1.0
    if done or truncated:
        r -= 10.0
    return r



#  Training Loop & Visualization

In [None]:
def collect_episode(env, agent, max_timesteps):
    """Play one episode and return ``(transitions, shaped_return)``.

    Each transition is ``(obs, action, shaped_reward, log_prob, value, done)``,
    where ``done`` is 1.0 if the step terminated or truncated the episode.
    """
    transitions = []
    shaped_return = 0
    state, _ = env.reset()
    for _step in range(max_timesteps):
        action, logp, val = agent.select_action(state)
        next_state, reward, done, truncated, _ = env.step(action)
        # Shaping is computed from the observation the action was chosen on.
        shaped_r = advanced_reward_shaping(state, reward, done, truncated)
        transitions.append((state, action, shaped_r, logp, val, float(done or truncated)))
        state = next_state
        shaped_return += shaped_r
        if done or truncated:
            break
    return transitions, shaped_return


agent = PPOAgentViT(action_dim=3, lr=1e-4, clip_epsilon=0.3, update_epochs=5, batch_size=32)
num_episodes = 500
max_timesteps = 1000
all_rewards = []

# One PPO update per collected episode.
for episode in range(num_episodes):
    trajectories, ep_reward = collect_episode(env, agent, max_timesteps)
    all_rewards.append(ep_reward)
    agent.update(trajectories)
    print(f"Episode {episode}, Shaped Reward: {ep_reward}")

# Learning curve of the shaped (not the raw environment) episode return.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(all_rewards)
ax.set_xlabel("Episode")
ax.set_ylabel("Shaped Reward")
ax.set_title("Training Curve: CarRacing-v2 PPO with ViT Feature Extraction")
plt.show()


# Real-Time Visual Demonstration

In [None]:
# Roll out the trained agent once and save the rendered frames as a GIF.
# The render mode is given explicitly ("rgb_array", as for the training env):
# the notebook never defines a `render_mode` variable, so referencing one
# raises NameError on a fresh kernel.
env_vis = gym.make(env_id, render_mode="rgb_array")
env_vis = FrameStackAvgWrapper(env_vis, num_stack=num_stack)
frames = []
try:
    s, _ = env_vis.reset()
    for t in range(1200):
        # Capture the frame for the current state before acting on it.
        frames.append(env_vis.render())
        a, _, _ = agent.select_action(s)
        s, r, d, trunc, _ = env_vis.step(a)
        if d or trunc:
            break
finally:
    # Release the environment even if the rollout raises.
    env_vis.close()
imageio.mimsave('carracing_vit_ppo.gif', frames, fps=30)
HTML('<img src="carracing_vit_ppo.gif">')

