<a href="https://colab.research.google.com/github/brandinho/ai-arena-starter/blob/main/AI_Arena_Starter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

To start off, we will install the `aiarena-gym` package.

In [None]:
!pip install aiarena-gym

Now we will import all of the packages that we'll need to run a sample model in the AI Arena gym. 
<br><br>
We will build our sample model with PyTorch.

In [None]:
from aiarena_gym.environment import Game
from aiarena_gym.benchmarks.rules_based_agents import opponents
from aiarena_gym.helpers.exporting_ops import save_pytorch_model
from aiarena_gym.helpers.game_functions import get_state

import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F


Below we will create a simple generator function that randomly samples minibatches from our dataset.

In [None]:
def get_shuffled_index(n_observations):
    return np.random.choice(n_observations, size = n_observations, replace = False)
    

def sample_minibatches(data_tuple, instance_axis_tuple, batch_size):
    assert all(
        [data_tuple[x].shape[ax] == data_tuple[0].shape[instance_axis_tuple[0]] 
         for x, ax in zip(range(1, len(data_tuple)), instance_axis_tuple[1:])]
    )

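    # This assumes that the first entry in the tuple is always the inputs
    n_observations = data_tuple[0].shape[instance_axis_tuple[0]]
    # Round up so a final partial batch is included without yielding an empty one
    n_batches = (n_observations + batch_size - 1) // batch_size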
    
    shuffled_index = get_shuffled_index(n_observations)

    batch_num = 0
    for _ in range(n_batches):
        current_index = shuffled_index[batch_num:batch_num+batch_size]
        yield (x[current_index] if ax == 0 else x[:,current_index] for x, ax in zip(data_tuple, instance_axis_tuple))
        batch_num += batch_size
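
To make the sampling idea concrete, here is a minimal sketch in plain Python that works on indices only. The name `iterate_minibatches` and the `seed` argument are our own illustrative choices and are not part of `aiarena-gym`. It shows that shuffling once and slicing in strides of `batch_size` visits every observation exactly once per epoch, with a shorter final batch when the sizes do not divide evenly.

```python
import random


def iterate_minibatches(n_observations, batch_size, seed=0):
    """Yield lists of indices covering every observation exactly once."""
    rng = random.Random(seed)
    shuffled = list(range(n_observations))
    rng.shuffle(shuffled)
    # Step through the shuffled indices in strides of batch_size;
    # the final batch is shorter when batch_size does not divide evenly.
    for start in range(0, n_observations, batch_size):
        yield shuffled[start:start + batch_size]


batches = list(iterate_minibatches(n_observations=10, batch_size=4))
batch_sizes = [len(batch) for batch in batches]
seen = sorted(index for batch in batches for index in batch)
```

With 10 observations and a batch size of 4, this produces batches of 4, 4, and 2 indices.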

We will define a simple policy-gradient model, which trains via the REINFORCE algorithm. Its training method uses the minibatch sampling that we defined above.

In [None]:
class PolicyGradient(torch.nn.Module):
    def __init__(self, n_features, n_actions, neurons, activation_function, learning_rate):
        super(PolicyGradient, self).__init__()
        self.n_features = n_features
        self.n_actions = n_actions
        self.neurons = neurons
        self.activation_function = activation_function
        self.learning_rate = learning_rate
                
        self.output_activation = F.softmax
        
        self.n_layers = len(self.neurons) + 1
        self.layers = torch.nn.ModuleList()
        for l in range(self.n_layers):
            if l == 0:
                in_dim = n_features
                out_dim = neurons[l]
            elif l == self.n_layers - 1:
                in_dim = neurons[l-1]
                out_dim = n_actions
            else:
                in_dim = neurons[l-1]
                out_dim = neurons[l]                
            self.layers.append(nn.Linear(in_dim, out_dim))
            
        self.optimizer = torch.optim.Adam(self.parameters(), lr = self.learning_rate)
    
    def policy(self, state):
        current_layer = state
        for l in range(self.n_layers):
            if l < self.n_layers - 1:
                current_layer = self.activation_function(self.layers[l](current_layer))
            else:
                current_layer = self.output_activation(self.layers[l](current_layer), dim = 1)
        return current_layer
        
    @staticmethod
    def tensor_to_array(torch_tensor):
        return torch_tensor.detach().cpu().numpy()
        
    def select_action(self, state):
        state = torch.tensor(state).float()
        policy = self.tensor_to_array(self.policy(state))
        action = np.random.choice(
            np.arange(self.n_actions), 
            1, 
            p = policy.reshape(-1)
        )[0]
        return action

    def get_loss(self, states, actions, rewards):
        states = torch.tensor(states).float()
        actions = torch.tensor(actions).type(torch.LongTensor)
        rewards = torch.tensor(rewards).float()
        
        policy = self.policy(states)
        actions_one_hot = F.one_hot(actions, num_classes = self.n_actions)
        action_probabilities = torch.sum(policy * actions_one_hot, dim = 1)
        return -torch.mean(rewards * torch.log(action_probabilities + 0.001))

    def train(self, states, actions, rewards, epochs, batch_size, verbose = False):
        for i in range(epochs):
            instance_axis_tuple = (0,0,0)
            data_tuple = (states, actions, rewards)
            for minibatch_s, minibatch_a, minibatch_r in sample_minibatches(
                data_tuple, 
                instance_axis_tuple, 
                batch_size
            ):
                if len(minibatch_s) == 0:
                    continue
    
                self.optimizer.zero_grad()
                loss = self.get_loss(minibatch_s, minibatch_a, minibatch_r)
                loss.backward()
                self.optimizer.step()
                
            if verbose and (i+1) % 5 == 0:
                print("Epoch {}".format(i+1))
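
To see what the loss computes, here is a minimal sketch of the same arithmetic in plain Python, with no PyTorch. The helper names `softmax` and `reinforce_loss` and the toy numbers are illustrative assumptions. For each sample it takes the probability the policy assigned to the chosen action, weights its log by the return, and averages the negated result. The `eps` value of 0.001 matches the constant added inside the log in `get_loss`.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of floats."""
    peak = max(logits)
    exps = [math.exp(z - peak) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def reinforce_loss(batch_logits, actions, returns, eps=0.001):
    """Mean of -return * log(pi(action | state) + eps) over the batch."""
    terms = []
    for logits, action, ret in zip(batch_logits, actions, returns):
        prob = softmax(logits)[action]
        terms.append(ret * math.log(prob + eps))
    return -sum(terms) / len(terms)


# Two toy states with three actions each (hypothetical numbers).
batch_logits = [[2.0, 0.0, 0.0], [0.0, 0.0, 0.0]]
actions = [0, 1]
returns = [1.0, -1.0]
loss = reinforce_loss(batch_logits, actions, returns)
```

Minimizing this quantity raises the probability of actions that were followed by positive returns and lowers it for actions followed by negative returns.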

We will define a simple mapping for activation functions to allow users to select an activation function through the dropdown in the following section.

In [None]:
activation_function_mapping = {
    "relu": F.relu,
    "elu": F.elu,
    "tanh": torch.tanh
}

Initialize your model. Note that AI Arena has done feature engineering for this competition, so researchers are unable to change the input dimensionality for the neural network. To learn more about the state space, please check out our [researcher wiki](https://www.notion.so/AI-Arena-State-Space-88ac2ff5e2f14f67a8dbfc703592be50). Additionally, there is a preset number of actions.

In [None]:
#@title Model Hyperparameters { form-width: "400px" }
n_features = 9 # DO NOT CHANGE
n_actions = 10 # DO NOT CHANGE
neurons = [36, 24, 12]
activation_function = "relu" #@param ["relu", "elu", "tanh"]
learning_rate = 0.0018 #@param {type:"slider", min:0, max:0.01, step:0.0001}
activation = activation_function_mapping[activation_function]
model = PolicyGradient(
    n_features, 
    n_actions,
    neurons, 
    activation,
    learning_rate
)
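
As a quick sanity check on the architecture, the sketch below reproduces how the layer sizes follow from the hyperparameters. The function name `layer_shapes` is our own and not part of the notebook's API. It chains the input width, the hidden widths, and the number of actions into consecutive `(in_dim, out_dim)` pairs, which is the same pattern that the `PolicyGradient` constructor builds.

```python
def layer_shapes(n_features, neurons, n_actions):
    """Return (in_dim, out_dim) for each linear layer, input to output."""
    widths = [n_features] + list(neurons) + [n_actions]
    return list(zip(widths[:-1], widths[1:]))


shapes = layer_shapes(n_features=9, neurons=[36, 24, 12], n_actions=10)
# Each linear layer holds in_dim * out_dim weights plus out_dim biases.
n_parameters = sum(in_dim * out_dim + out_dim for in_dim, out_dim in shapes)
```

With the defaults above, the network has four linear layers and 1,678 trainable parameters.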


Below, you have two opponents to choose from. In the researcher platform, Sidai corresponds to benchmark 1, and Sihing corresponds to benchmark 2. However, be careful not to overfit to these benchmarks: a model tuned only to beat them might not perform well when it battles the other models on the leaderboard!

In [None]:
#@title Select Opponent { form-width: "400px" }
heuristic_agent = "Sidai" #@param ["Sidai", "Sihing"]
opponent_model = opponents[heuristic_agent]()

Select the battle attributes for your fighter and for your opponent's fighter, and then initialize the game environment!

In [None]:
#@title Setting Environment Params { form-width: "400px" }

#@markdown Your Fighter's Attributes
your_power = 69 #@param {type:"slider", min:10, max:100, step:1}
your_speed = 37 #@param {type:"slider", min:10, max:100, step:1}
your_defence = 50 #@param {type:"slider", min:10, max:100, step:1}
your_accuracy = 50 #@param {type:"slider", min:10, max:100, step:1}

#@markdown Opponent's Attributes
opponent_power = 74 #@param {type:"slider", min:10, max:100, step:1}
opponent_speed = 50 #@param {type:"slider", min:10, max:100, step:1}
opponent_defence = 50 #@param {type:"slider", min:10, max:100, step:1}
opponent_accuracy = 50 #@param {type:"slider", min:10, max:100, step:1}

your_attributes = {
    "power": your_power,
    "speed": your_speed,
    "defence": your_defence,
    "accuracy": your_accuracy,
}
opponent_attributes = {
    "power": opponent_power,
    "speed": opponent_speed,
    "defence": opponent_defence,
    "accuracy": opponent_accuracy,
}

env = Game()
env.load_fighters(model, opponent_model, your_attributes, opponent_attributes)


Below we will define a few functions that will help us run reinforcement learning. 
<br><br>
To start, we need to be able to run a game loop, which we define with the `run_battle` function. The key to running the game loop is to iteratively take steps in the environment until the game is done. You do this by selecting an action and running `env.step(action)`, where `env` is your initialized game environment.
<br><br>
Next, in order to use reinforcement learning, we need to collect rewards from the environment. In this example, we show an approach for a customized reward function that uses both myopic rewards (immediate impact) and discountable rewards (delayed impact).
<br><br>
We put these functions together in the `reinforcement_learning` function, which runs for a prespecified number of episodes.
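
The shape of the game loop can be sketched without the real environment. `StubGame` below is a hypothetical stand-in written only for illustration. It mimics the four-value return of `env.step(action)` used in this notebook, but its `reset` signature, its state contents, and the `None` winner before the game ends are assumptions rather than the behaviour of `aiarena_gym`.

```python
import random


class StubGame:
    """Hypothetical stand-in for the real environment, for illustration only."""

    def __init__(self, max_steps=5):
        self.max_steps = max_steps
        self.steps_taken = 0

    def reset(self):
        self.steps_taken = 0
        return {"health": 100, "x": 0}, {"health": 100, "x": 10}

    def step(self, action):
        # Mirrors the four-value return used in this notebook:
        # (your_new_state, opponent_new_state, done, winner).
        self.steps_taken += 1
        done = self.steps_taken >= self.max_steps
        winner = "You" if done else None  # assumption: no winner until done
        your_state = {"health": 100, "x": self.steps_taken}
        opponent_state = {"health": 100 - 10 * self.steps_taken, "x": 10}
        return your_state, opponent_state, done, winner


def run_stub_battle(env, n_actions=10, seed=0):
    """Step the environment with a random policy until the game is done."""
    rng = random.Random(seed)
    env.reset()
    done, winner, actions_taken = False, None, []
    while not done:
        action = rng.randrange(n_actions)  # placeholder for model.select_action
        _, _, done, winner = env.step(action)
        actions_taken.append(action)
    return winner, actions_taken


winner, actions_taken = run_stub_battle(StubGame(max_steps=5))
```

In the real loop, the random action is replaced by the model's `select_action`, and each transition is recorded for training.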

In [None]:
def get_reward(action_name, your_state, your_new_state, opponent_state, opponent_new_state, winner):    
    opponent_health_delta = opponent_new_state["health"] - opponent_state["health"]
    your_health_delta = your_new_state["health"] - your_state["health"]
    prior_distance = your_state["x"] - opponent_state["x"]
    new_distance = your_new_state["x"] - opponent_new_state["x"]
    move_bool = "Left" in action_name or "Right" in action_name
    move_closer_bool = abs(prior_distance) > abs(new_distance)
    move_away_bool = abs(prior_distance) < abs(new_distance)
    
    hit_reward = (opponent_health_delta < 0) * 0.3
    get_hit_reward = (your_health_delta < 0) * -0.3
    move_closer_reward = (move_closer_bool and move_bool) * 0.1
    move_away_reward = (move_away_bool and move_bool) * -0.1
    
    result_reward = 0
    if winner == "You":
        result_reward = 2
    elif winner == "Opponent":
        result_reward = -2
    return [result_reward + hit_reward + get_hit_reward, move_closer_reward + move_away_reward]
        
    
def run_battle(env, randomize_attributes = False, random_policy = False):
    done = False
    all_actions_done = {}
    data_collection = {"s": [], "a": [], "r": {"discountable": [], "myopic": []}}
    your_state, opponent_state = env.reset(randomize_attributes, random_policy)
    your_attributes = env.your_fighter["battle_attributes"]
    opponent_attributes = env.opponent_fighter["battle_attributes"]
    state = get_state(your_state, opponent_state, your_attributes, opponent_attributes)
    while not done:
        action = env.fighters[0]["model"].select_action(state)
        your_new_state, opponent_new_state, done, winner = env.step(action)
        
        action_name = env.actions_list[action]
        if action_name not in all_actions_done:
            all_actions_done[action_name] = 1
        else:
            all_actions_done[action_name] += 1
        d_reward, m_reward = get_reward(action_name, your_state, your_new_state, opponent_state, opponent_new_state, winner)

        new_state = get_state(your_new_state, opponent_new_state, your_attributes, opponent_attributes)
        
        data_collection["s"].append(state[0])
        data_collection["a"].append(action)
        data_collection["r"]["discountable"].append(d_reward)
        data_collection["r"]["myopic"].append(m_reward)
        your_state = your_new_state.copy()
        opponent_state = opponent_new_state.copy()
        state = new_state.copy()

    return winner, data_collection


def get_full_reward(rewards, idx):
    return rewards["discountable"][idx] + rewards["myopic"][idx]


def get_discounted_return(rewards, gamma):
    # Discountable rewards propagate backwards through time with factor gamma;
    # myopic rewards only count at the step where they were earned.
    n_steps = len(rewards["discountable"])
    discounted_return = np.zeros(n_steps)
    running_discounted_reward = 0.0
    for t in reversed(range(n_steps)):
        running_discounted_reward = rewards["discountable"][t] + gamma * running_discounted_reward
        discounted_return[t] = running_discounted_reward + rewards["myopic"][t]
    return discounted_return

def reinforcement_learning(env, episodes = 100, randomize_attributes = False):
    epochs = 10
    batch_size = 24
    gamma = 0.95
    for e in range(episodes):
        winner, gameplay_data = run_battle(env, randomize_attributes)
        
        states = np.array(gameplay_data["s"])
        actions = np.array(gameplay_data["a"])
        discounted_return = get_discounted_return(gameplay_data["r"], gamma)

        env.fighters[0]["model"].train(states, actions, discounted_return, epochs, batch_size)

        if (e + 1) % 1 == 0:
            print("Episode: {}, Winner: {}, Return: {}".format(e+1, winner, np.mean(discounted_return)))
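
To illustrate how myopic and discountable rewards can be combined, here is a small worked sketch in plain Python. It uses the standard discounted-return recursion and is our reading of the myopic/discountable split, not necessarily identical in every detail to the function above. The name `discounted_returns` and the toy reward values are hypothetical. Only the discountable rewards flow backwards in time, while each myopic reward is added solely at its own step.

```python
def discounted_returns(discountable, myopic, gamma):
    """G_t = myopic[t] + sum over k of gamma**k * discountable[t + k]."""
    returns = [0.0] * len(discountable)
    running = 0.0
    for t in reversed(range(len(discountable))):
        # Fold the current discountable reward into the running discounted sum.
        running = discountable[t] + gamma * running
        returns[t] = running + myopic[t]
    return returns


returns = discounted_returns(
    discountable=[0.0, 0.0, 2.0],  # e.g. a win bonus on the final step
    myopic=[0.1, -0.1, 0.0],       # e.g. moving closer, then moving away
    gamma=0.5,
)
```

Here the final-step reward of 2.0 is worth 1.0 one step earlier and 0.5 two steps earlier, so the returns are approximately 0.6, 0.9, and 2.0.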

Now that we have defined all the functions, run the training loop and see what your agent has learned!

In [None]:
#@title Simulation Params { form-width: "400px" }
num_episodes = 2812 #@param {type:"slider", min:50, max:5000, step:1}
reinforcement_learning(env, num_episodes)

Once training is done, you can save your model as a JSON file. We have created a number of templates to make exporting easier for you, but if you want to create your own template, please follow our exporting configurations [here](https://www.notion.so/Code-Compliance-6f4ec6c2fda64e268ca4581c4f5b3b67).

In [None]:
import pathlib
pathlib.Path('/content/saved_model').mkdir(parents=False, exist_ok=True) 
save_pytorch_model(model)