# Opponent Modeling with LOLA for Adversarial Environments

Elias Mann, Max Hasenauer

In [4]:
#imports
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import tqdm
from pettingzoo.classic import tictactoe_v3, rps_v2, connect_four_v3
from collections import namedtuple
import torch.optim as optim
import copy
import math
import os

## Environments

We are using simple adversarial environments.

In [227]:
envs = {
    'rps': rps_v2.env(render_mode='rgb_array'),
    'tictactoe': tictactoe_v3.env(render_mode='rgb_array'),
    'connect_four': connect_four_v3.env(render_mode='rgb_array')
}

In [167]:
def render(env, policy=None, seed=42):

    env.reset(seed=seed)



    for agent in env.agent_iter():
        obs, reward, done, info, _ = env.last()
        if done:
            action = None
        else:
            mask = obs['action_mask']
            if policy is None:
                action = env.action_spaces[agent].sample(mask)
            else:
                action = policy(mask, env.action_spaces[agent])

        env.step(action)

    env.close()
    



In [120]:
render(envs['tictactoe'])

## Models

### Starting with the DQN Model

In [7]:
class ExponentialSchedule:
    def __init__(self, value_from, value_to, num_steps):
        """Exponential schedule from `value_from` to `value_to` in `num_steps` steps.

        $value(t) = a \exp (b t)$

        :param value_from: initial value
        :param value_to: final value
        :param num_steps: number of steps for the exponential schedule
        """
        self.value_from = value_from
        self.value_to = value_to
        self.num_steps = num_steps

        # YOUR CODE HERE:  determine the `a` and `b` parameters such that the schedule is correct
        self.a = value_to / value_from
        self.b = math.log(self.a) / (num_steps - 1)

    def value(self, step) -> float:
        """Return exponentially interpolated value between `value_from` and `value_to`interpolated value between.

        returns {
            `value_from`, if step == 0 or less
            `value_to`, if step == num_steps - 1 or more
            the exponential interpolation between `value_from` and `value_to`, if 0 <= steps < num_steps
        }

        :param step:  The step at which to compute the interpolation.
        :rtype: float.  The interpolated value.
        """

        # YOUR CODE HERE:  implement the schedule rule as described in the docstring,
        # using attributes `self.a` and `self.b`.
        if step <= 0:
            value = self.value_from
        elif step >= self.num_steps - 1:
            value = self.value_to
        else:
            value = self.value_from * math.exp(self.b * step)

        return value


In [229]:
# Batch namedtuple, i.e. a class which contains the given attributes
Batch = namedtuple(
    'Batch', ('states', 'actions', 'rewards', 'next_states', 'dones')
)


class ReplayMemory:
    def __init__(self, max_size, state_size):
        """Replay memory implemented as a circular buffer.

        Experiences will be removed in a FIFO manner after reaching maximum
        buffer size.

        Args:
            - max_size: Maximum size of the buffer.
            - state_size: Size of the state-space features for the environment.
        """
        self.max_size = max_size
        self.state_size = state_size

        # preallocating all the required memory, for speed concerns
        self.states = torch.empty((max_size, state_size))
        self.actions = torch.empty((max_size, 1), dtype=torch.long)
        self.rewards = torch.empty((max_size, 1))
        self.next_states = torch.empty((max_size, state_size))
        self.dones = torch.empty((max_size, 1), dtype=torch.bool)

        # pointer to the current location in the circular buffer
        self.idx = 0
        # indicates number of transitions currently stored in the buffer
        self.size = 0

    def add(self, state, action, reward, next_state, done):
        """Add a transition to the buffer.

        :param state:  1-D np.ndarray of state-features.
        :param action:  integer action.
        :param reward:  float reward.
        :param next_state:  1-D np.ndarray of state-features.
        :param done:  boolean value indicating the end of an episode.
        """

        # YOUR CODE HERE:  store the input values into the appropriate
        # attributes, using the current buffer position `self.idx`
        self.states[self.idx] = torch.tensor(state.flatten(), dtype=torch.float)
        self.actions[self.idx] = torch.tensor(action, dtype=torch.long)
        self.rewards[self.idx] = torch.tensor(reward, dtype=torch.float)
        self.next_states[self.idx] = torch.tensor(next_state.flatten(), dtype=torch.float)
        self.dones[self.idx] = torch.tensor(done, dtype=torch.bool)
        
        # DO NOT EDIT
        # circulate the pointer to the next position
        self.idx = (self.idx + 1) % self.max_size
        # update the current buffer size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size) -> Batch:
        """Sample a batch of experiences.

        If the buffer contains less that `batch_size` transitions, sample all
        of them.

        :param batch_size:  Number of transitions to sample.
        :rtype: Batch
        """

        # YOUR CODE HERE:  randomly sample an appropriate number of
        # transitions *without replacement*.  If the buffer contains less than
        # `batch_size` transitions, return all of them.  The return type must
        # be a `Batch`.

        sample_indices = torch.randint(0, self.size, size=(batch_size,))
        batch = Batch(
            states=self.states[sample_indices],
            actions=self.actions[sample_indices],
            rewards=self.rewards[sample_indices],
            next_states=self.next_states[sample_indices],
            dones=self.dones[sample_indices]
        )

        return batch

    def populate(self, env, num_steps):
        """Populate this replay memory with `num_steps` from the random policy.

        :param env:  Openai Gym environment
        :param num_steps:  Number of steps to populate the
        """

        # YOUR CODE HERE:  run a random policy for `num_steps` time-steps and
        # populate the replay memory with the resulting transitions.
        # Hint:  don't repeat code!  Use the self.add() method!
        #randomly choose 1 or -1 to decide which agent to play
        curr_player = np.random.choice([env.possible_agents[0], env.possible_agents[1]])
            
        env.reset()
        step_count = 0
        for agent in env.agent_iter():
            obs, reward, done, _, _ = env.last()
            if done:
                env.reset()
                curr_player = np.random.choice([env.possible_agents[0], env.possible_agents[1]])
            else:
                mask = obs['action_mask']
                action = env.action_spaces[agent].sample(mask)
                env.step(action)
                next_state, reward, _, _,_ = env.last()
                reward = reward if agent == curr_player else -reward
                self.add(obs['observation'], action, reward, next_state['observation'], done)            
                
            step_count += 1
            if step_count >= num_steps:
                break

In [280]:
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim, *, num_layers=3, hidden_dim=128):

        super().__init__()
        #check if observation is a tuple
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim

        layers = []
        layers.append(nn.Linear(self.state_dim, hidden_dim))
        layers.append(nn.ReLU())
        for _ in range(num_layers - 2):  # Subtract 2 for the input and output layers
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_dim, action_dim))
        self.model = nn.Sequential(*layers)

    def forward(self, states) -> torch.Tensor:

        #assert that this is not an empty tensor
        assert len(states.shape) > 0, f"Input states tensor has invalid shape {states}"
        assert states.shape[-1] == self.state_dim, f"Input states shape {states.shape} does not match expected state_dim {self.state_dim}"
        action_values = self.model(states)
        return action_values
    
    # utility methods for cloning and storing models.  DO NOT EDIT
    @classmethod
    def custom_load(cls, data):
        model = cls(*data['args'], **data['kwargs'])
        model.load_state_dict(data['state_dict'])
        return model

    def custom_dump(self):
        return {
            'args': (self.state_dim, self.action_dim),
            'kwargs': {
                'num_layers': self.num_layers,
                'hidden_dim': self.hidden_dim,
            },
            'state_dict': self.state_dict(),
        }

In [281]:
def train_dqn_batch(optimizer, batch, dqn_model, dqn_target, gamma) -> float:
    """Perform a single batch-update step on the given DQN model.

    :param optimizer: nn.optim.Optimizer instance.
    :param batch:  Batch of experiences (class defined earlier).
    :param dqn_model:  The DQN model to be trained.
    :param dqn_target:  The target DQN model, ~NOT~ to be trained.
    :param gamma:  The discount factor.
    :rtype: float  The scalar loss associated with this batch.
    """
    # YOUR CODE HERE:  compute the values and target_values tensors using the
    # given models and the batch of data.
    values = dqn_model(batch.states).gather(1, batch.actions)

    with torch.no_grad():
        # Select the maximum Q-value for each next state
        max_next_values = dqn_target(batch.next_states).max(dim=1, keepdim=True)[0]
        # Compute the TD target: reward + gamma * max_next_value * (1 - done)
        target_values = batch.rewards + gamma * max_next_values *  ~batch.dones


    # DO NOT EDIT FURTHER

    assert (
        values.shape == target_values.shape
    ), 'Shapes of values tensor and target_values tensor do not match.'

    # testing that the value tensor requires a gradient,
    # and the target_values tensor does not
    assert values.requires_grad, 'values tensor should not require gradients'
    assert (
        not target_values.requires_grad
    ), 'target_values tensor should require gradients'

    # computing the scalar MSE loss between computed values and the TD-target
    loss = F.mse_loss(values, target_values)

    optimizer.zero_grad()  # reset all previous gradients
    loss.backward()  # compute new gradients
    optimizer.step()  # perform one gradient descent step

    return loss.item()

In [311]:
def train_dqn_adversarial(
    env,
    num_steps,
    *,
    num_saves=5,
    replay_size,
    replay_prepopulate_steps=0,
    batch_size,
    exploration,
    gamma,
):
    """
    DQN algorithm.

    Compared to previous training procedures, we will train for a given number
    of time-steps rather than a given number of episodes.  The number of
    time-steps will be in the range of millions, which still results in many
    episodes being executed.

    Args:
        - env: The openai Gym environment
        - num_steps: Total number of steps to be used for training
        - num_saves: How many models to save to analyze the training progress.
        - replay_size: Maximum size of the ReplayMemory
        - replay_prepopulate_steps: Number of steps with which to prepopulate
                                    the memory
        - batch_size: Number of experiences in a batch
        - exploration: a ExponentialSchedule
        - gamma: The discount factor

    Returns: (saved_models, returns)
        - saved_models: Dictionary whose values are trained DQN models
        - returns: Numpy array containing the return of each training episode
        - lengths: Numpy array containing the length of each training episode
        - losses: Numpy array containing the loss of each training batch
    """
    # initialize the DQN and DQN-target models
    dqn_models = {}
    dqn_targets = {}
    # initialize the optimizer
    optimizers = {}
    # initialize the replay memory and prepopulate it
    memories  = {}
    # initiate lists to store returns, lengths and losses
    returns = {}
    lengths = {}
    losses = {}

    saved_models = {}



    for agent in ['agent_0', 'agent_1']:
        # check that the observation space is a Box
        observation_space = np.prod(env.observation_spaces[env.possible_agents[0]]['observation'].shape)
        action_space = env.action_space(env.possible_agents[0])
        dqn_models[agent] = DQN(observation_space, action_space.n)
        dqn_targets[agent] = DQN.custom_load(dqn_models[agent].custom_dump())
        optimizers[agent] = optim.Adam(dqn_models[agent].parameters())
        memories[agent] = ReplayMemory(replay_size, observation_space)
        memories[agent].populate(env, replay_prepopulate_steps)
        returns[agent] = []
        lengths[agent] = []
        losses[agent] = []
        saved_models[agent] = {}

    # initiate structures to store the models at different stages of training
    t_saves = np.linspace(0, num_steps, num_saves - 1, endpoint=False)
    i_episode = 0  # use this to indicate the index of the current episode
    t_episode = 0  # use this to indicate the time-step inside current episode

    # iterate for a total of `num_steps` steps
    pbar = tqdm.trange(num_steps, ncols=100)
    for t_total in pbar:
        # use t_total to indicate the time-step from the beginning of training

        # save models
        if t_total in t_saves:
            model_name_0 = f'{100 * t_total / num_steps:04.1f}'.replace('.', '_')
            saved_models['agent_0'][model_name_0] = copy.deepcopy(dqn_models['agent_0'])

            model_name_1 = f'{100 * t_total / num_steps:04.1f}'.replace('.', '_')
            saved_models['agent_1'][model_name_1] = copy.deepcopy(dqn_models['agent_0'])

        env.reset()
        #randomly choose which agent is which player
        players = ['agent_0', 'agent_1']
        np.random.shuffle(players) 
        player_0, player_1 = players
        # Sample an action from the DQN using epsilon-greedy
        for agent in env.agent_iter():

            #assign the current agent to the player
            curr_agent = player_0 if agent == 'player_0' else player_1 

            # Get the current state
            obs, reward, done, _, _ = env.last()

            if done:
                #add the last transition to the replay memory
                memories[curr_agent].add(obs['observation'], action, reward, next_state['observation'], done)

                for final_agent in ['agent_0', 'agent_1']:
                    # Compute return G
                    G = 0
                    for i in range(t_episode, -1, -1):
                        G = gamma * G + memories[final_agent].rewards[i]
                        
                    # Store stuff
                    returns[final_agent].append(G)
                    lengths[final_agent].append(t_episode + 1)
        
                # Reset variables, indices, lists, etc.
                i_episode += 1
                t_episode = 0

                # Update the progress bar
                pbar.set_description(
                    f'Episode: {i_episode} | Steps: {t_episode + 1} | Return: {G.item():5.2f} | Epsilon: {eps:4.2f}'
                )

                break

            else:
                mask = obs['action_mask']
                eps = exploration.value(t_total)
                if np.random.rand() < eps:
                    action = env.action_spaces[agent].sample(mask)
                else:
                    action_probs = dqn_models[curr_agent](torch.tensor(obs['observation'].flatten(), dtype=torch.float))
                    #set actionprobs at index of invalid actions to -inf
                    action_probs[mask == 0] = -float('inf')   
                    action = torch.argmax(action_probs).item()
                # Use the action to advance the environment by one step
                env.step(action)
                # Get the next state
                next_state, reward, done, _, _ = env.last()
                # Store the transition into the replay memory
                memories[curr_agent].add(obs['observation'], action, reward, next_state['observation'], done)
                # Update the progress bar
                t_episode += 1

            
            # YOUR CODE HERE:  once every 4 steps,
            #  * sample a batch from the replay memory
            #  * perform a batch update (use the train_dqn_batch() method!)
            
            if t_total % 2 == 0:
                batch = memories[curr_agent].sample(batch_size)
                loss = train_dqn_batch(optimizers[curr_agent], batch, dqn_models[curr_agent], dqn_targets[curr_agent], gamma)
                losses[curr_agent].append(loss)

            # YOUR CODE HERE:  once every 10_000 steps,
            #  * update the target network (use the dqn_model.state_dict() and
            #    dqn_target.load_state_dict() methods!)
            
            # Update the target network
            if t_total % 1_000 == 0:
                dqn_targets[curr_agent].load_state_dict(dqn_models[curr_agent].state_dict())
        

    for agent in ['agent_0', 'agent_1']:
        saved_models[agent]['100_0'] = copy.deepcopy(dqn_models[agent])


    return (
        saved_models,
        returns,
        lengths,
        losses,
    )

In [312]:
env = envs['tictactoe']
env.render = False
gamma = 0.99

# we train for many time-steps;  as usual, you can decrease this during development / debugging.
# but make sure to restore it to 1_500_000 before submitting.
num_steps = 150_000
num_saves = 5  # save models at 0%, 25%, 50%, 75% and 100% of training

replay_size = 20_000
replay_prepopulate_steps = 5_000

batch_size = 32
exploration = ExponentialSchedule(1.0, 0.01, 100_000)

# this should take about 90-120 minutes on a generic 4-core laptop
dqn_models, returns, lengths, losses = train_dqn_adversarial(
    env,
    num_steps,
    num_saves=num_saves,
    replay_size=replay_size,
    replay_prepopulate_steps=replay_prepopulate_steps,
    batch_size=batch_size,
    exploration=exploration,
    gamma=gamma,
)

assert len(dqn_models) == num_saves
assert all(isinstance(value, DQN) for value in dqn_models.values())

# saving computed models to disk, so that we can load and visualize them later.
checkpoint = {key: dqn.custom_dump() for key, dqn in dqn_models.items()}
torch.save(checkpoint, f'checkpoint_{env.spec.id}.pt')

#save the returns, lengths and losses to disk
os.makedirs(env.spec.id, exist_ok=False)
np.save(f'{env.spec.id}/returns.npy', returns)
np.save(f'{env.spec.id}/lengths.npy', lengths)
np.save(f'{env.spec.id}/losses.npy', losses)

torch.Size([20000, 18])
torch.Size([20000, 18])


Episode: 8622 | Steps: 1 | Return: -1.95 | Epsilon: 0.67:   6%| | 8622/150000 [02:00<32:54, 71.60it/


KeyboardInterrupt: 

### Creating the Opponent Model

In [None]:
class DQN_OM(nn.Module):
    def __init__(self, state_dim, action_dim, * num_layers, hidden_dim):
        super().__init__()

        self.state_dim = state_dim
        self.action_dim = action_dim
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim

        #define the put dimension to take in the state and predicted oppponent action values
        input_dim = state_dim + action_dim

        #define the layers of the model
        layers = []

        #input layerNo
        layers.append(nn.Linear(input_dim, hidden_dim))
        layers.append(nn.ReLU())

        #hidden layers
        for _ in range(num_layers - 2):
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(nn.ReLU())

        #output layer
        layers.append(nn.Linear(hidden_dim, action_dim))
        layers.append(nn.Softmax(dim=-1))

        #create a sequential model
        self.model = nn.Sequential(*layers)

    def forward(self, state):
        action_values = self.model(state)
        return action_values

    # utility methods for cloning and storing models.  DO NOT EDIT
    @classmethod
    def custom_load(cls, data):
        model = cls(*data['args'], **data['kwargs'])
        model.load_state_dict(data['state_dict'])
        return model

    def custom_dump(self):
        return {
            'args': (self.state_dim, self.action_dim),
            'kwargs': {
                'num_layers': self.num_layers,
                'hidden_dim': self.hidden_dim,
            },
            'state_dict': self.state_dict(),
        }

In [None]:
# Define the DQN_OM training procedure
def train_dqn_om(env, num_players, num_inference_steps, num_training_steps, state_dim, action_dim, hidden_dim):
    # Create DQN_OM agents for each player
    agents = [DQN_OM(state_dim, action_dim, num_layers=3, hidden_dim=hidden_dim), DQN_OM(state_dim, action_dim, num_layers=3, hidden_dim=hidden_dim)]
    num_agents = len(agents)

    # Create optimizers for each agent
    optimizers = [optim.Adam(agent.parameters(), lr=0.001) for agent in agents]

    # Placeholder for opponent action values (z̃other)
    opponent_action_values = [torch.zeros((num_players, action_dim)) for _ in range(num_agents)]

    for t in range(num_training_steps):
        for k in range(num_players):
            agent = agents[k]
            optimizer = optimizers[k]

            # Reset the game environment
            state = env.reset()[0]

            for step in range(episode_length):
                acting_agent = k
                non_acting_agent = (k + 1) % num_players  # Assuming a circular opponent order

                s_self = game.get_state(acting_agent)
                s_other = game.get_state(non_acting_agent)
                z_other = opponent_action_values[acting_agent]

                z_other = torch.nn.functional.gumbel_softmax(z_other, tau=1.0, hard=False)

                # Predict the action values for the acting agent
                combined_state = torch.cat((s_self, z_other), dim=-1)
                pi_self, _ = agent(combined_state)

                # Sample an action for the acting agent
                a_self = torch.multinomial(pi_self, 1).squeeze().item()

                game.take_action(a_self, acting_agent)

                # Perform inference steps for the non-acting agent
                for _ in range(num_inference_steps):
                    z_other = torch.nn.functional.gumbel_softmax(z_other, tau=1.0, hard=False)
                    pi_other, _ = agents[non_acting_agent](s_other, z_other, s_self)

                    # Calculate loss and backpropagate
                    loss = torch.nn.functional.cross_entropy(pi_other, a_self)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                opponent_action_values[acting_agent] = z_other

            # Update opponent action values for the next round
            opponent_action_values[acting_agent] = z_other

        # Update policies for all players
        for k in range(num_players):
            optimizer = optimizers[k]
            optimizer.step()
