In [None]:
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import gym
import slimevolleygym

from dataclasses import dataclass

from google.colab import drive
drive.mount('/content/drive')

@dataclass
class HyperParams:
    """
    Hyperparameters consumed by the DQN trainer.

    The EPS_* values define an exponentially decaying exploration rate:
    EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY).
    """

    # Number of transitions sampled from replay memory per optimization step
    BATCH_SIZE: int = 512
    # Discount factor applied to the bootstrapped next-state value
    GAMMA: float = 0.99
    # Exploration rate at step 0
    EPS_START: float = 0.9
    # Exploration rate the schedule decays towards
    EPS_END: float = 0.05
    # Decay constant in environment steps; larger values keep exploration high for longer
    EPS_DECAY: int = 8000  # Increased decay for more exploration
    # Interpolation weight of the policy network in each soft target-network update
    TAU: float = 0.01
    # Optimizer learning rate
    LR: float = 5e-5
    # Capacity of the replay memory, in transitions
    MEMORY_SIZE: int = 80000  # Increased memory size for more diverse experiences


# One replay-memory record; next_state is None when the episode ended on this step.
Transition = namedtuple("Transition", ["state", "action", "next_state", "reward"])

# Detect an inline (notebook) backend so plots can be pushed through IPython's
# display machinery; `display` is only imported in that case.
is_ipython = matplotlib.get_backend().find("inline") >= 0
if is_ipython:
    from IPython import display
# Turn on interactive mode so figures can be refreshed while training runs.
plt.ion()

class SlimeVolleyWrapper:
    """
    Wraps SlimeVolley so it can be used with the DQN implementation.

    The underlying environment is driven with a 3-element button vector
    (LEFT, RIGHT, JUMP). This wrapper exposes a fixed set of button combinations
    as discrete action indices, and presents ``reset``/``step`` results in the
    ``(obs, info)`` / 5-tuple shape the training code unpacks.
    """

    def __init__(self, env_name="SlimeVolley-v0", env=None):
        """
        Initialize the SlimeVolley environment.

        Args:
            env_name (str | None): Gym id of the environment to create. Ignored when
                ``env`` is given. If both ``env_name`` and ``env`` are None, no
                environment is created; the caller is expected to assign ``self.env``
                and ``self.obs_size`` afterwards.
            env: Optional already-constructed environment to wrap instead of
                creating one with ``gym.make``.
        """
        if env is not None:
            self.env = env
        elif env_name is not None:
            self.env = gym.make(env_name)
        else:
            # Deferred setup: callers in this notebook build the env themselves
            # and attach it to the wrapper after construction.
            self.env = None

        # Map from discrete action index to the environment's button vector.
        # Three binary buttons give 2^3 = 8 combinations; the two that press
        # LEFT and RIGHT at the same time are left out, which leaves 6.
        self.action_map = {
            0: [0, 0, 0],  # NOOP
            1: [1, 0, 0],  # LEFT
            2: [0, 1, 0],  # RIGHT
            3: [0, 0, 1],  # JUMP
            4: [1, 0, 1],  # LEFT + JUMP
            5: [0, 1, 1],  # RIGHT + JUMP
        }

        # Size of the discrete action space, kept in sync with the map above
        self.action_space_n = len(self.action_map)

        # Get observation space size (unknown until an environment is attached)
        if self.env is not None:
            self.obs_size = self.env.observation_space.shape[0]
        else:
            self.obs_size = None

    def reset(self):
        """
        Reset the environment and return the initial observation.

        Returns:
            np.ndarray: The initial observation.
            dict: Empty dictionary for compatibility with Gym API.
        """
        observation = self.env.reset()
        return observation, {}

    def step(self, action_index):
        """
        Take a step in the environment with the given action.

        Args:
            action_index (int): The index of the action to take; must be a key of
                ``action_map`` (a KeyError is raised otherwise).

        Returns:
            tuple: (observation, reward, done, truncated, info). ``truncated`` is
            always False; the wrapped environment only reports ``done``.
        """
        action = self.action_map[action_index]
        observation, reward, done, info = self.env.step(action)

        return observation, reward, done, False, info


class ReplayMemory:
    """
    Fixed-capacity buffer of Transition records used for experience replay.
    Once full, the oldest record is dropped for every new one added.
    """

    def __init__(self, capacity: int):
        """Create an empty buffer.

        Args:
            capacity (int): Upper bound on the number of stored transitions.
        """
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Build a Transition from the positional arguments and store it."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """
        Draw transitions uniformly at random, without replacement.

        Args:
            batch_size: How many transitions to draw.

        Returns:
            list: The drawn transitions.
        """
        drawn = random.sample(self.memory, batch_size)
        return drawn

    def __len__(self):
        """Number of transitions currently stored."""
        stored = len(self.memory)
        return stored


class DQN(nn.Module):
    """
    Fully connected Q-network: three Linear -> LayerNorm -> ReLU stages
    (512, 512, 256 units) followed by a linear head with one output per action.
    """

    def __init__(self, n_observations, n_actions):
        """
        Build the network.

        Args:
            n_observations (int): Length of the observation vector.
            n_actions (int): Number of discrete actions (size of the output).
        """
        super().__init__()
        wide, narrow = 512, 256

        # Attribute names are part of the checkpoint format (state_dict keys),
        # so they are kept as layerN / lnN.
        self.layer1 = nn.Linear(n_observations, wide)
        self.ln1 = nn.LayerNorm(wide)
        self.layer2 = nn.Linear(wide, wide)
        self.ln2 = nn.LayerNorm(wide)
        self.layer3 = nn.Linear(wide, narrow)
        self.ln3 = nn.LayerNorm(narrow)
        self.layer4 = nn.Linear(narrow, n_actions)

        # Xavier-uniform initialization for every linear weight matrix
        for linear in (self.layer1, self.layer2, self.layer3, self.layer4):
            nn.init.xavier_uniform_(linear.weight)

    def forward(self, x):
        """
        Compute Q-values for a state or a batch of states.

        Args:
            x (torch.Tensor): A single state (1-D) or a batch of states (2-D).

        Returns:
            torch.Tensor: Q-values with one row per state and one column per action;
            a 1-D input yields a single row.
        """
        # A lone state is promoted to a batch of one
        hidden = x.unsqueeze(0) if x.dim() == 1 else x

        stages = (
            (self.layer1, self.ln1),
            (self.layer2, self.ln2),
            (self.layer3, self.ln3),
        )
        for linear, norm in stages:
            hidden = F.relu(norm(linear(hidden)))

        return self.layer4(hidden)


class DQNTrainer:
    """
    Trains a DQN agent on a SlimeVolleyWrapper environment.

    Uses an epsilon-greedy behaviour policy, a replay memory, Double DQN targets
    and a soft-updated target network. Progress is printed and plotted, the
    best-evaluating policy is checkpointed to the working directory, and the
    final policy is saved to Google Drive.
    """

    def __init__(
        self,
        env: SlimeVolleyWrapper,
        memory: ReplayMemory,
        device: torch.device,
        params: HyperParams,
        max_steps_per_episode: int = 1000,
        num_episodes: int = 2000,  # Increased for more training
    ) -> None:
        """
        Initializes the DQNTrainer with the required components to train a DQN agent.

        Args:
            env: Wrapped environment providing obs_size, action_space_n, reset() and step().
            memory: Replay memory that transitions are pushed to and sampled from.
            device: Device on which the networks and all tensors are placed.
            params: Hyperparameters (batch size, gamma, epsilon schedule, tau, learning rate).
            max_steps_per_episode: Upper bound on steps in one training or evaluation episode.
            num_episodes: Number of training episodes to run.
        """
        self.env = env
        self.policy_net = DQN(env.obs_size, env.action_space_n).to(device)
        self.target_net = DQN(env.obs_size, env.action_space_n).to(device)
        # Target network starts as an exact copy of the policy network
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.optimizer = optim.RMSprop(self.policy_net.parameters(), lr=params.LR)
        self.memory = memory
        self.device = device
        self.params = params
        self.max_steps_per_episode = max_steps_per_episode
        self.num_episodes = num_episodes

        # Track rewards per episode
        self.episode_rewards = []
        self.avg_rewards = []  # 100-episode moving average, filled in by plot_rewards
        self.steps_done = 0

        # For evaluation
        self.evaluation_rewards = []
        self.eval_episodes = 100
        self.eval_interval = 50  # Evaluate every 50 episodes

    def _current_epsilon(self) -> float:
        """Exploration rate for the current value of ``steps_done`` (exponential decay)."""
        span = self.params.EPS_START - self.params.EPS_END
        return self.params.EPS_END + span * math.exp(-1.0 * self.steps_done / self.params.EPS_DECAY)

    def select_action(self, state_tensor: torch.Tensor) -> torch.Tensor:
        """
        Selects an action using an epsilon-greedy policy based on current Q-network.

        Every call advances ``steps_done`` by one, which drives the epsilon decay.

        Args:
            state_tensor: Current state, as fed to the policy network.

        Returns:
            A (1, 1) long tensor holding the chosen action index.
        """
        sample = random.random()
        eps_threshold = self._current_epsilon()

        # Update steps
        self.steps_done += 1

        # Exploit or explore
        if sample > eps_threshold:
            with torch.no_grad():
                # Choose best action from Q-network
                return self.policy_net(state_tensor).max(1).indices.view(1, 1)

        # Choose random action
        return torch.tensor(
            [[random.randrange(self.env.action_space_n)]],
            device=self.device,
            dtype=torch.long,
        )

    def optimize_model(self) -> None:
        """
        Performs one gradient descent update on the policy network using Double DQN.

        Does nothing until the replay memory holds at least BATCH_SIZE transitions.
        """
        if len(self.memory) < self.params.BATCH_SIZE:
            return

        transitions = self.memory.sample(self.params.BATCH_SIZE)
        # Transpose a list of Transitions into a Transition of tuples
        batch = Transition(*zip(*transitions))

        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        # Terminal transitions are stored with next_state=None
        non_final_mask = torch.tensor(
            [s is not None for s in batch.next_state],
            device=self.device, dtype=torch.bool
        )
        non_final_next_states = [s for s in batch.next_state if s is not None]

        # Get Q values for the current states and actions
        state_action_values = self.policy_net(state_batch).gather(1, action_batch)

        # Next-state values stay zero for terminal transitions
        next_state_values = torch.zeros(self.params.BATCH_SIZE, device=self.device)

        # Double DQN: the policy network picks the action, the target network scores it.
        # Skipped when the whole batch is terminal, since torch.cat rejects an empty list.
        if non_final_next_states:
            next_states = torch.cat(non_final_next_states)
            with torch.no_grad():
                next_action_indices = self.policy_net(next_states).max(1)[1].unsqueeze(1)
                next_state_values[non_final_mask] = (
                    self.target_net(next_states).gather(1, next_action_indices).squeeze(1)
                )

        # Calculate expected Q values
        expected_state_action_values = (next_state_values * self.params.GAMMA) + reward_batch

        # Compute Huber loss
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        # Clip gradients to avoid exploding gradients
        torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), max_norm=10)
        self.optimizer.step()

    def soft_update(self) -> None:
        """
        Performs a soft update of the target network parameters:
        target <- TAU * policy + (1 - TAU) * target.
        """
        target_net_state_dict = self.target_net.state_dict()
        policy_net_state_dict = self.policy_net.state_dict()

        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key] * self.params.TAU + target_net_state_dict[key] * (1 - self.params.TAU)

        self.target_net.load_state_dict(target_net_state_dict)

    def evaluate(self) -> float:
        """
        Evaluates the current policy greedily (no exploration) over ``eval_episodes``
        episodes and records the result in ``evaluation_rewards``.

        Returns:
            float: Average reward across evaluation episodes.
        """
        total_reward = 0.0
        for _ in range(self.eval_episodes):
            obs, _ = self.env.reset()
            state = torch.tensor(obs, dtype=torch.float32, device=self.device).unsqueeze(0)
            episode_reward = 0.0

            for _ in range(self.max_steps_per_episode):
                with torch.no_grad():
                    action = self.policy_net(state).max(1).indices.view(1, 1)
                next_obs, reward, done, _, _ = self.env.step(action.item())
                episode_reward += reward

                if done:
                    break

                state = torch.tensor(next_obs, dtype=torch.float32, device=self.device).unsqueeze(0)

            total_reward += episode_reward

        avg_reward = total_reward / self.eval_episodes
        self.evaluation_rewards.append(avg_reward)
        return avg_reward

    def plot_rewards(self, show_result: bool = False) -> None:
        """
        Plots accumulated rewards for each episode, the 100-episode moving average
        (once at least 100 episodes exist) and the periodic evaluation rewards.

        Args:
            show_result: When True the figure is titled "Result" and is not cleared
                first; otherwise the figure is cleared and redrawn as a training plot.
        """
        plt.figure(1)
        rewards_t = torch.tensor(self.episode_rewards, dtype=torch.float)

        # Calculate moving average; the first 99 entries are zero-padded so the
        # curve lines up with the episode axis
        if len(rewards_t) >= 100:
            means = rewards_t.unfold(0, 100, 1).mean(1).view(-1)
            means = torch.cat((torch.zeros(99), means))
            self.avg_rewards = means.numpy()

        # Decide whether to clear figure or show final result
        if show_result:
            plt.title("Result")
        else:
            plt.clf()
            plt.title("Training (Reward)")

        plt.xlabel("Episode")
        plt.ylabel("Reward")
        plt.plot(rewards_t.numpy(), alpha=0.6, label="Episode Reward")

        # Draw the moving average that was computed above
        if len(self.avg_rewards) > 0:
            plt.plot(self.avg_rewards, label="100-Episode Average")

        if len(self.evaluation_rewards) > 0:
            # Evaluations happen every eval_interval episodes, starting at episode 0
            eval_x = np.arange(0, len(self.episode_rewards), self.eval_interval)[:len(self.evaluation_rewards)]
            plt.plot(eval_x, self.evaluation_rewards, 'r-', label="Evaluation Reward")

        plt.legend()
        plt.pause(0.001)
        if is_ipython:
            if not show_result:
                display.display(plt.gcf())
                display.clear_output(wait=True)
            else:
                display.display(plt.gcf())

    def train(self) -> None:
        """
        Runs the main training loop across the specified number of episodes.

        Side effects: prints progress, evaluates every ``eval_interval`` episodes,
        writes the best-evaluating checkpoint to the working directory, saves the
        reward plot as a PNG and writes the final checkpoint to Google Drive.
        """
        best_eval_reward = float('-inf')
        best_model_path = "best_slimevolley_dqn_improved.pt"

        for i_episode in range(self.num_episodes):
            # Reset the environment and initialize state and episode_reward
            obs, _ = self.env.reset()
            state = torch.tensor(obs, dtype=torch.float32, device=self.device).unsqueeze(0)
            episode_reward = 0.0

            for _ in range(self.max_steps_per_episode):
                # Select an action
                action = self.select_action(state)
                next_obs, reward, done, _, _ = self.env.step(action.item())

                # Terminal transitions are stored with next_state=None
                if done:
                    next_state = None
                else:
                    next_state = torch.tensor(next_obs, dtype=torch.float32, device=self.device).unsqueeze(0)

                # Save the transition in replay memory. The reward is stored as
                # float32 so the TD target keeps the same dtype as the network
                # output regardless of the numeric type the environment returns.
                reward_tensor = torch.tensor([reward], dtype=torch.float32, device=self.device)
                self.memory.push(state, action, next_state, reward_tensor)

                # Run optimization step
                self.optimize_model()

                # Perform soft update
                self.soft_update()

                # Accumulate the reward for the episode
                episode_reward += reward

                # Break the loop when a terminal state is reached
                if done:
                    break

                # Advance state to next_state (only reached for non-terminal steps)
                state = next_state

            # Tracking episode reward and plotting rewards
            self.episode_rewards.append(episode_reward)

            # Print episode info
            eps = self._current_epsilon()
            print(f"Episode {i_episode}: Reward = {episode_reward:.2f}, Epsilon = {eps:.4f}")

            # Evaluate periodically
            if i_episode % self.eval_interval == 0:
                avg_eval_reward = self.evaluate()
                print(f"Evaluation after episode {i_episode}: Average Reward = {avg_eval_reward:.2f}")

                # Save the best model
                if avg_eval_reward > best_eval_reward:
                    best_eval_reward = avg_eval_reward
                    torch.save({
                        'episode': i_episode,
                        'model_state_dict': self.policy_net.state_dict(),
                        'optimizer_state_dict': self.optimizer.state_dict(),
                        'reward': best_eval_reward,
                        'epsilon': eps
                    }, best_model_path)
                    print(f"New best model saved with reward {best_eval_reward:.2f}")

            # Update the rewards plot
            if i_episode % 20 == 0:
                self.plot_rewards()

        print("Training complete")
        print(f"Best evaluation reward: {best_eval_reward:.2f}")
        self.plot_rewards(show_result=True)
        # Save before plt.show(): once the figure has been shown it may be closed,
        # and savefig would then write an empty image.
        plt.savefig("rewards_plot_slimevolley_dqn_improved.png")
        plt.ioff()
        plt.show()

        ## Define the path where you want to save the model in Google Drive
        drive_model_path = "/content/drive/My Drive/final_slimevolley_dqn_improved.pt"

        # Save final model to Google Drive; 'reward' is the most recent evaluation
        # average, or -5.0 as a placeholder when no evaluation has run
        torch.save({
            'episode': self.num_episodes,
            'model_state_dict': self.policy_net.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'reward': self.evaluation_rewards[-1] if len(self.evaluation_rewards) > 0 else -5.0,
        }, drive_model_path)

        print(f"Final model saved to {drive_model_path}")


def main():
    """Build the environment, hyperparameters, replay memory and trainer, then train."""
    # Environment first, then pick the compute device
    env = SlimeVolleyWrapper(env_name="SlimeVolley-v0")
    device_name = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device_name)
    print(f"Using device: {device}")

    # Hyperparameters for this run (MEMORY_SIZE overrides the dataclass default)
    params = HyperParams(
        BATCH_SIZE=512,
        GAMMA=0.99,
        EPS_START=0.9,
        EPS_END=0.05,
        EPS_DECAY=8000,
        TAU=0.01,
        LR=5e-5,
        MEMORY_SIZE=100000,
    )

    # Replay memory sized from the hyperparameters
    replay = ReplayMemory(params.MEMORY_SIZE)

    # Assemble the trainer and run it
    trainer = DQNTrainer(
        env=env,
        memory=replay,
        device=device,
        params=params,
        max_steps_per_episode=1000,
        num_episodes=1000,
    )
    trainer.train()


if __name__ == "__main__":
    main()

Episode 241: Reward = -5.00, Epsilon = 0.0500
Episode 242: Reward = -5.00, Epsilon = 0.0500
Episode 243: Reward = -5.00, Epsilon = 0.0500
Episode 244: Reward = -5.00, Epsilon = 0.0500
Episode 245: Reward = -5.00, Epsilon = 0.0500
Episode 246: Reward = -5.00, Epsilon = 0.0500
Episode 247: Reward = -5.00, Epsilon = 0.0500


In [3]:
def load_model_from_drive(model_path, device):
    """
    Loads a model from Google Drive.

    Args:
        model_path: Path to a checkpoint file containing a 'model_state_dict' entry.
        device: Device the network is placed on and the checkpoint is mapped to.

    Returns:
        DQN: The policy network with the saved weights loaded, in evaluation mode.
    """
    # An environment is only needed to read the network dimensions, so it is
    # closed again as soon as they are known.
    env = SlimeVolleyWrapper(env_name="SlimeVolley-v0")
    n_observations = env.obs_size
    n_actions = env.action_space_n
    env.env.close()

    # Create the DQN
    policy_net = DQN(n_observations, n_actions).to(device)

    # Load the saved model weights.
    # NOTE: torch.load unpickles the file, so only load checkpoints you trust.
    checkpoint = torch.load(model_path, map_location=device)
    policy_net.load_state_dict(checkpoint['model_state_dict'])

    # Set to evaluation mode
    policy_net.eval()

    print(f"Model loaded from {model_path}")
    # Metadata is optional: report 'unknown' rather than failing on a missing key
    print(f"Checkpoint from episode {checkpoint.get('episode', 'unknown')} with reward {checkpoint.get('reward', 'unknown')}")

    return policy_net

In [4]:
def main():
    """
    Load the best saved policy from Google Drive if it exists.

    NOTE: this definition reuses the name ``main`` and therefore replaces the
    training entry point defined in an earlier cell once this cell has run.

    Returns:
        The loaded policy network, or None when no checkpoint is found at the
        expected Drive path.
    """
    # Imported locally so this cell works on a fresh kernel; `os` is not among
    # the imports at the top of the notebook.
    import os

    # For training:
    # [Your existing training code]

    # For loading a saved model:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    drive_model_path = "/content/drive/MyDrive/SlimeVolley/best_slimevolley_dqn_improved.pt"

    if os.path.exists(drive_model_path):
        print("Loading model from Google Drive...")
        policy_net = load_model_from_drive(drive_model_path, device)
        # Use the loaded model for evaluation or further training
        return policy_net

    print("No saved model found in Google Drive, starting fresh training")
    # [Your training code]
    return None

In [5]:
def test_against_builtin_ai():
    """Evaluate the saved best checkpoint over 100 episodes on the default environment."""
    # Default opponent is built-in AI
    baseline_env = SlimeVolleyWrapper(env_name="SlimeVolley-v0")
    use_cuda = torch.cuda.is_available()
    compute_device = torch.device("cuda" if use_cuda else "cpu")

    # NOTE(review): load_trained_agent and evaluate_against_baseline are not
    # defined in the cells visible here - confirm they exist before running.
    agent = load_trained_agent("best_slimevolley_dqn_improved.pt", baseline_env, compute_device)

    print("Evaluating against built-in AI baseline...")
    evaluate_against_baseline(agent, baseline_env, compute_device, num_episodes=100)

In [6]:
def test_against_random_baseline():
    """Evaluate the saved best checkpoint over 100 episodes against a random opponent."""
    default_wrapper = SlimeVolleyWrapper(env_name="SlimeVolley-v0")
    compute_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Build the random-opponent environment directly, then attach it to a wrapper
    # created without an environment name.
    # NOTE(review): presumably gym.make forwards opponent="random" to the
    # environment constructor - confirm the installed SlimeVolley accepts it.
    opponent_env = gym.make("SlimeVolley-v0", opponent="random")
    random_env_wrapped = SlimeVolleyWrapper(env_name=None)
    random_env_wrapped.env = opponent_env
    random_env_wrapped.obs_size = opponent_env.observation_space.shape[0]

    # NOTE(review): load_trained_agent and evaluate_against_baseline are not
    # defined in the cells visible here - confirm they exist before running.
    agent = load_trained_agent("best_slimevolley_dqn_improved.pt", default_wrapper, compute_device)

    print("\nEvaluating against random action baseline...")
    evaluate_against_baseline(agent, random_env_wrapped, compute_device, num_episodes=100)

In [7]:
def compare_learning_curves(your_rewards=None, baseline_rewards=None):
    """
    Plot a learning curve, optionally next to a baseline curve, and save it to
    "learning_curve_comparison.png".

    Args:
        your_rewards: Sequence of per-episode (or averaged) rewards to plot. When
            omitted, a module-level variable named ``your_rewards`` is used if one
            exists, which keeps the previous zero-argument call working.
        baseline_rewards: Optional sequence of baseline rewards to plot alongside.

    Raises:
        ValueError: If no reward data is supplied and no module-level
            ``your_rewards`` is defined.
    """
    # Load your training data
    # (You would need to save this data during training)
    if your_rewards is None:
        your_rewards = globals().get("your_rewards")
    if your_rewards is None:
        raise ValueError(
            "No reward data to plot: pass your_rewards or define a module-level "
            "'your_rewards' before calling compare_learning_curves()."
        )

    # Plot your learning curve
    plt.figure(figsize=(10, 6))
    plt.plot(your_rewards, label="Your DQN Implementation")

    # Plot baseline learning curve if available
    if baseline_rewards is not None:
        plt.plot(baseline_rewards, label="Baseline DQN")

    plt.xlabel("Episode")
    plt.ylabel("Average Reward")
    plt.title("Learning Curve Comparison")
    plt.legend()
    # Save before showing so the written file contains the drawn figure
    plt.savefig("learning_curve_comparison.png")
    plt.show()

In [11]:
def visualize_gameplay(num_games=5):
    """
    Play ``num_games`` games with the saved best checkpoint, acting greedily,
    with a short pause after every step so the game can be watched.

    Args:
        num_games (int): Number of games to play.
    """
    # Imported once here rather than on every loop iteration
    import time

    # NOTE(review): presumably gym.make forwards render_mode="human" to the
    # environment constructor - confirm the installed gym/SlimeVolley versions
    # accept it; older gym releases render via env.render() instead.
    env = gym.make("SlimeVolley-v0", render_mode="human")
    try:
        # The wrapper is created without an environment name and the pre-built
        # environment is attached afterwards.
        wrapped_env = SlimeVolleyWrapper(env_name=None)
        wrapped_env.env = env
        wrapped_env.obs_size = env.observation_space.shape[0]

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # NOTE(review): load_trained_agent is not defined in the cells visible
        # here - confirm it exists before running.
        best_model = load_trained_agent("best_slimevolley_dqn_improved.pt", wrapped_env, device)

        for game in range(num_games):
            obs, _ = wrapped_env.reset()
            state = torch.tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)
            done = False

            while not done:
                # Get action from your trained model
                with torch.no_grad():
                    action = best_model(state).max(1).indices.view(1, 1)

                # Execute action
                next_obs, reward, done, _, _ = wrapped_env.step(action.item())

                # Update state
                if not done:
                    state = torch.tensor(next_obs, dtype=torch.float32, device=device).unsqueeze(0)

                # Add a small delay for better visualization
                time.sleep(0.05)
    finally:
        # Release the environment (and any window it opened) even if a game fails
        env.close()

In [14]:
def main_evaluation():
    """Run both baseline evaluations, then visualize five games."""
    # Built-in AI first, random baseline second
    test_against_builtin_ai()
    test_against_random_baseline()

    # Learning-curve comparison is currently disabled
    # compare_learning_curves()

    # Finish by watching a few games
    games_to_show = 5
    visualize_gameplay(num_games=games_to_show)

if __name__ == "__main__":
    main_evaluation()

NameError: name 'gym' is not defined

In [15]:
# List any SlimeVolley checkpoint files (*.pt) in the current working directory.
!ls *slimevolley*.pt


ls: cannot access '*slimevolley*.pt': No such file or directory


In [17]:
# Show the contents of the current working directory to see which files exist.
# `os` is imported here but not used within this cell.
import os
print("Files in current directory:")
!ls *

Files in current directory:
anscombe.json		     california_housing_train.csv  mnist_train_small.csv
california_housing_test.csv  mnist_test.csv		   README.md


Name: gym
Version: 0.19.0
Summary: The OpenAI Gym: A toolkit for developing and comparing your reinforcement learning agents.
Home-page: https://github.com/openai/gym
Author: OpenAI
Author-email: gym@openai.com
License: 
Location: /usr/local/lib/python3.11/dist-packages
Requires: cloudpickle, numpy
Required-by: dopamine_rl, slimevolleygym
