In [1]:
import math
import random

class Arms():
    """A collection of Bernoulli bandit arms.

    Attributes:
        mus: per-arm success probabilities, stored exactly as passed in.
        n_arms: how many arms there are.
        best: the largest success probability among the arms.
    """

    def __init__(self, mus):
        self.mus = mus
        self.n_arms = len(mus)
        self.best = max(mus)
        # Each mean has to be a valid probability in [0, 1].
        for mu in mus:
            assert 0 <= mu <= 1

    def __str__(self):
        """Render the arms as the string form of their means."""
        return str(self.mus)

    def pull(self, idx):
        """Draw a single Bernoulli reward (0 or 1) from arm ``idx``."""
        # The uniform draw happens before the arm lookup, as in a plain
        # `random.random() < mu` comparison.
        success = random.random() < self.mus[idx]
        return int(success)


class Policy():
    """Base class for arm-selection policies.

    Concrete policies override :meth:`pick`; the base class keeps no state.
    """

    def __init__(self):
        pass

    def pick(self, n_arms, history, to_pick=None):
        """Choose the index of the arm to pull next.

        Args:
            n_arms: number of available arms.
            history: one ``[total_reward, pull_count]`` pair per arm.
            to_pick: optional list holding already scheduled future picks.
                The default is ``None`` rather than ``[]`` so that no list
                object is shared between calls or instances.

        Raises:
            NotImplementedError: always; subclasses must provide the logic.
        """
        # Failing loudly here beats returning None, which would only surface
        # later as an indexing error in the caller.
        raise NotImplementedError(
            '{} must override pick()'.format(type(self).__name__))


def experiment(arms, policy, T):
    ''' Simulate T pulls of `arms` under `policy`.

    Returns (history, total_regret). history[i] is the pair
    [summed reward, pull count] for arm i; total_regret adds up, over all
    pulls, the best arm's mean minus the mean of the arm that was picked.
    '''
    optimal_mean = arms.best
    arm_count = arms.n_arms
    history = [[0, 0] for _ in range(arm_count)]
    total_regret = 0

    for _ in range(T):
        # The policy sees the statistics gathered so far, before this pull.
        choice = policy.pick(arm_count, history)
        outcome = arms.pull(choice)

        stats = history[choice]
        stats[0] += outcome
        stats[1] += 1

        total_regret += optimal_mean - arms.mus[choice]

    return history, total_regret


def argmax(s):
    ''' Index of the first occurrence of the largest element of s. '''
    largest = max(s)
    return s.index(largest)


class RandomPick(Policy):
    """Policy that ignores the history and draws an arm index at random."""

    def pick(self, n_arms, history):
        """Return a randomly chosen index from ``range(n_arms)``."""
        arm_indices = range(n_arms)
        return random.choice(arm_indices)


class EpsGreedy(Policy):
    """Epsilon-greedy policy: explore with probability ``eps``, else exploit."""

    def __init__(self, eps):
        self.eps = eps

    def pick(self, n_arms, history):
        """Return the next arm index.

        With probability ``eps`` a random arm is returned. Otherwise the first
        arm that has never been pulled is returned, and once every arm has
        been pulled, the arm with the highest empirical mean reward.
        """
        exploring = random.random() < self.eps
        if exploring:
            return random.choice(range(n_arms))

        # Try every arm once before trusting the empirical means
        # (this also avoids dividing by a zero pull count below).
        untried = next(
            (arm for arm, (_, pulls) in enumerate(history) if pulls == 0),
            None,
        )
        if untried is not None:
            return untried

        means = [reward_sum / pulls for reward_sum, pulls in history]
        return argmax(means)

class UCB(Policy):
    """Upper-confidence-bound policy.

    Each arm is scored as its empirical mean plus ``sqrt(ln(t) / n)``, where
    ``t`` is the total number of pulls so far and ``n`` the pulls of that arm.
    """

    def pick(self, n_arms, history):
        """Return the first never-pulled arm, else the arm with the top score."""
        untried = next(
            (arm for arm, (_, pulls) in enumerate(history) if pulls == 0),
            None,
        )
        if untried is not None:
            return untried

        total_pulls = sum(pulls for _, pulls in history)
        scores = []
        for reward_sum, pulls in history:
            bonus = math.sqrt(math.log(total_pulls) / pulls)
            scores.append(reward_sum / pulls + bonus)
        return argmax(scores)

In [2]:
class BatchRandomPick(Policy):
    """Random policy that repeats each randomly drawn arm ``batch_size`` times.

    The queue of scheduled picks lives on the instance. Previously it lived in
    a mutable default argument of ``pick``, which made every instance (and
    every repeated experiment) share one list.
    """

    def __init__(self, batch_size):
        """
        Args:
            batch_size: number of consecutive pulls per drawn arm (>= 1).

        Raises:
            ValueError: if ``batch_size`` is smaller than 1, which would leave
                the queue empty and make ``pick`` fail on ``pop()``.
        """
        if batch_size < 1:
            raise ValueError('batch_size must be at least 1')
        self.batch_size = batch_size
        self._queue = []  # pending picks for the current batch

    def pick(self, n_arms, history, to_pick=None):
        """Return the next arm index, refilling the batch when it runs out.

        Args:
            n_arms: number of available arms.
            history: per-arm statistics; not used by this policy.
            to_pick: optional external queue of scheduled picks. When given it
                is used (and mutated) instead of the instance's own queue.
        """
        if to_pick is None:
            to_pick = self._queue
        if not to_pick:
            to_pick.extend([random.choice(range(n_arms))] * self.batch_size)
        return to_pick.pop()

In [3]:
class BatchUCB(Policy):
    """UCB policy that commits to each selected arm for ``batch_size`` pulls.

    The queue of scheduled picks lives on the instance. Previously it lived in
    a mutable default argument of ``pick``, so leftover picks leaked between
    instances and between successive experiments.
    """

    def __init__(self, batch_size):
        """
        Args:
            batch_size: number of consecutive pulls per selected arm (>= 1).

        Raises:
            ValueError: if ``batch_size`` is smaller than 1, which would leave
                the queue empty and make ``pick`` fail on ``pop()``.
        """
        if batch_size < 1:
            raise ValueError('batch_size must be at least 1')
        self.batch_size = batch_size
        self._queue = []  # pending picks for the current batch

    def pick(self, n_arms, history, to_pick=None):
        """Return the next arm index.

        Scheduled picks are served first. Otherwise the first never-pulled arm
        is returned (a single pull, not a batch); once every arm has been
        pulled, the arm maximising ``mean + sqrt(ln(t) / n)`` is scheduled for
        a whole batch.

        Args:
            n_arms: number of available arms.
            history: one ``[total_reward, pull_count]`` pair per arm.
            to_pick: optional external queue of scheduled picks. When given it
                is used (and mutated) instead of the instance's own queue.
        """
        if to_pick is None:
            to_pick = self._queue
        if to_pick:
            return to_pick.pop()

        for i, (_, n) in enumerate(history):
            if n == 0:
                return i

        t = sum(n for _, n in history)
        ucb = [r / n + math.sqrt(math.log(t) / n) for r, n in history]
        to_pick.extend([argmax(ucb)] * self.batch_size)
        return to_pick.pop()

In [4]:
# Two Bernoulli arms with success probabilities 0.2 and 0.3.
a = Arms([0.2, 0.3])
# Number of pulls simulated in each experiment below.
T = 100000

In [5]:
# Baseline: pick an arm at random on every pull.
experiment(a, RandomPick(), T)

([[10100, 49959], [15088, 50041]], 4995.900000000644)

In [9]:
# UCB that schedules each selected arm for a batch of 100 pulls.
experiment(a, BatchUCB(100), T)

([[300, 1401], [29646, 98599]], 140.09999999999633)

In [10]:
# Random selection where each drawn arm is repeated for a batch of 100 pulls.
experiment(a, BatchRandomPick(100), T)

([[9831, 49500], [15137, 50500]], 4950.000000000477)

In [11]:
# Epsilon-greedy with eps = 0.1 (explores on roughly 10% of the pulls).
experiment(a, EpsGreedy(0.1), T)

([[1012, 4925], [28493, 95075]], 492.5000000000435)

In [12]:
# Plain UCB: the arm scores are recomputed before every pull.
experiment(a, UCB(), T)

([[179, 910], [29592, 99090]], 90.9999999999991)

In [13]:
# NOTE: ABTesting is defined in a later cell of this notebook. On a fresh
# kernel this call raises NameError (see the recorded output) unless that
# cell has been run first.
experiment(a, ABTesting(0.2, 0.3), T)

NameError: name 'ABTesting' is not defined

In [None]:
# Inverse survival function of the standard normal at 0.025, i.e. the
# critical value for a two-sided test at the 5% level.
# NOTE: `norm` is imported from scipy.stats in a later cell; run that first.
norm.isf(0.05 / 2)

In [None]:
# http://stackoverflow.com/questions/15204070/

from scipy.stats import norm, zscore
def sample_power_probtest(p1, p2, power=0.9, sig=0.05):
    """Sample size needed to tell two proportions apart (normal approximation).

    Evaluates ``n = 2 * p_bar * (1 - p_bar) * (z_sig + z_power)**2 / (p1 - p2)**2``
    with ``p_bar = (p1 + p2) / 2``, which is the usual per-group sample size
    for a two-sided two-proportion z-test.

    Args:
        p1: expected success probability of the first group.
        p2: expected success probability of the second group.
        power: desired probability of detecting the difference.
        sig: two-sided significance level.

    Returns:
        The sample size as a Python ``int``, rounded to the nearest integer.

    Raises:
        ValueError: if ``p1 == p2``; there is no difference to detect and the
            formula would divide by zero.
    """
    if p1 == p2:
        raise ValueError('p1 and p2 must differ to compute a sample size')

    z = float(norm.isf(sig / 2))  # two-sided z-test critical value
    zp = -float(norm.isf(power))  # normal quantile matching the power
    d = p1 - p2
    p_bar = (p1 + p2) / 2
    s = 2 * p_bar * (1 - p_bar)  # pooled-variance term for both groups
    n = s * ((zp + z) ** 2) / (d ** 2)
    return int(round(n))

In [None]:
# Sample size for telling 0.2 from 0.3 with power 0.9 at significance 0.05.
sample_power_probtest(0.2, 0.3, power=0.9, sig=0.05)

In [None]:
class ABTesting(Policy):
    """Two-arm policy that runs an A/B test first and then commits to the winner.

    A test round pulls each arm ``sample_need`` times. Afterwards a two-sided
    two-proportion z-test on all rewards collected during testing decides the
    winner. If the test is inconclusive, another round is scheduled and its
    data is added to the earlier rounds.
    """

    def __init__(self, p1, p2, power=0.8, sig=0.05):
        """
        Args:
            p1: expected success probability of arm 0 (used for sizing the test).
            p2: expected success probability of arm 1 (used for sizing the test).
            power: desired power of the test.
            sig: two-sided significance level of the test.
        """
        self.p1 = p1
        self.p2 = p2
        self.power = power
        self.sig = sig
        # Use the caller's power/sig; hard-coded 0.8/0.05 would silently
        # ignore the constructor arguments.
        self.sample_need = sample_power_probtest(p1, p2, power=power, sig=sig)
        self.to_pick = []         # arms still to be pulled in the current round
        self.best = None          # committed arm once the test is conclusive
        self.test_done = False    # True when a scheduled round has been fully issued
        self.previous_history = [[0, 0], [0, 0]]  # history snapshot from the last call
        self.test_history = [[0, 0], [0, 0]]      # [rewards, pulls] gathered while testing

    def best_from_testing(self):
        """Return the winning arm index (0 or 1), or ``None`` if inconclusive.

        Performs a two-sided z-test of "both arms have the same success rate"
        with the unpooled standard error, at significance level ``self.sig``.
        """
        [r1, n1], [r2, n2] = self.test_history
        if n1 == 0 or n2 == 0:
            return None  # no data for one arm yet, nothing to compare

        p1 = r1 / n1
        p2 = r2 / n2
        if p1 == p2:
            return None  # identical observed rates can never be significant

        se = math.sqrt(p1 * (1 - p1) / n1 + p2 * (1 - p2) / n2)
        if se > 0:
            z = (p1 - p2) / se
            # Two-sided p-value of a standard normal: 2 * (1 - Phi(|z|)).
            p_value = math.erfc(abs(z) / math.sqrt(2))
            if p_value >= self.sig:
                return None
        # se == 0 with different rates means no observed variance at all
        # (e.g. 0% vs 100%); treat the difference as decisive.
        return 0 if p1 > p2 else 1

    def pick(self, n_arms, history):
        """Return the next arm to pull.

        Args:
            n_arms: number of arms; must be 2.
            history: one ``[total_reward, pull_count]`` pair per arm.
        """
        assert n_arms == 2
        # Once a winner is known, keep exploiting it.
        if self.best is not None:
            return self.best

        # Fold in whatever happened since the previous call.
        for i in range(2):
            for j in range(2):
                self.test_history[i][j] += history[i][j] - self.previous_history[i][j]
                self.previous_history[i][j] = history[i][j]

        if not self.to_pick:
            if self.test_done:
                # A full round has been observed: try to decide the winner.
                self.test_done = False
                self.best = self.best_from_testing()
                if self.best is not None:
                    return self.best
            # First call, or the last round was inconclusive: schedule a round
            # (alternating arms) so that the pop() below never sees an empty list.
            self.to_pick = list(range(n_arms)) * max(1, self.sample_need)

        arm = self.to_pick.pop()
        if not self.to_pick:
            self.test_done = True
        return arm