In [1]:
import numpy as np
import scipy.misc
import scipy.io
import matplotlib.pyplot as plt
%matplotlib inline
from functools import lru_cache

In [2]:
import generate_state_matrices
def states(*args, **kwargs):
    S, T, R = generate_state_matrices.states(*args, **kwargs)
    T = np.rollaxis(T, 2, 0)
    return S, T, R

import mdptoolbox, mdptoolbox.example
def backward_induction(T, R, N):
    fh = mdptoolbox.mdp.FiniteHorizon(T, R, 1, N=N)
    fh.run()
    return fh.V[:, 0]

def s_idx(s):
    """The index of a row in S"""
    return int(sum(range(int(sum(s)-1))) + s[1] - 1)

from joblib import Memory
cache = Memory(cachedir='.joblib_cache', verbose=0).cache

In [3]:
from collections import namedtuple

Blinker = namedtuple('Blinker', ('Q', 'S', 'T', 'R'))
Blinker.__repr__ = lambda s: 'Blinker'

def blinker(N_ARM, COST, MAX_STEPS=None):
    """Returns a blinker-approximated Q function."""
    MAX_STEPS = MAX_STEPS or int(1/(4*COST)) + 3
    S, T, R = states(MAX_STEPS, N_ARM, cost=COST)
    params = (N_ARM, COST, MAX_STEPS)

    # The one-bandit case is used to compute blinkered Q values
    S1, T1, R1 = states(MAX_STEPS, 1, cost=COST)
    R1_alt = R1.copy()  # we change this for each new V1

    @cache
    def V1(constant, dummy):
        # betting chooses the constant if it's better than the bandit's expected reward
        R1_alt[:, -1] = np.maximum(constant, R1[:, -1])
        R1_alt[-1, -1] = 0
        return backward_induction(T1, R1_alt, N=MAX_STEPS+1)

    @lru_cache(None)
    def Q1(s, a, constant):
        if a == 1:  # bet
            return max(R1[s,1], constant)
        else:  # observe
            V = V1(constant, params)
            return T1[a, s] @ V - COST

    @lru_cache(None)
    def expected_values(s):
        hits = S[s][0:None:2]
        misses = S[s][1:None:2]
        return hits/(misses+hits)

    def Q(s, a):
        if a == N_ARM:
            return max(expected_values(s))

        # alternative is selecting current best
        mu = np.array(expected_values(s))
        mu[a] = -np.inf
        alternative = mu.max()

        idx = a * 2
        s_arm = s_idx(S[s][[idx, idx+1]])
        return Q1(s_arm, 0, alternative)
    
    return Blinker(Q, S, T, R)



0.5767619047619047

In [4]:
%%capture
result = {}
for n_arm in range(2, 6):
    b = blinker(N_ARM=n_arm, COST=0.01, MAX_STEPS=6)
    result['Q_blinker_{}'.format(n_arm)] = Q = np.zeros(b.R.shape)
    n_state, n_action = b.R.shape
    for s in range(n_state):
        for a in range(n_action):
            Q[s, a] = b.Q(s, a)
            

In [5]:
scipy.io.savemat('../../results/Q_blinkered.mat', result)