In [None]:
!wget http://www.atarimania.com/roms/Roms.rar
!unrar x -o+ /content/Roms.rar >/dev/nul
!python -m atari_py.import_roms /content/ROMS >/dev/nul
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!apt-get update > /dev/null 2>&1
!apt-get install cmake > /dev/null 2>&1
!pip install --upgrade setuptools 2>&1
!pip install ez_setup > /dev/null 2>&1
!pip install gym[atari] > /dev/null 2>&1
!pip install swig
!pip install box2d
!pip install gymnasium
!pip3 install box2d box2d-kengz

In [None]:
import gymnasium as gym
import os, sys
import argparse
from gymnasium.spaces import Discrete, Box, Tuple, MultiDiscrete, Dict
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
import gc
import time
import scipy as sp
import pprint
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
import torch.optim as optim
import torch.nn.init as init
import copy
import random
from collections import namedtuple, deque

In [None]:
class ReplayBuffer:
    """Fixed-size buffer to store experience tuples."""

    def __init__(self, buffer_size, batch_size, seed=12345):
        """Initialize a ReplayBuffer object.

        Params
        ======
            buffer_size (int): maximum size of buffer; integral floats such as
                ``1e6`` are accepted and converted with ``int()``
            batch_size (int): size of each training batch
            seed (int): seed applied to Python's global ``random`` module,
                which ``sample`` draws from
        """
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

        # deque(maxlen=...) only accepts integers, so coerce float sizes (1e6).
        self.memory = deque(maxlen=int(buffer_size))  # internal memory (deque)
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
        # random.seed() returns None, so seed first and store the value itself.
        random.seed(seed)
        self.seed = seed

    def add(self, state, action, reward, next_state, done):
        """Add a new experience to memory."""
        self.memory.append(self.experience(state, action, reward, next_state, done))

    def _stack(self, rows):
        """Stack one field of a batch row-wise into a float tensor on self.device."""
        return torch.from_numpy(np.vstack(rows)).float().to(self.device)

    def sample(self):
        """Randomly sample a batch of experiences from memory.

        Returns
        =======
            ``None`` while the buffer holds ``batch_size`` or fewer entries,
            otherwise a tuple ``(states, actions, rewards, next_states, dones)``
            of float tensors, each with one row per sampled experience.
        """
        if len(self.memory) <= self.batch_size:
            return None

        experiences = random.sample(self.memory, k=self.batch_size)

        states = self._stack([e.state for e in experiences])
        actions = self._stack([e.action for e in experiences])
        rewards = self._stack([e.reward for e in experiences])
        next_states = self._stack([e.next_state for e in experiences])
        # Booleans are converted to 0/1 before the float cast.
        dones = torch.from_numpy(
            np.vstack([e.done for e in experiences]).astype(np.uint8)
        ).float().to(self.device)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the current size of internal memory."""
        return len(self.memory)

In [None]:
class OUNoise:
    """Ornstein-Uhlenbeck process."""

    def __init__(self, size, seed=12345, mu=0., theta=0.15, sigma=0.2):
        """Initialize parameters and noise process.

        Params
        ======
            size (int or tuple): shape of the noise vector
            seed (int): seed for this process's private random generator
            mu (float): mean the process reverts to
            theta (float): mean-reversion rate
            sigma (float): scale of the Gaussian increment
        """
        self.size = size
        self.mu = mu * np.ones(size)
        self.theta = theta
        self.sigma = sigma
        # The original also seeded Python's global `random`; keep that side effect.
        random.seed(seed)
        self.seed = seed
        # Samples are drawn from NumPy, so the seed must drive a NumPy generator
        # for the noise sequence to be reproducible.
        self.rng = np.random.default_rng(seed)

        self.reset()

    def reset(self):
        """Reset the internal state (= noise) to mean (mu)."""
        self.state = copy.copy(self.mu)

    def sample(self):
        """Update internal state and return it as a noise sample."""
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * self.rng.standard_normal(self.size)
        self.state = x + dx

        return self.state

class GaussianNoise:
    """Gaussian noise."""

    def __init__(self, size, seed, mu=0, sigma=1):
        """Initialize parameters and noise process.

        Params
        ======
            size (int or tuple): shape of each noise sample
            seed (int): seed for this object's private random generator
            mu (float): mean of the noise
            sigma (float): standard deviation of the noise
        """
        self.size = size
        self.mu = mu
        self.sigma = sigma
        # The original also seeded Python's global `random`; keep that side effect.
        random.seed(seed)
        self.seed = seed
        # Samples come from NumPy, so the seed must drive a NumPy generator.
        self.rng = np.random.default_rng(seed)

    def reset(self):
        """No-op: the noise is stateless. Present for parity with OUNoise."""
        pass

    def sample(self):
        """Return Gaussian perturbations in the action space."""
        # Use the configured mean; the original always used 0 and ignored mu.
        return self.rng.normal(self.mu, self.sigma, self.size)

In [None]:
class ActorNetwork(nn.Module):
    """Deterministic policy network: maps a state to a bounded action.

    Three linear layers with ReLU activations and a final tanh; the tanh
    output is multiplied by ``action_space.high`` in ``forward``. The action
    space must be supplied through ``set_action_space`` before the network
    is evaluated.
    """

    def __init__(self, alpha, input_dims, fc1_dims, fc2_dims,
                 n_actions, weight_decay, noise, add_noise=True, name='ActorNetwork', chkpt_dir='/Checkpoints', seed=12345):
        """Build the layers, optimizer and device placement.

        Params
        ======
            alpha (float): Adam learning rate
            input_dims (int): size of the state vector
            fc1_dims (int), fc2_dims (int): hidden layer widths
            n_actions (int): size of the action vector
            weight_decay (float): Adam weight decay
            noise: object with a ``sample()`` method used for exploration
            add_noise (bool): stored on the instance; ``select_action`` uses
                its own ``add_noise`` argument
            name (str), chkpt_dir (str): joined into the checkpoint file path
            seed (int): accepted for interface compatibility; unused
        """
        super(ActorNetwork, self).__init__()

        self.chkpt_file = os.path.join(chkpt_dir, name)

        self.fc_layers = nn.Sequential(
            nn.Linear(input_dims, fc1_dims),
            nn.ReLU(),
            nn.Linear(fc1_dims, fc2_dims),
            nn.ReLU(),
            nn.Linear(fc2_dims, n_actions),
            nn.Tanh(),
        )
        self.learning_rate = alpha
        self.weight_decay = weight_decay
        self.optimizer = optim.Adam(self.parameters(), lr=alpha, weight_decay=self.weight_decay)
        self.noise = noise
        self.add_noise = add_noise

        self.action_space = None

        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

        self.init_weights()
        self.to(self.device)

    def set_action_space(self, action_space):
        """Store the action space whose ``low``/``high`` bound the actions."""
        self.action_space = action_space

    def init_weights(self):
        """Xavier-uniform weights and zero biases for every linear layer."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                init.xavier_uniform_(m.weight)
                init.constant_(m.bias, 0.0)

    def forward(self, state):
        """Return the tanh policy output scaled by ``action_space.high``.

        Raises
        ======
            RuntimeError: if ``set_action_space`` has not been called.
        """
        if self.action_space is None:
            raise RuntimeError("set_action_space() must be called before forward()")

        pi = self.fc_layers(state)
        # Build the scale on the same device/dtype as the output; a CPU tensor
        # here would fail as soon as the network lives on CUDA.
        high = torch.as_tensor(self.action_space.high, dtype=pi.dtype, device=pi.device)

        return pi * high

    def select_action(self, state, add_noise=True):
        """Return a single action (NumPy array) for one un-batched state.

        ``state`` may be a NumPy array or a tensor of any float dtype; it is
        converted to float32 on the network's device. When ``add_noise`` is
        true a sample from ``self.noise`` is added before the action is
        clipped to ``[action_space.low, action_space.high]``.
        """
        with torch.no_grad():
            # as_tensor + explicit dtype avoids the float64/float32 mismatch that
            # torch.tensor(state) produces for double-precision observations.
            state = torch.as_tensor(state, dtype=torch.float32, device=self.device)
            state = state.unsqueeze(0)
            action = self.forward(state)
            action = action.detach().cpu().numpy()[0]

        if add_noise:
            action += self.noise.sample()

        action = action.clip(min=self.action_space.low, max=self.action_space.high)

        return action

    def backpropagation(self, state, critic_model):
        """Take one policy-gradient step that maximizes the critic's Q-value."""
        # Go through forward() so the critic sees the same scaled actions that
        # the policy actually emits (the raw tanh output skipped the scaling).
        actions_pred = self.forward(state)
        actor_loss = -critic_model.forward(state, actions_pred).mean()
        self.optimizer.zero_grad()
        actor_loss.backward()
        self.optimizer.step()

    def save_checkpoint(self):
        """Write the state dict to ``chkpt_file``, creating its directory if needed."""
        chkpt_dir = os.path.dirname(self.chkpt_file)
        if chkpt_dir:
            os.makedirs(chkpt_dir, exist_ok=True)
        torch.save(self.state_dict(), self.chkpt_file)

    def load_checkpoint(self):
        """Load the state dict from ``chkpt_file`` onto this network's device."""
        # map_location lets a checkpoint saved on GPU be restored on a CPU-only host.
        self.load_state_dict(torch.load(self.chkpt_file, map_location=self.device))


class CriticNetwork(nn.Module):
    """Q-network: maps a (state, action) pair to a scalar value.

    The state and action are concatenated along dim 1 and passed through
    three linear layers with ReLU activations between them.
    """

    def __init__(self, beta, input_dims, fc1_dims, fc2_dims,
                n_actions, weight_decay, name='CriticNetwork', chkpt_dir='/Checkpoints'):
        """Build the layers, optimizer and device placement.

        Params
        ======
            beta (float): Adam learning rate
            input_dims (int): size of the state vector
            fc1_dims (int), fc2_dims (int): hidden layer widths
            n_actions (int): size of the action vector
            weight_decay (float): Adam weight decay
            name (str), chkpt_dir (str): joined into the checkpoint file path
        """
        super(CriticNetwork, self).__init__()

        self.chkpt_file = os.path.join(chkpt_dir, name)
        self.fc_layers = nn.Sequential(
            nn.Linear(input_dims + n_actions, fc1_dims),
            nn.ReLU(),
            nn.Linear(fc1_dims, fc2_dims),
            nn.ReLU(),
            nn.Linear(fc2_dims, 1),
        )
        self.learning_rate = beta

        self.weight_decay = weight_decay
        self.optimizer = optim.Adam(self.parameters(), lr=beta, weight_decay=self.weight_decay)
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

        self.to(self.device)
        self.init_weights()

    def init_weights(self):
        """Xavier-uniform weights and zero biases for every linear layer."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                init.xavier_uniform_(m.weight)
                init.constant_(m.bias, 0.0)

    def forward(self, state, action):
        """Return Q(state, action) with one row per batch entry."""
        sa = torch.cat([state, action], dim=1)
        return self.fc_layers(sa)

    def backpropagation(self, qvals_target, expected_qvals):
        """Take one MSE step pulling ``expected_qvals`` towards ``qvals_target``.

        Params
        ======
            qvals_target (Tensor): regression target; treated as a constant
            expected_qvals (Tensor): this network's output, attached to its graph
        """
        # Detach the target so the loss never backpropagates into whatever
        # produced it (e.g. the target networks).
        critic_loss = F.mse_loss(expected_qvals, qvals_target.detach())
        self.optimizer.zero_grad()
        critic_loss.backward()

        # Clip each gradient element to [-10, 10]; unlike a manual clamp on
        # param.grad this skips parameters that received no gradient.
        nn.utils.clip_grad_value_(self.parameters(), clip_value=10)

        self.optimizer.step()

    def save_checkpoint(self):
        """Write the state dict to ``chkpt_file``, creating its directory if needed."""
        chkpt_dir = os.path.dirname(self.chkpt_file)
        if chkpt_dir:
            os.makedirs(chkpt_dir, exist_ok=True)
        torch.save(self.state_dict(), self.chkpt_file)

    def load_checkpoint(self):
        """Load the state dict from ``chkpt_file`` onto this network's device."""
        # map_location lets a checkpoint saved on GPU be restored on a CPU-only host.
        self.load_state_dict(torch.load(self.chkpt_file, map_location=self.device))

In [None]:
class Agent:
    """DDPG-style actor-critic agent with target networks and a replay buffer.

    Owns an online actor/critic pair, a matching pair of target networks that
    track the online ones through soft updates, an OU exploration-noise
    process and an experience replay buffer.
    """

    def __init__(self, actor_dims, critic_dims, n_actions, actor_output_dims,
                    alpha=0.01, beta=0.01, fc1=64,
                    fc2=64, gamma=0.95, tau=0.01,
                    add_noise=True,
                    weight_decay=0.001, memory_size=100000, memory_batch_size=64,
                    chkpt_dir='tmp/maddpg/'):
        """Create the networks, noise process and replay buffer.

        Params
        ======
            actor_dims (int): state size fed to the actors
            critic_dims (int): state size fed to the critics
            n_actions (int): action vector size
            actor_output_dims (int or tuple): shape of the exploration noise
            alpha (float), beta (float): actor / critic learning rates
            fc1 (int), fc2 (int): hidden layer widths of all four networks
            gamma (float): discount factor
            tau (float): soft-update interpolation factor
            add_noise (bool): forwarded to the actors
            weight_decay (float): Adam weight decay for all networks
            memory_size (int): replay buffer capacity (integral floats accepted)
            memory_batch_size (int): minibatch size drawn from the buffer
            chkpt_dir (str): directory used for checkpoint files
        """
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.tau = tau
        self.n_actions = n_actions
        self.actor_output_dims = actor_output_dims
        self.add_noise = add_noise
        self.noise = OUNoise(size=self.actor_output_dims)
        self.weight_decay = weight_decay
        # Callers pass sizes such as 1e6; the buffer needs an integer capacity.
        self.memory_size = int(memory_size)
        self.memory_batch_size = memory_batch_size
        self.experience_replay = ReplayBuffer(buffer_size=self.memory_size, batch_size=self.memory_batch_size)
        self.agent_name = 'LunarLander'
        self.actor = ActorNetwork(alpha=self.alpha, input_dims=actor_dims,
                                  fc1_dims=fc1, fc2_dims=fc2,
                                  n_actions=self.n_actions, weight_decay=self.weight_decay,
                                  noise=self.noise, add_noise=self.add_noise,
                                  chkpt_dir=chkpt_dir,
                                  name=self.agent_name+'_actor.pth')
        self.critic = CriticNetwork(beta=self.beta, input_dims=critic_dims, fc1_dims=fc1, fc2_dims=fc2,
                                    n_actions=self.n_actions, weight_decay=self.weight_decay,
                                    chkpt_dir=chkpt_dir,
                                    name=self.agent_name+'_critic.pth')
        self.target_actor = ActorNetwork(alpha=self.alpha, input_dims=actor_dims,
                                        fc1_dims=fc1, fc2_dims=fc2,
                                        n_actions=self.n_actions, weight_decay=self.weight_decay,
                                        noise=self.noise, add_noise=self.add_noise,
                                        chkpt_dir=chkpt_dir,
                                        name=self.agent_name+'_target_actor.pth')
        self.target_critic = CriticNetwork(beta=self.beta, input_dims=critic_dims, fc1_dims=fc1, fc2_dims=fc2,
                                            n_actions=self.n_actions, weight_decay=self.weight_decay,
                                            chkpt_dir=chkpt_dir,
                                            name=self.agent_name+'_target_critic.pth')

        # Hard copy (tau=1) so the targets START as exact copies of the online
        # networks; a soft update here would leave them almost entirely at
        # their own independent random initialization.
        self.update_network_parameters(tau=1.0)

    @staticmethod
    def _soft_update(source, target, tau):
        """Blend ``source`` parameters into ``target``: t <- tau*s + (1-tau)*t."""
        for param, target_param in zip(source.parameters(), target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

    def update_network_parameters(self, tau=None):
        """Soft-update both target networks; ``tau=None`` uses ``self.tau``."""
        if tau is None:
            tau = self.tau

        self._soft_update(self.critic, self.target_critic, tau)
        self._soft_update(self.actor, self.target_actor, tau)

    def sample_to_memory(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer."""
        self.experience_replay.add(state, action, reward, next_state, done)

    def sample_from_memory(self):
        """Return a minibatch tuple, or ``None`` if the buffer is not full enough."""
        minibatch_experiences = self.experience_replay.sample()
        if not minibatch_experiences:
            return None

        return minibatch_experiences

    def choose_action(self, state, add_noise=True):
        """Return the online actor's action for ``state`` (optionally noisy)."""
        return self.actor.select_action(state, add_noise)

    def step(self):
        """Run one learning update: critic, then actor, then target networks.

        Does nothing when the replay buffer cannot yet provide a minibatch.
        """
        minibatch_experiences = self.sample_from_memory()
        if minibatch_experiences is None:
            # Not enough experience yet; unpacking None would raise TypeError.
            return

        states, actions, rewards, next_states, dones = minibatch_experiences

        # The TD target is a constant for the critic update, so compute it
        # without building an autograd graph through the target networks.
        with torch.no_grad():
            next_actions = self.target_actor.forward(next_states)
            Q_targets_next = self.target_critic.forward(next_states, next_actions)
            # (1 - dones) removes the bootstrap term on terminal transitions.
            Q_targets = rewards + self.gamma * Q_targets_next * (1 - dones)

        Q_expected = self.critic.forward(states, actions)
        self.critic.backpropagation(Q_targets, Q_expected)

        # Actor update: ascend the critic's value of the actor's own actions.
        pred_actions = self.actor.forward(states)
        actor_loss = - self.critic(states, pred_actions).mean()
        self.actor.optimizer.zero_grad()
        actor_loss.backward()
        self.actor.optimizer.step()

        self.update_network_parameters(tau=self.tau)

    def save_models(self):
        """Save all four networks to their checkpoint files."""
        self.actor.save_checkpoint()
        self.target_actor.save_checkpoint()
        self.critic.save_checkpoint()
        self.target_critic.save_checkpoint()

    def load_models(self):
        """Load all four networks from their checkpoint files."""
        self.actor.load_checkpoint()
        self.target_actor.load_checkpoint()
        self.critic.load_checkpoint()
        self.target_critic.load_checkpoint()


# Lunar Lander

In [None]:
# Module-level LunarLander environment with continuous actions; the next cell
# reads its action and observation spaces.
env = gym.make(
    "LunarLander-v2",
    continuous=True,
    gravity=-9.81,
    enable_wind=False,
    # NOTE(review): wind is disabled above, so the two settings below
    # presumably have no effect — confirm against the Gymnasium docs.
    wind_power=5.0,
    turbulence_power=1.5,
)

In [None]:
# Keep module-level handles to the environment's action/observation spaces.
action_space = env.action_space
observation_space = env.observation_space

# shape[0] is the length of each space's first axis, i.e. the number of
# action components and observation components respectively.
print("Action Space:", action_space.shape[0])
print("Observation Space:", observation_space.shape[0])

Action Space: 2
Observation Space: 8


In [None]:
class LunarLander:
    """Training/evaluation harness that runs an ``Agent`` on continuous LunarLander.

    Builds its own Gymnasium environment, sizes the agent's networks from the
    environment's spaces, and exposes ``train`` / ``test`` loops plus
    checkpoint helpers.
    """

    # Window (in episodes) of the moving average drawn by the progress plot.
    _MOVING_AVG_WINDOW = 10

    def __init__(self,
                 n_episodes=1000, memory_size=1e6, memory_batch_size=64,
                 alpha=0.01, beta=0.01, fc1=64,
                 fc2=64, gamma=0.99, tau=0.01, chkpt_dir='tmp/maddpg/'):
        """Create the environment and the agent.

        Params
        ======
            n_episodes (int): number of training episodes
            memory_size (int): replay buffer capacity (integral floats accepted)
            memory_batch_size (int): minibatch size
            alpha (float), beta (float): actor / critic learning rates
            fc1 (int), fc2 (int): hidden layer widths
            gamma (float): discount factor
            tau (float): target-network soft-update factor
            chkpt_dir (str): directory for checkpoint files
        """
        self.env = gym.make(
            "LunarLander-v2",
            continuous=True,
            gravity=-9.81,
            enable_wind=False,
            wind_power=5.0,
            turbulence_power=1.5,
        )

        self.n_episodes = n_episodes

        self.n_actions = self.env.action_space.shape[0]
        self.actor_output_dims = self.env.action_space.shape
        self.actor_dims = self.env.observation_space.shape[0]
        # Attribute name (sic) kept for backward compatibility.
        self.crtic_dims = self.env.observation_space.shape[0]

        self.alpha = alpha
        self.beta = beta
        self.fc1 = fc1
        self.fc2 = fc2
        self.gamma = gamma
        self.tau = tau
        # The default is the float 1e6; the replay buffer needs an int capacity.
        self.memory_size = int(memory_size)
        self.batch_size = memory_batch_size
        self.chkpt_dir = chkpt_dir

        self.agent = Agent(actor_dims=self.actor_dims,
                           critic_dims=self.crtic_dims,
                           n_actions=self.n_actions,
                           actor_output_dims=self.actor_output_dims,
                           alpha=self.alpha,
                           beta=self.beta,
                           fc1=self.fc1, fc2=self.fc2, gamma=self.gamma, tau=self.tau,
                           memory_size=self.memory_size, memory_batch_size=self.batch_size,
                           chkpt_dir=self.chkpt_dir)

        self.training_steps = 0

        # Both actors need the action bounds for output scaling and clipping.
        self.agent.target_actor.set_action_space(self.env.action_space)
        self.agent.actor.set_action_space(self.env.action_space)

        self.score_record = []

    def env_reset(self):
        """Reset the environment and return the initial state as a float32 tensor."""
        state = self.env.reset()
        if isinstance(state, tuple):
            state = state[0]  # reset() returned (observation, info)
        state = torch.tensor(state, dtype=torch.float32)
        return state

    def save_checkpoint(self):
        """Save all of the agent's networks."""
        print('... saving checkpoint ...')
        self.agent.save_models()

    def load_checkpoint(self):
        """Load all of the agent's networks from their checkpoint files."""
        print('... loading checkpoint ...')
        self.agent.load_models()

    def _in_warmup(self):
        """True while actions should be random: early steps or a near-empty buffer."""
        return (self.training_steps <= (1/10) * self.memory_size
                or len(self.agent.experience_replay) <= self.batch_size)

    def _plot_scores(self, score_history):
        """Plot per-episode scores with their moving average, then free the figure."""
        window = self._MOVING_AVG_WINDOW
        fig, ax = plt.subplots(figsize=(15, 10))
        ax.plot(score_history, label='Episode Score')
        ax.set_xlabel('Episode')
        ax.set_ylabel('Score')
        ax.set_title('Score History')
        mva = np.convolve(score_history, np.ones(window)/window, mode='valid')
        # A 'valid' average over `window` episodes ends at episode window-1,
        # so shift the x-axis to line it up with the raw scores.
        x = np.arange(window - 1, window - 1 + len(mva))
        ax.plot(x, mva, label='Moving Average')

        ax.legend()  # Show the legend with labels
        plt.pause(0.001)  # Add a small pause to allow the plot to be displayed
        # Close the figure so periodic reports do not accumulate open figures.
        plt.close(fig)

    def train(self, load=False):
        """Run ``n_episodes`` training episodes.

        During warm-up (see ``_in_warmup``) actions are sampled uniformly from
        the action space and only stored; afterwards the agent acts with
        exploration noise and performs one learning step per environment step.
        Every 10th episode after episode 20 the checkpoint is saved and a
        progress plot is shown.

        Params
        ======
            load (bool): load existing checkpoints before training
        """
        if load:
            self.load_checkpoint()

        score_history = []
        for episode in range(self.n_episodes):

            # Work with NumPy states throughout so the buffer holds one type.
            state = self.env_reset().numpy()

            episode_score = 0
            done = False

            while not done:
                warming_up = self._in_warmup()

                if warming_up:
                    action = self.env.action_space.sample()
                else:
                    action = self.agent.choose_action(state, add_noise=True)

                # Step with the SAME action that gets stored in the buffer.
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                # Store only true termination: a time-limit truncation should
                # not zero the bootstrap term in the TD target.
                self.agent.sample_to_memory(state, action, reward, next_state, terminated)
                state = next_state

                if not warming_up:
                    self.agent.step()

                # The episode is over on termination OR truncation; ignoring
                # truncation lets a time-limited episode loop forever.
                done = terminated or truncated

                self.training_steps += 1
                episode_score += reward

            score_history.append(episode_score)
            self.score_record.append(np.mean(score_history))

            if episode % 10 == 0 and episode>20:
                # Average over the last 10 episodes, including the latest one.
                print('Episode: ', episode, 'Score: ', np.mean(score_history[-10:]))
                self.save_checkpoint()
                self._plot_scores(score_history)

    def test(self):
        """Load the checkpoint, run one noise-free episode and print its score."""
        self.load_checkpoint()

        state = self.env_reset().numpy()

        episode_score = 0
        done = False

        while not done:
            action = self.agent.choose_action(state, add_noise=False)
            next_state, reward, terminated, truncated, _ = self.env.step(action)
            done = terminated or truncated
            state = next_state
            episode_score += reward

        print(f'Episode Score: {episode_score}')



In [None]:
# Build the training harness with explicit hyper-parameters (overriding the
# LunarLander defaults). With memory_size=100000 the random-action warm-up in
# train() lasts for the first memory_size/10 = 10000 environment steps.
# Checkpoints are written under chkpt_dir as LunarLander_<network>.pth.
training_env = LunarLander(n_episodes=300,
                           memory_size=100000,
                           memory_batch_size=256,
                           alpha=1e-4, beta=1e-3,
                           fc1=256,
                           fc2=128,
                           gamma=0.995,
                           tau=1e-3,
                           chkpt_dir='/content')

In [None]:
# load=True makes train() call load_checkpoint() first, which reads the four
# network checkpoint files from chkpt_dir.
# NOTE(review): on a fresh runtime those files do not exist yet and loading
# will fail — confirm resuming is intended, otherwise pass load=False.
training_env.train(load=True)

In [None]:
# Reload the saved checkpoint, run one episode without exploration noise and
# print the episode score.
training_env.test()