<a href="https://colab.research.google.com/github/gondore/nsdc-crafter/blob/main/DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install gymnasium crafter stable_baselines3 shimmy[gym-v26]

# **Quick Start**
Replace `/content/sample_data/NSDC_official_training` (the output folder passed to `crafter.Recorder` below) with the path of a new, empty folder in Google Colab. Update any other file paths in the notebook so that they point to that same folder.

- After training, the folder will contain a collection of short videos – one video per episode.
- The dedicated video concatenation function can combine all of these videos into a single `final_training_video.mp4`.
- The training data can be found in the `stats.jsonl` file that is generated in the same folder after training.

In [2]:
import gymnasium as gym
import crafter


# Create the Crafter environment and wrap it directly in a Recorder that
# saves stats and videos (but not full episode dumps) to the output folder.
env = crafter.Recorder(
    crafter.Env(),
    '/content/sample_data/NSDC_official_training',
    save_stats=True,
    save_video=True,
    save_episode=False,
)
import matplotlib.pyplot as plt
import numpy as np

In [3]:
import numpy as np
from skimage.color import rgb2gray
from skimage import transform
import matplotlib.pyplot as plt
from collections import deque
import random

import warnings
# Ignore every warning raised from this point on.
# NOTE(review): this blanket filter also hides deprecation and runtime
# warnings from the libraries used below; narrow it if debugging.
warnings.filterwarnings('ignore')

**Preprocessing observation space**

We could experiment with performance by playing with:

- grayscale frames
- normalizing pixel values
- resizing the preprocessed frame, or possibly cropping out the health bar and inventory


In [5]:
def preprocess_frame(frame, grayscale=False):
    """Scale a frame's pixel values by 1/255 and resize it to 110x84.

    Grayscale conversion is opt-in: with the default ``grayscale=False`` the
    colour channels are kept, which is what this function has always
    returned (the grayscale copy it used to compute was never used).

    Args:
        frame: Image array; assumed to hold RGB pixel values in the 0-255
            range, since it is divided by 255.0.
        grayscale: When True, collapse the colour channels with ``rgb2gray``
            after scaling, so the result has no channel axis.

    Returns:
        The resized float image. With ``grayscale=False`` the channel axis of
        the input is preserved (110 x 84 x channels); with ``grayscale=True``
        the result is a 2-D 110 x 84 array.
    """
    # Normalize pixel values first so the grayscale path receives floats
    # and does not depend on the input's integer dtype.
    scaled_frame = frame / 255.0

    if grayscale:
        scaled_frame = rgb2gray(scaled_frame)

    # Only rows/cols are given, so resize leaves any channel axis untouched.
    return transform.resize(scaled_frame, [110, 84])

In [6]:
# TODO: try logging training metrics (e.g. the loss) to TensorBoard; the disabled sketch below is a starting point.
#writer = tf.summary.FileWriter("/tensorboard/dqn/1")

## Losses
#tf.summary.scalar("Loss", DQNetwork.loss)

#write_op = tf.summary.merge_all()

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
import gymnasium as gym
import matplotlib.pyplot as plt



class ConvDQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by a two-layer head.

    Inputs are batches of channel-last, 3-channel images; the output holds
    one value per action. The first linear layer is fixed at 64 * 4 * 4 input
    features, which is what this conv stack yields for 64x64 images.
    """

    def __init__(self, input_dim, output_dim):
        """Create the convolutional feature extractor and the linear head.

        Args:
            input_dim: Not used by the current layer definitions.
            output_dim: Size of the final linear layer's output.
        """
        super().__init__()

        # Input has 3 channels (RGB).
        feature_layers = [
            nn.Conv2d(3, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.conv_layers = nn.Sequential(*feature_layers)

        head_layers = [
            nn.Linear(64 * 4 * 4, 512),
            nn.ReLU(),
            nn.Linear(512, output_dim),
        ]
        self.fc_layers = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return the head's output for a batch of channel-last images."""
        # PyTorch convolutions expect (batch, channels, height, width).
        channels_first = x.permute(0, 3, 1, 2)
        features = self.conv_layers(channels_first)
        # Collapse everything except the batch axis before the linear head.
        flattened = features.reshape(features.size(0), -1)
        return self.fc_layers(flattened)



In [8]:
class ReplayBuffer:
    """Bounded store of (state, action, reward, next_state, done) transitions."""

    def __init__(self, capacity):
        # A deque with maxlen drops the oldest entry once `capacity` is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append a single transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Pick `batch_size` stored transitions at random, without replacement.

        Returns:
            A 5-tuple ``(states, actions, rewards, next_states, dones)``.
            States and next states are stacked into numpy arrays; actions,
            rewards and dones are returned as tuples.
        """
        batch = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one sequence per field.
        states, actions, rewards, next_states, dones = zip(*batch)
        return np.array(states), actions, rewards, np.array(next_states), dones

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)


In [9]:
class DQNAgent:
    """Epsilon-greedy DQN agent with an online network and a target network.

    The online network (`model`) is optimized with Adam on an MSE loss; the
    target network (`target_model`) is a copy refreshed only through
    `update_target`. Exploration is controlled by `epsilon`. `train` does not
    modify `epsilon`: the caller is expected to decay it (for example once
    per episode) using `epsilon_decay` and `epsilon_min`.
    """

    def __init__(self, state_dim, action_dim, replay_buffer):
        """Build both networks, the optimizer and the exploration settings.

        Args:
            state_dim: Stored on the agent and forwarded to ConvDQN.
            action_dim: Number of discrete actions (network output size).
            replay_buffer: Object providing `sample(batch_size)` and `len()`.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.replay_buffer = replay_buffer
        self.model = ConvDQN(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters())
        self.criterion = nn.MSELoss()
        self.epsilon = 0.99         # initial probability of a random action
        self.epsilon_decay = 0.995  # multiplicative decay factor for epsilon
        self.epsilon_min = 0.01     # lower bound for epsilon
        self.gamma = 0.99           # discount factor for future rewards

        # Initialize target model and set weights equal to the model weights
        self.target_model = ConvDQN(state_dim, action_dim)
        self.update_target()

    def update_target(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def act(self, state):
        """Choose an action for a single state using epsilon-greedy selection.

        Args:
            state: One observation (no batch axis); a batch axis is added here.

        Returns:
            The chosen action index as a Python int.
        """
        if random.random() > self.epsilon:
            state = torch.FloatTensor(state).unsqueeze(0)
            # Pure inference: no autograd graph is needed to pick an action.
            with torch.no_grad():
                q_values = self.model(state)
            action = q_values.max(1)[1].item()
        else:
            action = random.randrange(self.action_dim)
        return action

    def train(self, batch_size):
        """Run one gradient step on a minibatch sampled from the replay buffer.

        Does nothing while the buffer holds fewer than `batch_size`
        transitions. Epsilon is intentionally left untouched here; decaying
        it on every gradient step as well as in the training loop applied the
        decay twice and drove epsilon to its floor within roughly the first
        900 steps.

        Args:
            batch_size: Number of transitions to sample.
        """
        if len(self.replay_buffer) < batch_size:
            return
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)
        states = torch.FloatTensor(states)
        next_states = torch.FloatTensor(next_states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        dones = torch.FloatTensor(dones)

        # Q-value the online network assigns to the action actually taken.
        q_values = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # The bootstrap target is treated as a constant: computing it without
        # autograd avoids building a graph through the target network and
        # accumulating gradients on parameters the optimizer never updates.
        with torch.no_grad():
            next_q_values = self.target_model(next_states).max(1)[0]
            # (1 - dones) removes the bootstrap term for terminal transitions.
            expected_q_values = rewards + self.gamma * next_q_values * (1 - dones)

        loss = self.criterion(q_values, expected_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()


In [None]:
# Total number of values in one observation (product of the observation shape).
state_dim = np.prod(env.observation_space.shape)
# Number of discrete actions the environment exposes.
action_dim = env.action_space.n

# Replay memory capped at 10,000 transitions, shared with the agent.
replay_buffer = ReplayBuffer(10000)
agent = DQNAgent(state_dim, action_dim, replay_buffer)

def train_dqn(episodes, target_update=10, batch_size=32):
    """Train the notebook-level `agent` on `env` for a number of episodes.

    Uses the module-level `env`, `agent` and `replay_buffer`. Every
    environment step is stored in the replay buffer and followed by one call
    to `agent.train`.

    Args:
        episodes: Number of episodes to run.
        target_update: The target network is synced every `target_update`
            episodes (including episode 0).
        batch_size: Minibatch size passed to `agent.train` after each step.

    Returns:
        A list with the summed reward of each episode, in order.
    """
    rewards = []
    for episode in range(episodes):
        if episode % target_update == 0:
            agent.update_target()

        state = env.reset()
        episode_reward = 0
        done = False
        while not done:
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            replay_buffer.push(state, action, reward, next_state, done)
            state = next_state
            episode_reward += reward
            agent.train(batch_size)

        rewards.append(episode_reward)
        print(f'Episode: {episode}, Reward: {episode_reward}')
        # Decay exploration once per episode, never going below epsilon_min.
        agent.epsilon = max(agent.epsilon_min, agent.epsilon_decay * agent.epsilon)
    return rewards

# Train for 1000 episodes; `rewards` receives the list of per-episode
# reward totals returned by train_dqn.
episodes = 1000
rewards = train_dqn(episodes)


Episode: 0, Reward: 2.099999999999999
Episode: 1, Reward: 2.099999999999999
Episode: 2, Reward: 2.0999999999999996
Episode: 3, Reward: 2.099999999999999
Episode: 4, Reward: 2.0999999999999996
Episode: 5, Reward: 1.0999999999999992
Episode: 6, Reward: 2.099999999999999
Episode: 7, Reward: 0.10000000000000009
Episode: 8, Reward: 0.10000000000000006
Episode: 9, Reward: 0.10000000000000014
Episode: 10, Reward: -0.9
Episode: 11, Reward: 2.099999999999999
Episode: 12, Reward: 1.1000000000000003
Episode: 13, Reward: -0.9000000000000001
Episode: 14, Reward: 0.09999999999999992
Episode: 15, Reward: 3.0999999999999996
Episode: 16, Reward: -0.9
Episode: 17, Reward: 2.099999999999999
Episode: 18, Reward: 2.0999999999999996
Episode: 19, Reward: 1.1000000000000003
Episode: 20, Reward: 0.1
Episode: 21, Reward: 0.09999999999999998
Episode: 22, Reward: 2.1
Episode: 23, Reward: 0.09999999999999998
Episode: 24, Reward: 1.1000000000000003
Episode: 25, Reward: -0.9000000000000001
Episode: 26, Reward: -0.90

In [31]:
# Expects `rewards` (the list of per-episode rewards) to exist already.

# Number of recorded episodes and the index where the second half begins.
length_of_rewards = len(rewards)
mid_point = length_of_rewards // 2

# Keep the first half unchanged and add a constant 0.85 to every reward in
# the second half.
# NOTE(review): this shifts the recorded values, so `adjusted_rewards` no
# longer matches what the agent actually earned; confirm this is intended
# before plotting or reporting it.
first_half = rewards[:mid_point]
shifted_second_half = [reward + 0.85 for reward in rewards[mid_point:]]
adjusted_rewards = first_half + shifted_second_half
