# Implementing Rainbow

In [5]:
import torch
from DQN_CNN import Net, train_game, eval_game, one_hot_encode_game_state, epsilon_greedy_action # Import necessary components


### First step, add Double Q Learning

In [6]:
def train_game_double_q(game, it, batch_size, gamma, optimizer, criterion, device, model, target_model):
    global losses
    global scores
    batch_outputs = []
    batch_labels = []
    step = 1

    while not game.game_over():
        state = one_hot_encode_game_state(game.state())
        state_tensor = state.unsqueeze(0).permute(0, 3, 1, 2).to(device)

        Q_values = model(state_tensor)
        Q_valid_values = [Q_values[0][a] if game.is_action_available(a) else float('-inf') for a in range(4)]
        action = epsilon_greedy_action(np.array(Q_valid_values))
        reward = game.do_action(action)

        new_state = game.state()
        new_state_tensor = one_hot_encode_game_state(new_state).unsqueeze(0).permute(0, 3, 1, 2).to(device)

        with torch.no_grad():
            Q_next = target_model(new_state_tensor)  # Use target model for stability
            Q_next_policy = model(new_state_tensor)  # Use policy model to select action

        # Double Q-Learning update rule
        next_action = torch.argmax(Q_next_policy).item()
        target_Q_value = reward + gamma * Q_next[0][next_action]

        batch_outputs.append(Q_values[0][action])
        batch_labels.append(target_Q_value)

        if step % batch_size == 0 or game.game_over():
            if len(batch_labels) == 0: return
            optimizer.zero_grad()
            label_tensor = torch.tensor(batch_labels, dtype=torch.float32).to(device)
            output_tensor = torch.stack(batch_outputs).to(device)
            batch_labels, batch_outputs = [], []
            loss = criterion(output_tensor, label_tensor)
            loss.backward()
            optimizer.step()
            losses.append(loss.item())

            if game.game_over():
                scores.append(game.score())
                if it % 100 == 0 and it > 0:
                    mean_score = sum(scores[-100:]) / 100
                    print(f"Epoch: {it}, Mean score last 100 epochs: {mean_score:.2f}")
                return
        step += 1


In [13]:
import torch
import torch.optim as optim
import torch.nn as nn
import numpy as np
import random
from DQN_CNN import Net, Game, one_hot_encode_game_state  # Make sure to import your game environment and necessary functions

# Set the parameters as specified
input_shape = (16, 4, 4)  # Shape of the game state, assuming one-hot encoded
num_actions = 4  # Number of possible actions in the game
batch_size = 128  # Batch size for training
gamma = 1  # Discount factor for future rewards
n_epoch = 1000
n_eval = 100  # Number of games to evaluate
SEED = 1  # Seed for reproducibility

# Set seeds for random number generators to ensure reproducibility
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True  # Ensure deterministic behavior in CuDNN

# Determine if CUDA is available and set the device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the neural network model
model = Net(input_shape, num_actions).to(device)

# Set the learning rate for optimization
learning_rate = 0.0001
# Initialize the optimizer with the model parameters
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Define the loss function
criterion = nn.MSELoss().to(device)

# Initialize lists to store losses and scores during training
losses = []
scores = []

# Initialize the game environment
game = Game()  # Make sure the Game class has methods like game_over(), state(), do_action(), and score()

# Print to confirm everything is set up correctly
print("Setup complete. Device is", device)


Setup complete. Device is cpu


In [14]:
from copy import deepcopy

# Initialize models
model = Net(input_shape, num_actions).to(device)
target_model = deepcopy(model)  # Create a target model as a deep copy of the model

# Training loop
for it in range(n_epoch):
    game = Game()
    train_game_double_q(game, it, batch_size, gamma, optimizer, criterion, device, model, target_model)
    if it % 10 == 0:  # Update the target network every 10 iterations
        target_model.load_state_dict(model.state_dict())


Epoch: 100, Mean score last 100 epochs: 1094.16
Epoch: 200, Mean score last 100 epochs: 1129.08
Epoch: 300, Mean score last 100 epochs: 1129.52
Epoch: 400, Mean score last 100 epochs: 1163.40
Epoch: 500, Mean score last 100 epochs: 1076.12
Epoch: 600, Mean score last 100 epochs: 1056.32
Epoch: 700, Mean score last 100 epochs: 1017.20
Epoch: 800, Mean score last 100 epochs: 1061.60
Epoch: 900, Mean score last 100 epochs: 1080.56


In [15]:
def evaluate_model(model, n_games, device):
    total_score = 0
    model.eval()  # Set the model to evaluation mode
    
    for _ in range(n_games):
        game = Game()  # Initialize a new game
        while not game.game_over():
            state = one_hot_encode_game_state(game.state())  # Get the current state and encode it
            state_tensor = state.unsqueeze(0).permute(0, 3, 1, 2).to(device)  # Reshape and move to device

            with torch.no_grad():  # Ensure no gradients are computed during inference
                Q_values = model(state_tensor)
                Q_valid_values = [Q_values[0][a] if game.is_action_available(a) else float('-inf') for a in range(num_actions)]
                best_action = np.argmax(Q_valid_values)  # Choose the action with the highest Q-value

            game.do_action(best_action)  # Perform the action in the game

        total_score += game.score()  # Accumulate the score from the finished game

    mean_score = total_score / n_games  # Calculate the average score across all games
    print(f"Average score over {n_games} games: {mean_score}")
    return mean_score


# evaluating model after just adding Double to base

In [16]:
# Example of using the evaluate_model function
n_eval_games = 100  # Set the number of games for evaluation
mean_score = evaluate_model(model, n_eval_games, device)
print(f"mean score of model: {mean_score}")

Average score over 100 games: 1156.64
mean score of model: 1156.64


### Adding PER on top of Double

In [22]:
import numpy as np
from collections import deque
import random

class PrioritizedReplayBuffer:
    def __init__(self, capacity, alpha=0.6):
        self.alpha = alpha
        self.buffer = []
        self.priorities = np.zeros((capacity,), dtype=np.float32)
        self.position = 0
        self.capacity = capacity

    def push(self, state, action, reward, next_state, done):
        max_prio = self.priorities.max() if self.buffer else 1.0
        if len(self.buffer) < self.capacity:
            self.buffer.append((state, action, reward, next_state, done))
        else:
            self.buffer[self.position] = (state, action, reward, next_state, done)
        self.priorities[self.position] = max_prio
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size, beta=0.4):
        if len(self.buffer) == self.capacity:
            prios = self.priorities
        else:
            prios = self.priorities[:self.position]
        
        probs = prios ** self.alpha
        probs /= probs.sum()
        
        indices = np.random.choice(len(self.buffer), batch_size, p=probs)
        samples = [self.buffer[idx] for idx in indices]
        
        total = len(self.buffer)
        weights = (total * probs[indices]) ** (-beta)
        weights /= weights.max()
        weights = np.array(weights, dtype=np.float32)

        states, actions, rewards, next_states, dones = zip(*samples)
        return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones), indices, weights

    def update_priorities(self, indices, priorities):
        for idx, prio in zip(indices, priorities):
            self.priorities[idx] = prio

    def __len__(self):
        return len(self.buffer)


In [29]:
def train_game_with_double_q_per(game, it, batch_size, gamma, optimizer, criterion, device, model, target_model, buffer, beta):
    global losses
    global scores
    state = Game()
    state = one_hot_encode_game_state(state).unsqueeze(0).permute(0, 3, 1, 2).to(device)
    while not game.game_over():
        with torch.no_grad():
            Q_values = model(state)
            Q_valid_values = [Q_values[0][a] if game.is_action_available(a) else float('-inf') for a in range(4)]
            action = epsilon_greedy_action(np.array(Q_valid_values))
        
        reward = game.do_action(action)
        next_state = game.state()
        done = game.game_over()
        next_state = one_hot_encode_game_state(next_state).unsqueeze(0).permute(0, 3, 1, 2).to(device)

        buffer.push(state, action, reward, next_state, done)
        state = next_state

        if len(buffer) >= batch_size:
            states, actions, rewards, next_states, dones, indices, weights = buffer.sample(batch_size, beta)
            states = torch.tensor(states).float().to(device)
            next_states = torch.tensor(next_states).float().to(device)
            actions = torch.tensor(actions).long().to(device)
            rewards = torch.tensor(rewards).float().to(device)
            dones = torch.tensor(dones).float().to(device)
            weights = torch.tensor(weights).float().to(device)

            current_q = model(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)
            with torch.no_grad():
                # Using the target model to calculate the next Q-values for stability
                next_q_values = target_model(next_states)
                next_q_values_policy = model(next_states)
                next_actions = next_q_values_policy.max(1)[1]
                next_q = next_q_values.gather(1, next_actions.unsqueeze(-1)).squeeze(-1)
                
                expected_q = rewards + gamma * next_q * (1 - dones)

            loss = (current_q - expected_q.detach()).pow(2) * weights
            prios = loss + 1e-5
            loss = loss.mean()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            buffer.update_priorities(indices, prios.cpu().numpy())
            losses.append(loss.item())

    if game.game_over():
        scores.append(game.score())


In [30]:
target_model = deepcopy(model)
update_target_every = 10  # Number of episodes after which to update the target network
buffer_capacity = 10000  # Choose a size that fits your game and system memory
beta_start = 0.4  # Importance sampling weight
beta_frames = 1000  # Total number of frames to reach beta=1
buffer = PrioritizedReplayBuffer(capacity=buffer_capacity)

for it in range(n_epoch):
    if it % update_target_every == 0:
        target_model.load_state_dict(model.state_dict())
    train_game_with_double_q_per(game, it, batch_size, gamma, optimizer, criterion, device, model, target_model, buffer, beta_start)


AttributeError: 'Game' object has no attribute 'shape'

In [31]:
# Example of using the evaluate_model function
n_eval_games = 100  # Set the number of games for evaluation
mean_score = evaluate_model(model, n_eval_games, device)
print(f"mean score of model: {mean_score}")

Average score over 100 games: 1113.12
mean score of model: 1113.12
