## Designing game AI with reinforcement learning

## Introduction
This notebook designs a neural network that plays the game Halite by Two Sigma using an Actor-Critic agent, and outlines the steps and components involved in creating this AI model. The Actor-Critic model is a reinforcement learning approach with two main components:

Actor: Determines the action to take based on the current state of the environment. In the context of Halite, this would involve deciding the direction each ship should move to collect halite or return it to a shipyard.

Critic: Estimates the value function of the current state, or in other words, it evaluates how good the current state is for the agent. This helps in adjusting the policy defined by the actor towards more rewarding actions.
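
To make the interplay between the two components concrete, here is a minimal sketch of the loss terms for a single timestep. The function name `actor_critic_losses` and the numbers are illustrative assumptions, and the squared error stands in for whatever critic loss an implementation actually uses.

```python
import math

def actor_critic_losses(action_prob, value, ret):
    """Return (actor_loss, critic_loss) for one timestep (hypothetical helper).

    action_prob: probability the actor assigned to the action that was taken
    value:       the critic's estimate of the return from this state
    ret:         the discounted return that was actually observed
    """
    advantage = ret - value  # positive when the outcome beat the critic's estimate
    actor_loss = -math.log(action_prob) * advantage
    critic_loss = (ret - value) ** 2  # squared error, used here only for illustration
    return actor_loss, critic_loss

actor_loss, critic_loss = actor_critic_losses(action_prob=0.5, value=1.0, ret=2.0)
print(actor_loss, critic_loss)
```

Minimizing the actor term raises the probability of actions that turned out better than the critic expected, while the critic term pulls the estimate toward the observed return.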

## Implementation

In [None]:
!pip install kaggle-environments --upgrade

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import sys
import PIL.Image

import tensorflow as tf
import logging

from sklearn import preprocessing
import random
import matplotlib.pyplot as plt
import seaborn as sns

from kaggle_environments import evaluate, make
from kaggle_environments.envs.halite.helpers import *


In [None]:
seed = 123
# Seed every random number generator the notebook uses so runs are repeatable.
random.seed(seed)
np.random.seed(seed)
tf.compat.v1.set_random_seed(seed)
# Restrict TensorFlow to a single thread, which removes one source of nondeterminism.
session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
# Create the session, the runtime used to run graphs and evaluate tensors.
sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
# Make the Keras backend use this session.
tf.compat.v1.keras.backend.set_session(sess)
# Suppress all log output.
logging.disable(sys.maxsize)
# ship_ is a module-level counter assigned in a later cell. At module level this
# statement has no effect; functions that modify ship_ must declare it global themselves.
global ship_

## Analyzing the environment
Let's take a tour of our environment and its settings first.

In [None]:
env = make("halite", debug=True)  # "make" comes from kaggle_environments and initializes a Halite environment
env.run(["random"])  # run an episode with an agent that takes random actions each turn
env.render(mode="ipython", width=800, height=600)  # render the environment visually after running the agent

In [None]:
env.configuration

# {'episodeSteps': 400,      maximum number of steps per episode
#  'agentExec': 'LOCAL',     agents are executed locally, not on a remote server
#  'agentTimeout': 30,       time limit, in seconds, for an agent to initialize
#  'actTimeout': 6,          time limit, in seconds, for an agent to return its actions each step
#  'runTimeout': 9600,       time limit, in seconds, for the full environment run
#  'startingHalite': 24000,  total halite placed on the board at the start of the episode
#  'size': 21,               the game board is 21x21 cells
#  'spawnCost': 500,         halite cost for a shipyard to spawn a ship
#  'convertCost': 500,       halite cost to convert a ship into a shipyard
#  'moveCost': 0,            halite cost for a ship to move
#  'collectRate': 0.25,      fraction of a cell's halite a ship collects per step
#  'regenRate': 0.02,        fraction by which each cell's halite regenerates per step
#  'maxCellHalite': 500}     maximum halite a cell can hold
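
The `collectRate` setting is easier to reason about with a little arithmetic. The sketch below assumes the rate is applied as a fraction of the cell's current halite each step, and it ignores regeneration and any rounding the game engine may perform; `mine` is a hypothetical helper, not part of `kaggle_environments`.

```python
def mine(cell_halite, turns, collect_rate=0.25):
    """Simulate a ship staying on one cell; returns (cargo, halite left in the cell)."""
    cargo = 0.0
    for _ in range(turns):
        collected = cell_halite * collect_rate  # a fixed fraction of what is left
        cargo += collected
        cell_halite -= collected
    return cargo, cell_halite

cargo, remaining = mine(400, turns=3)
print(cargo, remaining)  # 231.25 168.75
```

Under these assumptions each extra turn on the same cell yields less than the previous one, which is why simple agents move on once a cell runs low.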

In [None]:
env.specification

In [None]:
env.specification.reward

In [None]:
env.specification.action

In [None]:
env.specification.observation
# Includes 'halite': a serialized list of the available halite per cell on the board

## The game begins
Before training anything, let's run a simple rule-based agent against an opponent that takes random actions and see what happens.

In [None]:
def getDirTo(fromPos, toPos, size):
    # Returns the direction to move from one position to another,
    # or None when both positions are the same.
    # Inputs: ship position, target position, and board size
    # (size is unused here because wrap-around is not considered).
    fromX, fromY = fromPos[0], fromPos[1]
    toX, toY = toPos[0], toPos[1]
    if fromY < toY: return ShipAction.NORTH
    if fromY > toY: return ShipAction.SOUTH
    if fromX < toX: return ShipAction.EAST
    if fromX > toX: return ShipAction.WEST

# Directions a ship can move
directions = [ShipAction.NORTH, ShipAction.EAST, ShipAction.SOUTH, ShipAction.WEST]

# Will keep track of whether a ship is collecting halite or carrying cargo to a shipyard
ship_states = {}

# Returns the commands we send to our ships and shipyards
def simple_agent(obs, config): 
    # Inputs: the observation (including the serialized halite per cell) and the env config
    size = config.size
    board = Board(obs, config)
    me = board.current_player
    # If there are no ships, use first shipyard to spawn a ship.
    if len(me.ships) == 0 and len(me.shipyards) > 0:
        me.shipyards[0].next_action = ShipyardAction.SPAWN

    # If there are no shipyards, convert first ship into shipyard.
    if len(me.shipyards) == 0 and len(me.ships) > 0:
        me.ships[0].next_action = ShipAction.CONVERT

    for ship in me.ships:
        if ship.next_action is None:

            ### Part 1: Set the ship's state
            if ship.halite < 200: # If cargo is too low, collect halite
                ship_states[ship.id] = "COLLECT"
            if ship.halite > 500: # If cargo gets very big, deposit halite
                ship_states[ship.id] = "DEPOSIT"

            ### Part 2: Use the ship's state to select an action
            if ship_states[ship.id] == "COLLECT":
                # If halite at current location running low,
                # move to the adjacent square containing the most halite
                if ship.cell.halite < 100:
                    neighbors = [ship.cell.north.halite, ship.cell.east.halite,
                                 ship.cell.south.halite, ship.cell.west.halite]
                    best = max(range(len(neighbors)), key=neighbors.__getitem__)
                    ship.next_action = directions[best]
            if ship_states[ship.id] == "DEPOSIT":
                # Move towards shipyard to deposit cargo
                direction = getDirTo(ship.position, me.shipyards[0].position, size)
                if direction: ship.next_action = direction

    return me.next_actions
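
The two thresholds in Part 1 of `simple_agent` form a small state machine with hysteresis: between 200 and 500 halite a ship keeps whatever state it already had. The sketch below isolates that logic in a hypothetical helper, `next_state`, and feeds it a made-up sequence of cargo values.

```python
def next_state(previous, cargo, low=200, high=500):
    """Return the ship state for the given cargo, keeping the old state in between."""
    if cargo < low:
        return "COLLECT"
    if cargo > high:
        return "DEPOSIT"
    return previous  # between the thresholds: no change

state = None
trace = []
for cargo in [0, 150, 300, 520, 400, 250, 100]:
    state = next_state(state, cargo)
    trace.append(state)

print(trace)
```

The gap between the thresholds prevents a ship from flipping between collecting and depositing every turn when its cargo hovers around a single cut-off.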

In [None]:
trainer = env.train([None, "random"])  # we control player 0; player 1 is the built-in random agent
observation = trainer.reset()  # start a new episode and get the first observation
while not env.done:
    my_action = simple_agent(observation, env.configuration)
    print("My Action", my_action)
    observation = trainer.step(my_action)[0]  # step() returns (observation, reward, done, info)
    print("Halite banked", observation.players[0][0])  # players[0][0] is our stored halite

In [None]:
env.render(mode="ipython",width=800, height=600)
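
The `getDirTo` helper used by the agent compares coordinates directly, so it never routes a ship across the edge of the map. Assuming the board wraps around at its edges and that north increases `y` (as `getDirTo` implies), a wrap-aware variant could look like the sketch below. It returns plain strings instead of `ShipAction` members so that it runs on its own; `wrapped_direction` is a hypothetical name.

```python
def wrapped_direction(from_pos, to_pos, size):
    """Return 'NORTH', 'SOUTH', 'EAST', 'WEST', or None, taking the shorter way around."""
    dx = (to_pos[0] - from_pos[0]) % size  # steps needed when travelling east
    dy = (to_pos[1] - from_pos[1]) % size  # steps needed when travelling north
    if dy != 0:
        return "NORTH" if dy <= size - dy else "SOUTH"
    if dx != 0:
        return "EAST" if dx <= size - dx else "WEST"
    return None  # already at the target

print(wrapped_direction((0, 0), (0, 20), 21))  # one step south across the edge
print(wrapped_direction((0, 0), (0, 3), 21))   # three steps north, no wrap needed
```

On a 21x21 board this caps any trip at 10 steps per axis instead of 20.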

## The Actor-Critic model

In [None]:
def ActorModel(num_actions, in_):  # inputs: number of actions, and the input tensor holding the halite in each cell
    common = tf.keras.layers.Dense(128, activation='tanh')(in_)  # 128 units with tanh activation
    common = tf.keras.layers.Dense(32, activation='tanh')(common)
    common = tf.keras.layers.Dense(num_actions, activation='softmax')(common)  # one probability per action (5 actions)

    return common

In [None]:
def CriticModel(in_):  # input: the tensor holding the halite in each cell
    common = tf.keras.layers.Dense(128)(in_)  # 128 units; the ReLU activation is applied by the next layer
    common = tf.keras.layers.ReLU()(common)
    common = tf.keras.layers.Dense(32)(common)
    common = tf.keras.layers.ReLU()(common)
    common = tf.keras.layers.Dense(1)(common) #output value

    return common

In [None]:
input_ = tf.keras.layers.Input(shape=[441,]) #441 cell map size: 21*21
model = tf.keras.Model(inputs=input_, outputs=[ActorModel(5,input_),CriticModel(input_)])
model.summary()
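
As a sanity check on `model.summary()`, the number of trainable parameters can be worked out by hand. A dense layer with `n_in` inputs and `n_out` units has `n_in * n_out` weights plus `n_out` biases; the sketch below applies that to the layer sizes defined in `ActorModel` and `CriticModel`. The helper `dense_params` is introduced here only for illustration.

```python
def dense_params(n_in, n_out):
    """Weights plus biases of one fully connected layer."""
    return n_in * n_out + n_out

# Actor: 441 -> 128 -> 32 -> 5
actor_params = dense_params(441, 128) + dense_params(128, 32) + dense_params(32, 5)
# Critic: 441 -> 128 -> 32 -> 1 (the ReLU layers add no parameters)
critic_params = dense_params(441, 128) + dense_params(128, 32) + dense_params(32, 1)
total_params = actor_params + critic_params

print(actor_params, critic_params, total_params)  # 60869 60737 121606
```

Note that although both functions name their intermediate tensor `common`, the two heads share only the input layer, so the actor and critic learn separate hidden representations.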

In [None]:
optimizer = tf.keras.optimizers.Adam(learning_rate=7e-4)  # `lr` is a deprecated alias of `learning_rate`

In [None]:
huber_loss = tf.keras.losses.Huber()
action_probs_history = []
critic_value_history = []
rewards_history = []
running_reward = 0
episode_count = 0
num_actions = 5
eps = np.finfo(np.float32).eps.item()  # tiny constant that keeps the denominator away from zero
gamma = 0.99  # Discount factor for rewards
env = make("halite", debug=True)
trainer = env.train([None,"random"])

## Encoding our moves

In [None]:
le = preprocessing.LabelEncoder()
label_encoded = le.fit_transform(['NORTH', 'SOUTH', 'EAST', 'WEST', 'CONVERT'])
label_encoded
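
It helps to know which integer ends up meaning which move, because the actor's output index is later decoded with `le.inverse_transform`. `LabelEncoder` assigns indices to the sorted unique labels, so the mapping can be reproduced without scikit-learn. The sketch below mimics that behavior in plain Python; it is an illustration, not the library's implementation.

```python
labels = ['NORTH', 'SOUTH', 'EAST', 'WEST', 'CONVERT']

classes = sorted(set(labels))  # the same ordering LabelEncoder uses for its classes
to_index = {name: i for i, name in enumerate(classes)}
encoded = [to_index[name] for name in labels]

print(classes)  # ['CONVERT', 'EAST', 'NORTH', 'SOUTH', 'WEST']
print(encoded)  # [2, 3, 1, 4, 0]
```

So output index 0 of the actor corresponds to `CONVERT`, not `NORTH`, which matters when inspecting the learned action probabilities.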

In [None]:
def getDirTo(fromPos, toPos, size):
    # Returns the direction to move from one position to another,
    # or None when both positions are the same (size is unused; wrap-around is ignored).
    fromX, fromY = fromPos[0], fromPos[1]
    toX, toY = toPos[0], toPos[1]
    if fromY < toY: return ShipAction.NORTH
    if fromY > toY: return ShipAction.SOUTH
    if fromX < toX: return ShipAction.EAST
    if fromX > toX: return ShipAction.WEST

# Directions a ship can move
directions = [ShipAction.NORTH, ShipAction.EAST, ShipAction.SOUTH, ShipAction.WEST]

def decodeDir(act_):
    if act_ == 'NORTH':return directions[0]
    if act_ == 'EAST':return directions[1]
    if act_ == 'SOUTH':return directions[2]
    if act_ == 'WEST':return directions[3]

# Will keep track of whether a ship is collecting halite or carrying cargo to a shipyard
ship_states = {}
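ship_ = 0  # counts how many times a CONVERT action has been issued
def update_L1():
    # ship_ is module-level state, so it must be declared global before it is modified;
    # without this declaration the increment raises UnboundLocalError.
    global ship_
    ship_ += 1
# Returns the commands we send to our ships and shipyards
def advanced_agent(obs, config, action):  # inputs: the observation, the env config, and the encoded action chosen by the actor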
    size = config.size
    board = Board(obs, config)
    me = board.current_player
    act = le.inverse_transform([action])[0]
    global ship_

    # If there are no ships, use a shipyard (index ship_-1) to spawn a ship.
    if len(me.ships) == 0 and len(me.shipyards) > 0:
        me.shipyards[ship_-1].next_action = ShipyardAction.SPAWN

    # If there are no shipyards, convert first ship into shipyard.
    if len(me.shipyards) == 0 and len(me.ships) > 0 and ship_==0:
        me.ships[0].next_action = ShipAction.CONVERT
    try:
        if act=='CONVERT':
            me.ships[0].next_action = ShipAction.CONVERT
            update_L1()
            if len(me.ships)==0 and len(me.shipyards) > 0:
                me.shipyards[ship_-1].next_action = ShipyardAction.SPAWN
        if me.ships[0].halite < 200:
            ship_states[me.ships[0].id] = 'COLLECT'
        if me.ships[0].halite > 800:
            ship_states[me.ships[0].id] = 'DEPOSIT'

        if ship_states[me.ships[0].id] == 'COLLECT':
            if me.ships[0].cell.halite < 100:
                me.ships[0].next_action = decodeDir(act)
        if ship_states[me.ships[0].id] == 'DEPOSIT':
            # Move towards shipyard to deposit cargo
            direction = getDirTo(me.ships[0].position, me.shipyards[ship_-1].position, size)
            if direction: me.ships[0].next_action = direction
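    except Exception:
        # Deliberately ignore failures (for example no ships left, or an unknown
        # ship id) and fall back to whatever actions have been set so far.
        pass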

    return me.next_actions

In [None]:
while not env.done:
    state = trainer.reset()  # Start a new episode (resets the board; the network weights are kept)
    episode_reward = 0
    with tf.GradientTape() as tape:  # Record operations for automatic differentiation
        for timestep in range(1, env.configuration.episodeSteps + 200):
            # Convert the per-cell halite list (21*21 = 441 values) to a tensor
            state_ = tf.convert_to_tensor(state.halite)
            state_ = tf.expand_dims(state_, 0)  # add a batch dimension
            # Predict action probabilities and estimated future rewards
            # from environment state, cell map size: 21*21, each cell halite
            action_probs, critic_value = model(state_) 
            critic_value_history.append(critic_value[0, 0])

            # Sample action from action probability distribution
            action = np.random.choice(num_actions, p=np.squeeze(action_probs))  # draw one action index at random, weighted by the actor's probabilities
            action_probs_history.append(tf.math.log(action_probs[0, action]))

            # Apply the sampled action in our environment
            action = advanced_agent(state, env.configuration, action)
            state = trainer.step(action)[0] # Step the environment with action
            gain = state.players[0][0] / 5000  # our stored halite after the step, scaled by 5000, used as the reward
            rewards_history.append(gain)
            episode_reward += gain

            if env.done:
                state = trainer.reset()
        # Update running reward to check condition for solving
        running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward
        # Exponential moving average: 5% weight on the current episode's reward and
        # 95% on the previous running value, which gives a smoothed reward signal

        # Calculate expected value from future rewards (critic value)
        # - At each timestep what was the total reward received after that timestep
        # - Rewards in the future are discounted by multiplying them with gamma
        # - These are the labels for our critic
        returns = []
        discounted_sum = 0
        for r in rewards_history[::-1]:
            discounted_sum = r + gamma * discounted_sum
            returns.insert(0, discounted_sum)
        # Normalize
        returns = np.array(returns)
        returns = (returns - np.mean(returns)) / (np.std(returns) + eps)
        returns = returns.tolist()
        # Calculating loss values to update our network
        history = zip(action_probs_history, critic_value_history, returns)  # (log-probability of the action taken, critic's predicted value, observed return)
        actor_losses = []
        critic_losses = []
        for log_prob, value, ret in history:
            # At this point in history, the critic estimated that we would get a
            # total reward = `value` in the future. We took an action with log probability
            # of `log_prob` and ended up receiving a total reward = `ret`.
            # The actor must be updated so that it predicts an action that leads to
            # high rewards (compared to critic's estimate) with high probability.
            diff = ret - value
            # Actor loss: -(log probability of the action) * (observed return - critic's estimate)
            actor_losses.append(-log_prob * diff)

            # The critic must be updated so that it predicts a better estimate of
            # the future rewards.
            critic_losses.append(  # Huber loss between the critic's estimate and the observed return
                huber_loss(tf.expand_dims(value, 0), tf.expand_dims(ret, 0))
            )
        # Backpropagation
        loss_value = sum(actor_losses) + sum(critic_losses)
        grads = tape.gradient(loss_value, model.trainable_variables) #calculate gradients
        optimizer.apply_gradients(zip(grads, model.trainable_variables)) #update network weights

        # Clear the loss and reward history
        action_probs_history.clear()
        critic_value_history.clear()
        rewards_history.clear()

    # Log details
    episode_count += 1
    if episode_count % 10 == 0:
        template = "running reward: {:.2f} at episode {}"
        print(template.format(running_reward, episode_count))

    if running_reward > 550:  # Condition to consider the task solved
        print("Solved at episode {}!".format(episode_count))
        break
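
The return computation in the training loop is easy to check on a tiny example. The sketch below reimplements the backward pass over the rewards and the normalization step in plain Python, using a made-up reward sequence and `gamma = 0.5` so the numbers are easy to follow; `discounted_returns` and `normalize` are hypothetical helper names.

```python
import statistics

def discounted_returns(rewards, gamma):
    """For each timestep, the sum of that reward and all discounted later rewards."""
    returns = []
    running = 0.0
    for r in reversed(rewards):
        running = r + gamma * running
        returns.append(running)
    returns.reverse()
    return returns

def normalize(values, eps=1e-8):
    """Shift to zero mean and scale by the population standard deviation."""
    mean = sum(values) / len(values)
    std = statistics.pstdev(values)
    return [(v - mean) / (std + eps) for v in values]

returns = discounted_returns([1.0, 1.0, 1.0], gamma=0.5)
normalized = normalize(returns)
print(returns)  # [1.75, 1.5, 1.0]
```

Earlier timesteps receive larger returns because more reward still lies ahead of them; normalizing keeps the scale of the loss stable from one update to the next.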

In [None]:
state = trainer.reset()  # start a fresh episode for the trained agent
while not env.done:
    state_ = tf.convert_to_tensor(state.halite)
    state_ = tf.expand_dims(state_, 0)
    action_probs, _ = model(state_)  # the critic's value is not needed when only playing
    action = np.random.choice(num_actions, p=np.squeeze(action_probs))
    action = advanced_agent(state, env.configuration, action)
    state = trainer.step(action)[0]
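
When playing with a trained policy, one design choice is whether to keep sampling from the action probabilities, as during training, or to always take the most probable action. The sketch below contrasts the two on a made-up probability vector; both helper names are hypothetical, and which option plays better here is an open question rather than something this notebook establishes.

```python
import random

def sample_action(probs, rng=random):
    """Draw an action index at random, weighted by its probability."""
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]

def greedy_action(probs):
    """Pick the index of the most probable action."""
    return max(range(len(probs)), key=probs.__getitem__)

probs = [0.1, 0.6, 0.1, 0.1, 0.1]
print(greedy_action(probs))  # 1
print(sample_action(probs))  # usually 1, but any index is possible
```

Sampling preserves exploration, while the greedy choice makes the agent's behavior deterministic for a given board.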

## Results
The yellow ships and shipyards are controlled by our trained actor-critic model, and the red ships and shipyards belong to the random agent it was trained against.

In [None]:
env.render(mode="ipython",width=800, height=600)