In [3]:
pip install azureml-contrib-reinforcementlearning

Note: you may need to restart the kernel to use updated packages.


In [4]:
import azureml.core
from azureml.core import Workspace
from azureml.core import Experiment
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
from azureml.core.runconfig import EnvironmentDefinition
from azureml.widgets import RunDetails
from azureml.tensorboard import Tensorboard

# Azure ML Reinforcement Learning imports
from azureml.contrib.train.rl import ReinforcementLearningEstimator, Ray
from azureml.contrib.train.rl import WorkerConfiguration

In [5]:
import os

# Azure ML workspace coordinates. Each value can be overridden through an
# environment variable, so the notebook can target another workspace
# without editing (and re-committing) these identifiers.
subscription_id = os.environ.get('AZUREML_SUBSCRIPTION_ID', '1aefdc5e-3a7c-4d71-a9f9-f5d3b03be19a')
resource_group = os.environ.get('AZUREML_RESOURCE_GROUP', 'EDATRG')
workspace_name = os.environ.get('AZUREML_WORKSPACE_NAME', 'fepEDATest')

ws = Workspace(subscription_id, resource_group, workspace_name)

In [6]:
# Look up the two pre-provisioned clusters by name: the GPU cluster that hosts
# the Ray head node and the CPU cluster that hosts the Ray workers.
head_compute_target, worker_compute_target = (
    ws.compute_targets[target_name] for target_name in ('head-gpu', 'worker-cpu')
)

In [7]:
# Pip packages installed on both the Ray head and the Ray workers.
# Ray (with RLlib) is pinned to 0.8.5 so head and workers run the same version.
pip_packages = ["ray[rllib]==0.8.5"]

# Ray worker configuration: 2 CPU-only nodes on the worker cluster,
# provisioned with the same pip packages as the head node.
worker_conf = WorkerConfiguration(
    compute_target=worker_compute_target,  # Azure ML compute cluster running the Ray workers
    node_count=2,                          # number of worker nodes
    use_gpu=False,                         # workers do not use GPUs
    pip_packages=pip_packages,             # pip packages installed on each worker
)

In [8]:
# Command-line arguments forwarded to lor_train_v7.py:
# 200 training iterations, 6 RLlib rollout workers, and the scripted
# policy class that controls player2.
script_params = dict([
    ("--iterations", 200),
    ("--numworkers", 6),
    ("--basepolicy", "LORHeuristic"),
])

rl_estimator = ReinforcementLearningEstimator(
    source_directory='./',                      # directory uploaded with the run
    entry_script="lor_train_v7.py",             # training script (written by the %%writefile cell)
    script_params=script_params,                # arguments defined above
    compute_target=head_compute_target,         # Azure ML cluster for the Ray head node
    pip_packages=pip_packages,                  # same packages as the workers
    use_gpu=True,                               # head node uses the GPU
    rl_framework=Ray(),                         # RL framework; currently must be Ray
    worker_configuration=worker_conf,           # Ray worker configuration defined above
    cluster_coordination_timeout_seconds=3600,  # wait up to one hour for the whole cluster to start
    max_run_duration_seconds=3600,              # cut the Ray job off after one hour
)

In [9]:
%%writefile lor_train_v7.py

import random
import numpy as np
import ray
from gym.spaces import Discrete,Tuple, Box
from ray.rllib.policy.policy import Policy
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.agents.dqn import DQNTrainer
from ray.rllib import train

class LOREnv3(MultiAgentEnv):
    """Two-player, turn-based grid game exposed as an RLlib ``MultiAgentEnv``.

    ``"player1"`` starts in the top-right corner and ``"player2"`` in the
    bottom-left corner of a ``space_size_n`` x ``space_size_n`` grid. Turns
    alternate and ``player1`` moves first. Every ``step`` takes exactly one
    action (from the player whose turn it is) and returns the observation and
    reward for the player who moves next. The episode ends as soon as a
    player enters the 2x2 corner around the opponent's start position (the
    opponent's "base").

    Observations are tuples of five length-2 ``np.int16`` arrays:

    0. the observing player's position ``(x, y)``
    1. the opponent's position ``(x, y)``
    2. health ``(own, opponent)``
    3. special-attack cool down ``(own, opponent)``
    4. remaining revive turns ``(own, opponent)``; 0 means alive
    """

    # all the actions
    MOVEUP = 0
    MOVEDOWN = 1
    MOVELEFT = 2
    MOVERIGHT = 3
    ATTACK = 4
    SPECIALATTACK = 5
    GOBACK = 6
    HOLD = 7

    # human-readable action names; its size also defines the action space
    action_string = {
        MOVEUP: "MoveUp",
        MOVEDOWN: "MoveDown",
        MOVELEFT: "MoveLeft",
        MOVERIGHT: "MoveRight",
        ATTACK: "Attack",
        SPECIALATTACK: "SpecialAttack",
        GOBACK: "GoBack",
        HOLD: "Hold"
    }

    # health each player starts (and is restored) with
    max_health = 1

    # the grid is space_size_n x space_size_n
    # (0, 0) is at the top left corner
    # x represents the vertical direction
    # y represents the horizontal direction
    space_size_n = 4

    # probability that a regular attack misses
    attack_miss_rate = 0

    # health removed by a successful attack (attribute name kept for compatibility)
    attak_power = 1

    # reward for reaching the opponent's base, i.e. winning the game
    game_award = 100

    # reward for an action that is not allowed in the current state
    invalid_action_penalty = -10

    # special attack: cool-down turns after use, and maximum Euclidean range
    special_attack_cool_down = 3
    special_attack_distance = 2

    # turns a killed player may only HOLD before acting again
    dead_hold_turns = 3

    def generate_init_pos(self):
        """Return the start positions ``(player1_pos, player2_pos)`` as ``[x, y]`` lists."""
        player1_init_pos = [0, LOREnv3.space_size_n - 1]
        player2_init_pos = [LOREnv3.space_size_n - 1, 0]

        return player1_init_pos, player2_init_pos

    def is_in_opponent_base(self, player):
        """Return True if ``player`` is inside the 2x2 corner holding the opponent's start."""
        if player == self.player1 \
                and self.position[player][0] >= LOREnv3.space_size_n - 2 and self.position[player][1] <= 1:
            return True
        if player == self.player2 \
                and self.position[player][0] <= 1 and self.position[player][1] >= LOREnv3.space_size_n - 2:
            return True

        return False

    def __init__(self, config):
        """Build the action/observation spaces and reset the game.

        ``config`` (the env config dict) is not used.
        """
        self.action_space = Discrete(len(LOREnv3.action_string))

        # one Box per observation component, in the order listed in the class docstring
        self.observation_space = Tuple(
            [
                # self position in x/y
                Box(low=0, high=LOREnv3.space_size_n - 1, shape=(2, ), dtype=np.int16),
                # opponent position in x/y
                Box(low=0, high=LOREnv3.space_size_n - 1, shape=(2, ), dtype=np.int16),
                # self health and opponent health
                Box(low=0, high=LOREnv3.max_health, shape=(2, ), dtype=np.int16),
                # self special attack cool down and opponent's cool down
                Box(low=0, high=LOREnv3.special_attack_cool_down, shape=(2, ), dtype=np.int16),
                # remaining turns to revive, self and opponent (0 means alive)
                Box(low=0, high=LOREnv3.dead_hold_turns, shape=(2, ), dtype=np.int16),
            ]
        )

        self.player1 = "player1"
        self.player2 = "player2"

        self.player1_init_pos, self.player2_init_pos = self.generate_init_pos()

        self.reset()

        # For test-case inspections (compare both players' number of game wins).
        # These counters are cumulative: reset() does not clear them.
        self.player1_score = self.player2_score = 0

    def _make_observation(self, me, other):
        """Build the observation tuple seen by player ``me`` (layout in the class docstring).

        The arrays are created as ``np.int16`` so they match the dtype declared
        in ``observation_space``; some gym versions check dtype castability in
        ``Box.contains`` and would reject default int64 arrays.
        """
        pairs = (
            self.position[me],
            self.position[other],
            (self.health[me], self.health[other]),
            (self.special_attack_cd[me], self.special_attack_cd[other]),
            (self.turns_to_revive[me], self.turns_to_revive[other]),
        )
        return tuple(np.array([first, second], dtype=np.int16) for first, second in pairs)

    def reset(self):
        """Reset positions, health, cool downs, revive counters and the delayed reward.

        Returns the initial observation dict. Only ``player1`` is included
        because ``player1`` always takes the first action.
        """
        self.position = {
            self.player1: self.player1_init_pos.copy(),
            self.player2: self.player2_init_pos.copy()
        }

        self.health = {
            self.player1: LOREnv3.max_health,
            self.player2: LOREnv3.max_health
        }

        self.special_attack_cd = {
            self.player1: 0,
            self.player2: 0,
        }

        self.turns_to_revive = {
            self.player1: 0,
            self.player2: 0
        }

        self.last_reward = 0

        return {self.player1: self._make_observation(self.player1, self.player2)}

    def move_agent(self, player, opponent, action):
        """Move ``player`` one cell in the direction given by ``action``.

        Returns 0 when the move succeeds, and also when ``action`` is not a move
        or ``player`` has no health (nothing happens in those cases). Returns
        ``invalid_action_penalty`` without moving when the target cell is off
        the grid or occupied by ``opponent``.
        """
        if self.health[player] <= 0:  # no health no action
            return 0

        if action == LOREnv3.MOVEUP or action == LOREnv3.MOVEDOWN:
            new_x = self.position[player][0] + (1 if action == LOREnv3.MOVEDOWN else -1)
            if new_x < 0 or new_x >= LOREnv3.space_size_n:  # off the grid
                return self.invalid_action_penalty
            elif self.position[opponent][0] == new_x and self.position[opponent][1] == self.position[player][1]:
                return self.invalid_action_penalty  # cell occupied by the opponent
            else:
                self.position[player][0] = new_x
                return 0

        if action == LOREnv3.MOVELEFT or action == LOREnv3.MOVERIGHT:
            new_y = self.position[player][1] + (1 if action == LOREnv3.MOVERIGHT else -1)
            if new_y < 0 or new_y >= LOREnv3.space_size_n:  # off the grid
                return self.invalid_action_penalty
            elif self.position[opponent][1] == new_y and self.position[opponent][0] == self.position[player][0]:
                return self.invalid_action_penalty  # cell occupied by the opponent
            else:
                self.position[player][1] = new_y
                return 0

        return 0

    def is_adjacent(self):
        """True if the players share a row or column and are at most one cell apart."""
        return (self.position[self.player1][0] == self.position[self.player2][0]
                and abs(self.position[self.player1][1] - self.position[self.player2][1]) <= 1) \
            or (self.position[self.player1][1] == self.position[self.player2][1]
                and abs(self.position[self.player1][0] - self.position[self.player2][0]) <= 1)

    def is_in_distance(self, distance):
        """True if the Euclidean distance between the players is at most ``distance``."""
        d_square = (self.position[self.player1][0] - self.position[self.player2][0]) * (self.position[self.player1][0] - self.position[self.player2][0]) \
            + (self.position[self.player1][1] - self.position[self.player2][1]) * (self.position[self.player1][1] - self.position[self.player2][1])

        return d_square <= distance * distance

    def take_action(self, player, opponent, action):
        """Apply ``action`` for ``player`` and return the reward it earns.

        Mutates health, positions and special-attack cool downs. An opponent
        whose health reaches 0 is not revived here; ``step`` handles that.
        """
        reward = 0

        if self.turns_to_revive[player] > 0:  # if dead, cannot take action besides HOLD
            if action == LOREnv3.HOLD:
                reward = 0
            else:
                reward = self.invalid_action_penalty
        else:
            if action == LOREnv3.ATTACK:
                hit1 = 0 if not self.is_adjacent() or random.random() < LOREnv3.attack_miss_rate else 1

                if self.turns_to_revive[opponent] > 0:  # cannot attack opponent if in revive
                    hit1 = 0

                self.health[opponent] = self.health[opponent] - hit1 * LOREnv3.attak_power
                reward = hit1 * LOREnv3.attak_power
            elif action == LOREnv3.SPECIALATTACK:
                if self.special_attack_cd[player] > 0:  # still cooling down
                    reward = self.invalid_action_penalty
                elif not self.is_in_distance(LOREnv3.special_attack_distance):  # out of range
                    reward = self.invalid_action_penalty
                elif self.turns_to_revive[opponent] > 0:  # cannot attack opponent if in revive
                    reward = 0
                    # the attack is still spent: reset cd
                    self.special_attack_cd[player] = LOREnv3.special_attack_cool_down
                else:
                    self.health[opponent] = self.health[opponent] - LOREnv3.attak_power
                    reward = LOREnv3.attak_power
                    # reset cd
                    self.special_attack_cd[player] = LOREnv3.special_attack_cool_down
            elif action == LOREnv3.HOLD:
                reward = 0
            elif action == LOREnv3.GOBACK:
                # reward equals the health regained
                reward = LOREnv3.max_health - self.health[player]

                # gain full health and go back to init position
                self.health[player] = LOREnv3.max_health
                self.position[player] = self.player1_init_pos.copy() if player == self.player1 else self.player2_init_pos.copy()
            else:  # move
                reward = self.move_agent(player, opponent, action)

        # if the opponent is dead, more reward
        if self.health[opponent] == 0:
            reward = reward * LOREnv3.dead_hold_turns

        # reaching the opponent's base overrides any other reward
        if self.is_in_opponent_base(player):
            reward = LOREnv3.game_award

        return reward

    def step(self, action_dict):
        """Apply the single action in ``action_dict`` and hand the turn to the other player.

        Returns ``(obs, rew, done, info)``. ``obs`` and ``rew`` normally contain
        only the player who acts next. When the game ends they also contain
        the player who just acted, so every live agent gets a final entry.
        """
        # only one action each turn
        assert len(action_dict) == 1, action_dict

        if self.player1 in action_dict:
            player, opponent = self.player1, self.player2
        else:
            player, opponent = self.player2, self.player1

        # tick the acting player's special attack cool down
        if self.special_attack_cd[player] > 0:
            self.special_attack_cd[player] = self.special_attack_cd[player] - 1

        reward = self.take_action(player, opponent, action_dict[player])

        # if opponent is killed: send it back to its init position and start revive turns
        if self.health[opponent] == 0:
            self.health[opponent] = LOREnv3.max_health
            self.position[opponent] = self.player1_init_pos.copy() if opponent == self.player1 else self.player2_init_pos.copy()
            self.turns_to_revive[opponent] = LOREnv3.dead_hold_turns

        # update player's revive turns if needed
        if self.turns_to_revive[player] > 0:
            self.turns_to_revive[player] = self.turns_to_revive[player] - 1

        obs = {opponent: self._make_observation(opponent, player)}

        # Delayed reward: the player who acts next receives the reward its own
        # previous action earned (last_reward) minus what the current player
        # just earned.
        rew = {
            opponent: -1 * reward + self.last_reward,
        }

        self.last_reward = reward

        done = {
            "__all__": self.is_in_opponent_base(self.player1) or self.is_in_opponent_base(self.player2)
        }

        # it is required that when done["__all__"] == True, the obs/rew include all live agents
        if done["__all__"]:
            obs[player] = self._make_observation(player, opponent)
            rew[player] = reward

        if self.is_in_opponent_base(self.player1) and not self.is_in_opponent_base(self.player2):
            self.player1_score += 1
        elif self.is_in_opponent_base(self.player2) and not self.is_in_opponent_base(self.player1):
            self.player2_score += 1

        return obs, rew, done, {}


class LORHeuristic(Policy):
    """Scripted ("reckless") opponent policy for ``LOREnv3``.

    Assumes it always controls ``player2``, whose goal is the base around
    ``(0, space_size_n - 1)``. Each observation in ``obs_batch`` is expected
    to be the flattened env observation
    ``(self_x, self_y, op_x, op_y, self_health, op_health, self_cd, op_cd,
    self_revive, op_revive)``. On each turn it:

    1. holds while reviving;
    2. attacks if the opponent shares a row or column at distance <= 1;
    3. uses the special attack if it is off cool down and in range;
    4. otherwise steps up or right towards the opponent's base when possible;
    5. otherwise randomly holds, or moves left or down.

    Only this reckless behaviour is implemented. The policy never learns:
    ``learn_on_batch`` and the weight accessors are no-ops.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exploration = self._create_exploration()
        self.space_size = LOREnv3.space_size_n

    def can_use_special_attack(self, obv, cd):
        """True if ``cd <= 0`` and the opponent is within ``LOREnv3.special_attack_distance``.

        ``obv[0:2]`` is the own ``(x, y)`` and ``obv[2:4]`` the opponent's.
        """
        d_square = (obv[0] - obv[2]) * (obv[0] - obv[2]) + (obv[1] - obv[3]) * (obv[1] - obv[3])
        return d_square <= LOREnv3.special_attack_distance * LOREnv3.special_attack_distance and \
            cd <= 0

    def can_move_torwards_opponent_base(self, obv):
        """Return a random legal step towards ``(0, size - 1)``, or -1 if none exists.

        Candidates are MOVEUP (decrease x) and MOVERIGHT (increase y). Each is
        legal if it stays on the grid and the target cell is not the opponent's.
        """
        # assume that the policy always plays player2, and targets (0, size - 1)
        self_x = obv[0]
        self_y = obv[1]
        op_x = obv[2]
        op_y = obv[3]

        actions = []

        if self_x > 0 and not (op_y == self_y and op_x == self_x - 1):  # can reduce x
            actions.append(LOREnv3.MOVEUP)

        if self_y < LOREnv3.space_size_n - 1 and not (op_y == self_y + 1 and op_x == self_x):  # can increase y
            actions.append(LOREnv3.MOVERIGHT)

        action = -1
        if len(actions) > 0:
            action = actions[random.randrange(len(actions))]

        return action

    def take_reckless_action(self, obv):
        """Choose one action for a single flattened observation (see the class docstring)."""
        self_x = obv[0]
        self_y = obv[1]
        op_x = obv[2]
        op_y = obv[3]
        self_cd = obv[6]
        self_revive_cd = obv[8]

        if self_revive_cd > 0:
            return LOREnv3.HOLD

        # Evaluated before the other checks; it draws from `random` whenever a
        # move towards the base is available, even if that move is not used.
        try_move_towards_base = self.can_move_torwards_opponent_base(obv)

        if (self_x == op_x and abs(self_y - op_y) <= 1) or (self_y == op_y and abs(self_x - op_x) <= 1):
            return LOREnv3.ATTACK
        elif self.can_use_special_attack(obv, self_cd):
            return LOREnv3.SPECIALATTACK
        elif try_move_towards_base >= 0:
            return try_move_towards_base
        else:  # randomly move left or down, or hold
            actions = [LOREnv3.HOLD]

            if self_y > 0 and not (op_y == self_y - 1 and op_x == self_x):
                actions.append(LOREnv3.MOVELEFT)

            if self_x < self.space_size - 1 and not (op_y == self_y and op_x == self_x + 1):
                actions.append(LOREnv3.MOVEDOWN)

            return actions[random.randrange(len(actions))]

    def compute_actions(self,
                        obs_batch,
                        state_batches=None,
                        prev_action_batch=None,
                        prev_reward_batch=None,
                        info_batch=None,
                        episodes=None,
                        **kwargs):
        """Return ``(actions, state_outs, extra_info)``, with one reckless action per observation."""
        return [self.take_reckless_action(x) for x in obs_batch], [], {}

    def learn_on_batch(self, samples):
        """No-op: the heuristic is not trained."""
        pass

    def get_weights(self):
        """No weights to export; returns None."""
        pass

    def set_weights(self, weights):
        """No weights to import; ignores ``weights``."""
        pass


from azureml.core import Run
    
def on_train_result(info):
    """RLlib ``on_train_result`` callback that forwards trainer metrics to Azure ML.

    Logs ``episode_reward_mean`` and then ``episodes_total`` from
    ``info["result"]`` to the run returned by ``Run.get_context()``.
    """
    azureml_run = Run.get_context()
    for metric_name in ('episode_reward_mean', 'episodes_total'):
        azureml_run.log(name=metric_name, value=info["result"][metric_name])

import argparse

DEFAULT_RAY_ADDRESS = 'localhost:6379'

if __name__ == "__main__":

    parser = argparse.ArgumentParser(
        description="Train a DQN policy for player1 of LOREnv3 against a scripted player2 policy.")
    parser.add_argument("--iterations", type=int, default=200,
                        help="number of trainer.train() iterations")
    parser.add_argument("--numworkers", type=int, default=6,
                        help="number of RLlib rollout workers")
    parser.add_argument("--basepolicy", default="LORHeuristic",
                        help="name of the Policy subclass in this module that controls player2")
    args = parser.parse_args()

    # Resolve the opponent policy by name. Only accept Policy subclasses so an
    # arbitrary module global cannot be passed in from the command line.
    base_policy_cls = globals().get(args.basepolicy)
    if not (isinstance(base_policy_cls, type) and issubclass(base_policy_cls, Policy)):
        parser.error("--basepolicy must name a Policy subclass defined in this module, got %r"
                     % (args.basepolicy,))

    def select_policy(agent_id):
        """player1 is trained ("learned"); the other agent uses the scripted base policy."""
        if agent_id == "player1":
            return "learned"
        else:
            return args.basepolicy

    ray.init(address=DEFAULT_RAY_ADDRESS)

    # Instantiated only to read the observation/action spaces for the policy specs.
    spec_env = LOREnv3({})

    config = {
        "env": LOREnv3,
        "gamma": 0.9,
        "num_workers": args.numworkers,
        "num_envs_per_worker": 4,
        "rollout_fragment_length": 10,
        "train_batch_size": 1000,
        "multiagent": {
            "policies_to_train": ["learned"],
            "policies": {
                args.basepolicy: (base_policy_cls, spec_env.observation_space, spec_env.action_space, {}),
                # NOTE(review): RLlib's DQN may not support recurrent models;
                # confirm that use_lstm actually takes effect for this policy.
                "learned": (None, spec_env.observation_space, spec_env.action_space, {
                    "model": {
                        "use_lstm": True
                    },
                }),
            },
            "policy_mapping_fn": select_policy,
        },
        "callbacks": {"on_train_result": on_train_result},
    }

    run = Run.get_context()

    trainer_obj = DQNTrainer(config=config)

    def total_score(attr):
        """Sum a LOREnv3 score counter over every sub-env of every rollout worker.

        ``w.env`` is only the first of ``num_envs_per_worker`` envs, so
        iterate all of them with ``foreach_env`` instead.
        """
        per_worker = trainer_obj.workers.foreach_worker(
            lambda w: sum(w.foreach_env(lambda e: getattr(e, attr))))
        return sum(per_worker)

    for _ in range(args.iterations):
        trainer_obj.train()

        run.log(name='player1_score', value=total_score('player1_score'))
        run.log(name='player2_score', value=total_score('player2_score'))

Overwriting lor_train_v7.py


In [10]:
# Submit the 200-iteration estimator under the shared experiment name;
# `run` is the handle used by the monitoring cell below.
experiment_name = 'rllib-lor-v7'
exp = Experiment(name=experiment_name, workspace=ws)
run = exp.submit(config=rl_estimator)

In [11]:
from azureml.widgets import RunDetails

# Render the live run-monitoring widget for the submitted run.
first_run_details = RunDetails(run)
first_run_details.show()
# To block until training finishes, uncomment: run.wait_for_completion()

_RLWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', 'sdk_v…

In [12]:
# Longer training run: 2000 iterations with the same cluster and policy setup.
script_params = {
    "--iterations": 2000,
    "--numworkers": 6,
    "--basepolicy": "LORHeuristic"
}

rl_estimator = ReinforcementLearningEstimator(

    # Location of source files
    source_directory='./',

    # Python script file
    entry_script="lor_train_v7.py",

    # Parameters to pass to the script file
    # Defined above.
    script_params=script_params,

    # The Azure ML compute target set up for Ray head nodes
    compute_target=head_compute_target,

    # Pip packages
    pip_packages=pip_packages,

    # GPU usage
    use_gpu=True,

    # RL framework. Currently must be Ray.
    rl_framework=Ray(),

    # Ray worker configuration defined above.
    worker_configuration=worker_conf,

    # How long to wait for whole cluster to start
    cluster_coordination_timeout_seconds=3600,

    # Maximum time for the whole Ray job to run
    # This will cut off the run after two hours
    max_run_duration_seconds=2 * 60 * 60,
)

experiment_name = 'rllib-lor-v7'

exp = Experiment(workspace=ws, name=experiment_name)

run = exp.submit(config=rl_estimator)

In [13]:
from azureml.widgets import RunDetails

# Render the live run-monitoring widget for the 2000-iteration run.
second_run_details = RunDetails(run)
second_run_details.show()
# To block until training finishes, uncomment: run.wait_for_completion()

_RLWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', 'sdk_v…