In [None]:
def run(agent, env, episodes=20, episode_duration=100, render=True, verbose=False):
    """
    Runs a reinforcement learning experiment.

    Each episode resets the environment and then lets the agent act until the
    environment reports `done` or `episode_duration` steps have been taken,
    whichever comes first. A summary line is printed only for episodes that
    end with `done`; episodes cut off by the step limit end silently.

    :param agent: implements the method act(observation)
    :param env: an instance of gym.Env
    :param episodes: number of episodes to run
    :param episode_duration: maximum number of steps of each episode
    :param render: render each step?
    :param verbose: print transition information?
    :return: None
    """
    for ep in range(episodes):
        observation = env.reset()
        acc_reward = 0
        if render:
            env.render()  # show initial state

        # honour the caller-supplied step limit (it used to be hard-coded to 100)
        for t in range(episode_duration):
            action = agent.act(observation)
            observation, reward, done, info = env.step(action)
            acc_reward += reward

            if verbose:
                print(observation, reward, done, info)
            if render:
                env.render()

            if done:
                print("Episode {} finished after {} timesteps w/ total reward {}".format(ep+1, t+1, acc_reward))
                break


In [None]:
# test actual agents in actual environments
import gym_adversarialgrid.envs.adversarialgrid as adversarialgrid
import gym_adversarialgrid.agents.tabular as tabular
import gym_adversarialgrid.agents.exp3mg as exp3mg
import gym_adversarialgrid.agents.hedgemg as hedgemg
import gym_adversarialgrid.agents.minimaxq as minimaxq

# Experiment: train one agent against an AdversarialGrid opponent, then
# evaluate it with exploration switched off.
# NOTE: this cell rebinds the notebook-wide names `env` and `agent`, which
# later cells also rebind; run cells top to bottom.
num_iter = 1000 #number of iterations
#g = min(1, math.sqrt( (num_iter * math.log(num_iter)) / ((math.e - 1) * num_iter) ))
# NOTE(review): `g` is also passed as `gamma` to MinimaxQ below -- confirm that
# gamma means the same thing there as it does for Hedge/Exp3.
g = 0.1 # for hedge and Exp3

# Environment: 3x4 map with a 'Random' opponent (the commented line is the
# fixed-action alternative).
#env = adversarialgrid.AdversarialGrid(opponent='Fixed', map='3x4', action=adversarialgrid.NOOP)
env = adversarialgrid.AdversarialGrid(opponent='Random', map='3x4') #, action=adversarialgrid.NOOP)

# Agent selection: exactly one of the lines below should be active.
#agent = tabular.TabularQAgent(env.observation_space, env.action_space, eps=0.1, init_mean=1)
#agent = exp3mg.Exp3MG(env.observation_space, env.action_space, gamma=g)
#agent = exp3mg.Exp3MG_1995(env.observation_space, env.action_space, gamma=g)
#agent = hedgemg.HedgeMG(env.observation_space, env.action_space, gamma=g)
#agent = hedgemg.HedgeMG_1995(env.observation_space, env.action_space, gamma=g)
agent = minimaxq.MinimaxQ(env.observation_space, env.action_space, env.opp_action_space, gamma=g)

#train
agent.train(env, num_iter)
#pprint(agent.q)

#test
# eps is set to 0 so the agent stops exploring during evaluation
agent.config['eps'] = 0 #all greedy o/
env.print_deterministic_policy(agent.greedy_policy())
#run(agent, env, render=True, verbose=True)
# uses the default 20 episodes of `run`, without rendering or per-step logging
run(agent, env, render=False, verbose=False)

In [None]:
class Exp3(tabular.TabularQAgent):
    """
    Auer 2002 implementation of the Exp3 method

    Keeps one weight per action (no per-state context). Actions are drawn from
    a mixture of the normalized weights and the uniform distribution, and the
    weight of the chosen action is updated multiplicatively from an
    importance-weighted reward estimate.

    References:

    Auer, P., Cesa-Bianchi, N., Freund, Y., & Schapire, R. E. (2002).
    The nonstochastic multiarmed bandit problem.
    Society for Industrial and Applied Mathematics, 32(1), 48–77.
    """

    def __init__(self, *args, **kwargs):
        """
        :param args: forwarded unchanged to tabular.TabularQAgent
        :param kwargs: forwarded unchanged to tabular.TabularQAgent;
            `gamma` (exploration/mixing rate, default 0.07) is also read here
        """
        super(Exp3, self).__init__(*args, **kwargs)
        self.gamma = kwargs.get('gamma', 0.07)
        # one weight per action, all starting equal
        self.weights = [1.0] * self.action_space.n

    @staticmethod
    def policy(weights, gamma=0.0):
        """
        Turns action weights into a probability vector.

        Declared as a staticmethod so it can be called both as
        `Exp3.policy(...)` and as `self.policy(...)`.

        :param weights: sequence of positive action weights
        :param gamma: probability mass spread uniformly over all actions
        :return: tuple(float) with (1 - gamma) * w / sum(weights) + gamma / len(weights)
            for each weight w
        """
        the_sum = float(sum(weights))
        return tuple((1.0 - gamma) * (w / the_sum) + (gamma / len(weights)) for w in weights)

    def act(self, observation):
        """
        Draws an action from the current policy.

        :param observation: ignored; this agent keeps no per-state information
        :return: whatever categorical_draw returns for the probability vector
            (presumably an action index -- the helper is defined outside this class)
        """
        the_policy = Exp3.policy(self.weights, self.gamma)
        return categorical_draw(the_policy)

    def learn(self, s, a, reward, sprime, done):
        """
        Updates the weight of the chosen action.

        :param s: state before acting (unused)
        :param a: index of the action that was taken
        :param reward: reward received
            # NOTE(review): Exp3 is usually stated for rewards in [0, 1] -- confirm callers respect that
        :param sprime: state after acting (unused)
        :param done: episode-finished flag (unused)
        """
        # Local import: `math` is only imported by a later notebook cell, so a
        # fresh-kernel top-to-bottom run would otherwise hit a NameError here.
        import math

        policy = Exp3.policy(self.weights, self.gamma)
        # importance weighting: divide by the probability of the chosen action
        estimatedReward = 1.0 * reward / policy[a]
        self.weights[a] *= math.exp(estimatedReward * self.gamma / self.action_space.n) # important that we use estimated reward here!


In [None]:
import gym
import sys
from six import StringIO, b
from gym import error, spaces, utils
from gym.utils import seeding
import scipy.stats as stats
import numpy as np

class Bandit(gym.Env):
    """
    Stateless multi-armed bandit with Bernoulli arms.

    Arm k pays 1 with probability 1 / (k + 2) and 0 otherwise. Every step ends
    the episode and the only observation is 0.
    """
    # _render supports both modes, so advertise both
    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self, n_arms=10):
        """
        :param n_arms: number of arms (size of the discrete action space)
        """
        # NOTE(review): bandits_mu / sigma belong to the commented-out normal
        # arms; bandits_mu is still printed by _render although the rewards
        # actually come from the Bernoulli arms below -- confirm this is intended.
        self.bandits_mu = stats.norm(0, 1).rvs(n_arms)
        self.sigma = 1
        self.last_choice = -1   # -1 means "no arm pulled yet"
        self.last_reward = -10  # placeholder until the first pull

        #bernoulli arm:
        #self.reward_vector = [[1 if random.random() < bias else 0 for bias in biases] for _ in range(numRounds)]
        self.arms = [stats.bernoulli(1 / (k+2) ) for k in range(n_arms)]

        self.action_space = gym.spaces.Discrete(n_arms)
        self.observation_space = gym.spaces.Discrete(1)
        #self.arms = [stats.norm(self.bandits_mu[arm], self.sigma) for arm in n_arms]

    def _step(self, action):
        """
        Pulls one arm.

        :param action: index of the arm to pull
        :return: (observation, reward, done, info) = (0, sampled reward, True, info);
            info holds the action and the expected reward of that arm
        """
        self.last_choice = action
        # rvs() with no size yields a scalar; rvs(1) handed callers a length-1 array
        reward = self.arms[action].rvs() #np.random.normal(self.bandits_mu[action], self.sigma)
        self.last_reward = reward
        info = {
            "action": action,
            "expected_rwd": self.arms[action].mean()
        }

        # return: state, reward, done, info
        return 0, reward, True, info

    def _reset(self):
        """
        Clears the record of the last pull.

        :return: the initial observation, 0 (the same observation _step returns)
        """
        self.last_choice = -1
        self.last_reward = -10
        return 0

    def _render(self, mode='human', close=False):
        """
        Writes the last (choice, reward) pair followed by the bandits_mu values.

        :param mode: 'human' writes to stdout; any other mode writes to a
            StringIO only when it is 'ansi', and returns the stream written to
        :param close: if true, do nothing
        :return: the output stream when mode != 'human', otherwise None
        """
        if close:
            return
        outfile = StringIO() if mode == 'ansi' else sys.stdout

        desc = ' '.join(['%.3f' % mu for mu in self.bandits_mu])
        # last_choice is -1 (never None) before the first pull, so test the sentinel
        if self.last_choice >= 0:
            outfile.write("  ({}, {})\n".format(self.last_choice, self.last_reward))
        else:
            outfile.write("\n")
        outfile.write(desc + "\n\n")

        if mode != 'human':
            return outfile
        
# test the exp3 agent here
# NOTE: rebinds the notebook-wide names `env` and `agent`; depends on the
# `Exp3` class and the `run` helper defined in earlier cells.
env = Bandit(n_arms=10)
#agent = SGExp3(env.observation_space, env.action_space, gamma=0.2)
agent = Exp3(env.observation_space, env.action_space, gamma=0.2)

#train
# `train` is not defined by Exp3 itself, so it comes from a base class
# (presumably tabular.TabularQAgent -- not visible here)
agent.train(env, 1000)

#test
# Bandit._step always reports done=True, so each of the default 20 episodes
# of `run` lasts a single step
run(agent, env, render=True, verbose=True)

In [None]:
# C-Exp3: contextual Exp3 -- doesn't work well because it can't discover sequential actions 
# what if we use MC returns?
import gym_adversarialgrid.agents.tabular as tabular
from collections import defaultdict
import random
import math


class CExp3(tabular.TabularQAgent):
    """
    Contextual Exp3: Exp3 (Auer et. al 1995) for contextual bandits
    The implementation of Exp3 we are extending is the one shown in Auer et. al 2002.

    One Exp3 weight vector is kept per state (in `self.q`). Actions are drawn
    from a mixture of the normalized weights and the uniform distribution, and
    the weight of the chosen action is updated multiplicatively from an
    importance-weighted, rescaled reward.

    References:

    Auer, P., Cesa-Bianchi, N., Freund, Y., & Schapire, R. E. (1995).
    Gambling in a rigged casino: The adversarial multi-armed bandit problem.
    Proceedings of IEEE 36th Annual Foundations of Computer Science, 322–331.
    https://doi.org/10.1109/SFCS.1995.492488

    Auer, P., Cesa-Bianchi, N., Freund, Y., & Schapire, R. E. (2002).
    The nonstochastic multiarmed bandit problem.
    Society for Industrial and Applied Mathematics, 32(1), 48–77.
    """

    # Reward bounds assumed when rescaling rewards to [0, 1] in learn().
    # Override on a subclass or instance if the environment uses other bounds.
    min_reward = -1
    max_reward = 1

    def __init__(self, *args, **kwargs):
        """
        :param args: forwarded unchanged to tabular.TabularQAgent
        :param kwargs: forwarded unchanged to tabular.TabularQAgent; also read here:
            `gamma` (exploration/mixing rate, default 0.2) and
            `log_updates` (print every weight update, default False)
        """
        super(CExp3, self).__init__(*args, **kwargs)

        self.gamma = kwargs.get('gamma', 0.2)
        # per-step update logging is opt-in; it floods the output during training
        self.log_updates = kwargs.get('log_updates', False)

        n_actions = self.action_space.n

        # cannot initialize q with zeroes (the policy divides by the sum of the weights)
        self.q = defaultdict(lambda: [1.] * n_actions)

        # policy initialized as uniformly random
        self.policy = defaultdict(lambda: [1.0 / n_actions] * n_actions)

    def calculate_policy(self, state):
        """
        Calculates the policy for a given state, stores it in self.policy and returns it
        :param state:
        :return: list(float) the policy (probability vector) for that state
        """
        g = self.gamma  # mixing rate
        n = self.action_space.n  # number of actions
        weights = self.q[state]
        sum_weights = sum(weights)

        # the policy is a probability vector, giving the probability of each action
        # pi(s, a) = (1 - gamma) * q(s, a) / sum_b q(s, b) + gamma / n
        pi_s = [((1 - g) * value / sum_weights) + (g / n) for value in weights]

        # keep self.policy in sync with the weights; previously the result was
        # discarded and self.policy stayed uniform forever
        self.policy[state] = pi_s
        return pi_s

    def act(self, observation):
        """
        Draws an action from the policy of the observed state.

        :param observation: the current state
        :return: whatever categorical_draw returns for the probability vector
            (presumably an action index -- the helper is defined outside this class)
        """
        prob_vector = self.calculate_policy(observation)
        return categorical_draw(prob_vector)

    def learn(self, s, a, reward, sprime, done):
        """
        Updates the weight of action `a` in state `s`.

        :param s: state in which the action was taken
        :param a: index of the action that was taken
        :param reward: reward received; assumed to lie in [min_reward, max_reward]
        :param sprime: next state (unused)
        :param done: episode-finished flag (unused)
        """
        n = self.action_space.n  # the number of actions

        # x is a value to be scaled and weighted by its probability
        x = reward

        # scales x to [0, 1]
        # rescaling as per https://en.wikipedia.org/wiki/Feature_scaling#Rescaling
        scaled_x = (x - self.min_reward) / (self.max_reward - self.min_reward)

        # weights the value by the probability the action is actually drawn with
        # under the current weights (not by the initial uniform policy)
        x_hat = scaled_x / self.calculate_policy(s)[a]

        if self.log_updates:
            print('q(s,a), r, x, ~x, ^x = %.3f, %.3f, %.3f, %.3f, %.3f' % (self.q[s][a], reward, x, scaled_x, x_hat))

        # finally updates the value
        self.q[s][a] *= math.exp(self.gamma * x_hat / n)


In [None]:
# C-Exp3 q vs Fixed-NOOP -- seems that C-Exp3 does not work 
# because rewards are sparse and agent cannot 'connect' action sequences
import gym_adversarialgrid.envs.adversarialgrid as adversarialgrid

# Experiment: CExp3 against an opponent that always plays NOOP on the 3x4 map.
# NOTE: rebinds the notebook-wide names `env` and `agent`; depends on the
# `CExp3` class and the `run` helper defined in earlier cells.
env = adversarialgrid.AdversarialGrid(opponent='Fixed', map='3x4', action=adversarialgrid.NOOP)
agent = CExp3(env.observation_space, env.action_space, gamma=0.2)

#train
agent.train(env, 10000)
#pprint(agent.q)

#test
# NOTE(review): CExp3.act as defined in this notebook never reads
# config['eps'], so this line presumably has no effect on how it acts -- confirm.
agent.config['eps'] = 0 #all greedy o/
#env.print_deterministic_policy(agent.greedy_policy())
#run(agent, env, render=True, verbose=True)
run(agent, env, render=False, verbose=False)

In [None]:
import numpy as np
# Scratch cell: builds a 3x2 array from a nested list; as the cell's last
# expression its repr is what the notebook displays. Not used by any other cell shown.
np.array([[2,1], [3, 4], [5,6]])