In [1]:
pip install numpy==1.26.4

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install torch==2.1.0 torchvision==0.16.0 torchaudio==2.1.0




In [3]:
pip install gym-super-mario-bros==7.4.0

Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install tensordict==0.3.0

Note: you may need to restart the kernel to use updated packages.


In [5]:
pip install torchrl==0.3.0

Note: you may need to restart the kernel to use updated packages.


In [27]:
import torch
from torch import nn
from torchvision import transforms as T
from PIL import Image
import numpy as np
from pathlib import Path
from collections import deque
import random, datetime, os
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import random

import gym
from gym.spaces import Box
from gym.wrappers import FrameStack

from nes_py.wrappers import JoypadSpace

# Super Mario environment for OpenAI Gym
import gym_super_mario_bros

from tensordict import TensorDict
from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage

from sklearn.metrics import accuracy_score

Initializing Environment

In [7]:
if gym.__version__ < '0.26':
    env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", new_step_api=True)
else:
    env = gym_super_mario_bros.make("SuperMarioBros-1-1-v0", render_mode='human', apply_api_compatibility=True)

# Action space: 0-walk right, 1-jump right
env = JoypadSpace(env, [["right"], ["right", "A"]])

env.reset()
next_state, reward, done, trunc, info = env.step(action=0)
print(f"{next_state.shape},\n {reward},\n {done},\n {info}")

  logger.warn(
  logger.warn(


(240, 256, 3),
 0.0,
 False,
 {'coins': 0, 'flag_get': False, 'life': 2, 'score': 0, 'stage': 1, 'status': 'small', 'time': 400, 'world': 1, 'x_pos': 40, 'y_pos': 79}


  if not isinstance(terminated, (bool, np.bool8)):


Using wrappers to preprocess environment data to give our agent only information it needs: making the structure of the environment a 3D array: [4, 84, 84]

In [8]:
class SkipFrame(gym.Wrapper):
    def __init__(self, env, skip):
        """Return only every `skip`-th frame"""
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        """Repeat action, and sum reward"""
        total_reward = 0.0
        for i in range(self._skip):
            # Accumulate reward and repeat the same action
            obs, reward, done, trunk, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, trunk, info


class GrayScaleObservation(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        obs_shape = self.observation_space.shape[:2]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def permute_orientation(self, observation):
        # permute [H, W, C] array to [C, H, W] tensor
        observation = np.transpose(observation, (2, 0, 1))
        observation = torch.tensor(observation.copy(), dtype=torch.float)
        return observation

    def observation(self, observation):
        observation = self.permute_orientation(observation)
        transform = T.Grayscale()
        observation = transform(observation)
        return observation


class ResizeObservation(gym.ObservationWrapper):
    def __init__(self, env, shape):
        super().__init__(env)
        if isinstance(shape, int):
            self.shape = (shape, shape)
        else:
            self.shape = tuple(shape)

        obs_shape = self.shape + self.observation_space.shape[2:]
        self.observation_space = Box(low=0, high=255, shape=obs_shape, dtype=np.uint8)

    def observation(self, observation):
        transforms = T.Compose(
            [T.Resize(self.shape, antialias=True), T.Normalize(0, 255)]
        )
        observation = transforms(observation).squeeze(0)
        return observation


# Apply Wrappers to environment
env = SkipFrame(env, skip=4)
env = GrayScaleObservation(env)
env = ResizeObservation(env, shape=84)
if gym.__version__ < '0.26':
    env = FrameStack(env, num_stack=4, new_step_api=True)
else:
    env = FrameStack(env, num_stack=4)

#### Agent: Mario  
Goals  
- Makes optimal action policy based on current state  
- Learns a better action policy over time
- Q learning agent

In [9]:
class Mario:
    def __init__():
        pass
    def action(self, state):
        """Choose optimal action, given a state"""
        pass
    def remember(self, experience):
        """Add the experience to memory"""
        pass
    def recall(self):
        """Sample experience"""
        pass
    def learn(self):
        """Update online action value (Q) function"""
        pass 

#### Imitation Approach

In [10]:
# calling and playing the game to learn
import pygame
from tqdm import tqdm

expert_data = []

def play_and_record(env, num_episodes=5):
    print("Starting expert data collection. Use arrow keys + A/Z for jump.")
    actions = {
        pygame.K_RIGHT: 0,  
        pygame.K_z: 1       
    }

    pygame.init()
    for episode in range(num_episodes):
        state = env.reset()[0]
        done = False
        print(f"Episode {episode + 1}")
        while not done:
            env.render()

            action = 0
            keys = pygame.key.get_pressed()
            if keys[pygame.K_z]:  
                action = 1
            elif keys[pygame.K_RIGHT]:
                action = 0

            next_state, reward, done, _, _ = env.step(action)
            expert_data.append((state, action))  
            state = next_state

            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    return

    pygame.quit()
    return expert_data

In [11]:
!pip install pygame



In [12]:
demo = play_and_record(env)
torch.save(demo, "expert_demo.pt")

Starting expert data collection. Use arrow keys + A/Z for jump.
Episode 1


  logger.warn(


Episode 2
Episode 3
Episode 4
Episode 5


##### Defining the dataset for loading expert data

In [18]:
torch.manual_seed(42) 

class ExpertDataset(Dataset):
    def __init__(self, expert_data):
        self.states = [torch.tensor(np.array(s), dtype=torch.float32) / 255.0 for s, _ in expert_data]
        self.actions = [a for _, a in expert_data]

    def __len__(self):
        return len(self.states)

    def __getitem__(self, idx):
        return self.states[idx], self.actions[idx]

##### Define the CNN model for behavior cloning

In [19]:
class ImitationCNN(nn.Module):
    def __init__(self, num_actions=2):
        super(ImitationCNN, self).__init__()
        self.net = nn.Sequential(
            nn.Conv2d(4, 32, kernel_size=8, stride=4),  # (84-8)/4+1 = 20
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2), # (20-4)/2+1 = 9
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1), # (9-3)/1+1 = 7
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, 512),
            nn.ReLU(),
            nn.Linear(512, num_actions)
        )

    def forward(self, x):
        return self.net(x)

In [20]:
# load expert data in state action pairs
expert_data = torch.load("expert_demo.pt")  

dataset = ExpertDataset(expert_data)

dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

##### Initialize model, loss and optimizer

In [22]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = ImitationCNN(num_actions=2).to(device)
loss_fn = nn.CrossEntropyLoss()  
optimizer = optim.Adam(model.parameters(), lr=1e-4)

##### Training

In [24]:
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    for batch_idx, (states, actions) in enumerate(dataloader):
        states = states.to(device)              
        actions = torch.tensor(actions).to(device)  

        # forward pass
        logits = model(states)  
        
        # compute loss
        loss = loss_fn(logits, actions)

        # backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        if batch_idx % 10 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(dataloader)}], Loss: {loss.item():.4f}")

    avg_loss = total_loss / len(dataloader)
    print(f"Epoch [{epoch+1}] completed. Average Loss: {avg_loss:.4f}")


torch.save(model.state_dict(), "imitation_mario.pth")
print("Model saved as imitation_mario.pth")

  actions = torch.tensor(actions).to(device)


Epoch [1/10], Batch [1/7], Loss: 0.6859
Epoch [1] completed. Average Loss: 0.6506
Epoch [2/10], Batch [1/7], Loss: 0.6055
Epoch [2] completed. Average Loss: 0.5668
Epoch [3/10], Batch [1/7], Loss: 0.5081
Epoch [3] completed. Average Loss: 0.4528
Epoch [4/10], Batch [1/7], Loss: 0.3720
Epoch [4] completed. Average Loss: 0.3043
Epoch [5/10], Batch [1/7], Loss: 0.2135
Epoch [5] completed. Average Loss: 0.1536
Epoch [6/10], Batch [1/7], Loss: 0.0857
Epoch [6] completed. Average Loss: 0.0551
Epoch [7/10], Batch [1/7], Loss: 0.0260
Epoch [7] completed. Average Loss: 0.0164
Epoch [8/10], Batch [1/7], Loss: 0.0080
Epoch [8] completed. Average Loss: 0.0054
Epoch [9/10], Batch [1/7], Loss: 0.0032
Epoch [9] completed. Average Loss: 0.0024
Epoch [10/10], Batch [1/7], Loss: 0.0016
Epoch [10] completed. Average Loss: 0.0013
Model saved as imitation_mario.pth


#### Evaluation

##### Evaluating accuracy of imitation model

In [28]:
from sklearn.model_selection import train_test_split

expert_data = torch.load("expert_demo.pt")
train_data, test_data = train_test_split(expert_data, test_size=0.2, random_state=42)

train_dataset = ExpertDataset(train_data)
test_dataset = ExpertDataset(test_data)

In [26]:
def evaluate_accuracy(model, dataset, device="cpu"):
    model.eval()
    loader = DataLoader(dataset, batch_size=32)

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for states, labels in loader:
            states = states.to(device)
            labels = labels.to(device)

            logits = model(states)
            preds = logits.argmax(dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    print(f"Model Accuracy on Held-Out Data: {acc*100:.2f}%")
    return acc


In [29]:
evaluate_accuracy(model, test_dataset, device)

Model Accuracy on Held-Out Data: 100.00%


1.0

In [30]:
def evaluate_imitation_model(model, env, episodes=5, device="cpu", render=False):
    model.eval()
    rewards = []
    distances = []
    steps_survived = []

    for ep in range(episodes):
        state = env.reset()[0]
        done = False
        total_reward = 0
        step_count = 0

        while not done:
            if render:
                env.render()

            # preprocess state
            state_tensor = torch.tensor(np.array(state), dtype=torch.float32).unsqueeze(0).to(device)
            state_tensor = state_tensor / 255.0

            # predicting the action
            with torch.no_grad():
                logits = model(state_tensor)
                action = logits.argmax(dim=1).item()

            state, reward, done, _, info = env.step(action)
            total_reward += reward
            step_count += 1

        rewards.append(total_reward)
        distances.append(info.get('x_pos', 0))
        steps_survived.append(step_count)

        print(f"Episode {ep+1}: Reward={total_reward}, Distance={distances[-1]}, Steps={step_count}")

    print(f"Avg Reward: {sum(rewards)/len(rewards):.2f}")
    print(f"Avg Distance: {sum(distances)/len(distances):.2f}")
    print(f"Avg Steps Survived: {sum(steps_survived)/len(steps_survived):.2f}")


In [31]:
evaluate_imitation_model(model, env, episodes=5, device=device, render=True)

Episode 1: Reward=231.0, Distance=296, Steps=40
Episode 2: Reward=231.0, Distance=296, Steps=40
Episode 3: Reward=231.0, Distance=296, Steps=40
Episode 4: Reward=231.0, Distance=296, Steps=40
Episode 5: Reward=231.0, Distance=296, Steps=40
Avg Reward: 231.00
Avg Distance: 296.00
Avg Steps Survived: 40.00
