**Implementing D-MAB, as described in DaCosta et al. - 2008 - Adaptive operator selection with dynamic multi-arm**

>  (hybrid between UCB1 and Page-Hinkley (PH) test)

D-MAB maintains four indicators for each arm $i$:
1. number $n_{i, t}$ of times $i$-th arm has been played up to time $t$;
2. the average empirical reward $\widehat{p}_{j, t}$ at time $t$;
3. the average and maximum deviation $m_i$ and $M_i$ involved in the PH test, initialized to $0$ and updated as detailed below. At each time step $t$:

D-MAB selects the arm $i$ that maximizes equation 1:

$$\widehat{p}_{i, t} + \sqrt{\frac{2 \log \sum_{k}n_{k, t}}{n_{i, t}}}$$

> Notice that the sum of the number of times each arm was pulled is equal to the time $\sum_{k}n_{k, t} = t$, but since their algorithm resets the number of picks, we need to go with the summation. 

and receives some reward $r_t$, drawn after reward distribution $p_{i, t}$.

> I think there is a typo in the eq. 1 on the paper. I replaced $j$ with $i$ in the lower indexes.

The four indicators are updated accordingly:

- $\widehat{p}_{i, t} :=\frac{1}{n_{i, t} + 1}(n_{i, t}\widehat{p}_{i, t} + r_t)$
- $n_{i, t} := n_{i, t}+1$
- $m_i := m_i + (\widehat{p}_{i, t} - r_t + \delta)$
- $M_i:= \text{max}(M_i, m_i)$

And if the PH test is triggered ($M_i - m_i > \lambda$), the bandit is restarted, i.e., for all arms, all indicators are set to zero (the authors argue that, empirically, resetting the values is more robust than decreasing them with some mechanism such as probability matching).

> I will reset to 1 instead of 0 (as the original paper does) to avoid divide by zero when calculating UCB1.

The PH test is a standard test for the change hypothesis. It works by monitoring the difference between $M_i$ and $m_i$, and when the difference is greater than some uuser-specified threshold $\lambda$, the PH test is triggered, i.e., it is considered that the Change hypothesis holds.

Parameter $\lambda$ controls the trade-off between false alarms and un-noticed changes. Parameter $\delta$ enforces the robustness of the test when dealing with slowly varying environments.

We also need a scaling mechanism to control the Exploration _versus_ Exploitation balance. They proposed two, from which I will focus on the first: Multiplicative Scaling (cUCB). **It consists on multiplying all rewards by a fixed user-defined parameter $C_{M-\text{scale}}$.

This way, we need to give to our D-MAB 3 parameters: $\lambda$, $\delta$, and $C_{M - \text{scale}}$. In the paper they did a sensitivity analysis of the parameters, but I think they should be fine tuned for each specific data set.

> Besides the problem of having to adjust the parameters of the `D_MAB`, I think this is not suited for Symbolic Regression as it is! In symbolic regression we normally have a population with high diversity to explore the search space --- so the mutation that maximizes the reward can be different depending on the expression being mutated. In opposition, if the mutations could be applied regardless of the expression format, then the population would be made of a few set of expressions.
>
> We should use something like Contextual Bandits to address this problem. An improvement to the algorithm would be including `context_f` that returns an proper context to use when determining which arm to pull.

In [1]:
class D_MAB:
    def __init__(self, num_bandits, verbose=False, *, policy='max_ucb', delta, lmbda, scaling, pull_f, reward_f):
        self.num_bandits = num_bandits
        self.verbose     = verbose
        self.policy      = policy #['max_ucb', 'prob_ucb']
        self.delta       = delta
        self.lmbda       = lmbda
        self.scaling     = scaling
        self.pull_f      = pull_f
        self.reward_f    = reward_f

        # History of choices and time instant t (just to track the behavior)
        self.history = {i:[] for i in range(self.num_bandits)}

        self._reset_indicators()

    def _reset_indicators(self):
        self.avg_reward    = np.zeros(self.num_bandits)
        self.num_played    = np.zeros(self.num_bandits)
        self.avg_deviation = np.zeros(self.num_bandits)
        self.max_deviation = np.zeros(self.num_bandits)

    def _calc_UCB1s(self):
        # We need that avg_reward \in [0, 1] else we must scale it
        rewards = self._normalize(self.avg_reward)

        # log1p and +1 on denominator fixes some numeric problems in the original eq.
        #scores = np.array([rewards[i] + np.sqrt(2*np.log1p(sum(self.num_played))/(self.num_played[i]+1))
        #    for i in range(self.num_bandits)])

        scores = rewards + np.sqrt(2*np.log1p(sum(self.num_played))/(self.num_played+1))
        
        return np.nan_to_num(scores, nan=0.0)

    def _scale_reward(self, reward):
        # We need to scale if the reward is not in [0, 1].
        return reward*self.scaling
    
    def  _normalize(self, p):
        values = np.nan_to_num(p, nan=0)

        if np.sum(p)==0.0:
            return np.ones(len(p))/len(p)
        
        return values / (values.sum())
    
    def playAndOptimize(self, **kwargs):
        # For convenience, this takes kwargs that are passed to both pull and reward functions

        # It will pick the bandit that maximizes eq.1. 
        UCB1s  = self._calc_UCB1s()

        # We need to know which arm we picked, what it returned, and how to calculate the reward given what the arm returned
        picked = (
            np.nanargmax(UCB1s) if self.policy=='max_ucb' else
            np.random.choice(self.num_bandits, p=self._normalize(UCB1s))
        )
        
        pulled = self.pull_f(picked, **kwargs)
        reward = self.reward_f(pulled, **kwargs)
        
        self.history[picked].append(reward)

        if self.verbose:
            print(f"Avg. Rewards: {self.avg_reward}\nUCB1 scores : {UCB1s}\nPicked      : {picked}\nReward      : {reward}")

        # After choosing, it will implicitly update the parameters based on the return
        if np.isfinite(reward):
            self.avg_reward[picked]    = (self.num_played[picked]*self.avg_reward[picked] + self._scale_reward(reward))/(self.num_played[picked]+1)
            self.avg_deviation[picked] = self.avg_deviation[picked] + (self.avg_reward[picked] - self._scale_reward(reward) + self.delta)
            
        self.num_played[picked]    = self.num_played[picked] +1
        self.max_deviation[picked] = np.maximum(self.max_deviation[picked], self.avg_deviation[picked])

        if (self.max_deviation[picked] - self.avg_deviation[picked] > self.lmbda):
            self._reset_indicators()
            if self.verbose:
                print("Reseted indicators ----------------------------------------")

        return picked, pulled, reward

Below I'll create a simple bandit configuration so we can do a sanity check of our `D_MAB` implementation.

In [2]:
# Sanity checks
import numpy as np

for bandits, descr, expec in [
    (np.array([1.0, 1.0,  1.0,  1.0]), 'All bandits with same probs', 'similar amount of pulls for each arm'),
    (np.array([-1.0, 0.2,  0.0,  1.0]), 'One bandit with higher prob', 'more pulls for first arm, less pulls for last'),
    (np.array([-0.2, -1.0,  0.0,  -1.0]), 'Two bandits with higher probs', '2nd approx 4th > 1st > 3rd'),
]:
    # Implementing simple bandits.
    def pullBandit(bandit, **kwargs):
        # Needs to have kwargs to work with my D_MAB implementation

        #Get a random number based on a normal dist with mean 0 and var 1
        result = np.random.randn()
        
        # bandits: This is the true reward probabilities, which we shoudn't have access (in the optimizer)
        # return a positive or negative reward based on bandit prob.
        return 1.0 if result > bandits[bandit] else 0.0

    
    print("\n==============================================================")
    print(descr)

    print("------------- Uniformly Distributed Random pulls -------------")
    picks   = [0, 0, 0, 0]
    rewards = [0, 0, 0, 0]

    for _ in range(10000):
        index  = np.random.randint(len(bandits))
        reward = pullBandit(index)

        picks[index]   = picks[index]+1
        rewards[index] = rewards[index]+reward

    print("Probabilities for each arm: ", bandits, "(the smaller the better)")
    print("cum. reward for each arm  : ", rewards)
    print("pulls for each arm        : ", picks)

    print("------------------------ optimizing ------------------------")

    # We have the problem that we need to determine delta and lambda values previously.
    # This needs domain knowledge (in SR context, I think we need to know if data is homogenic or
    # if it changes a lot through time).
    optimizer = D_MAB(4, verbose=False, policy='max_ucb',
                      delta=0.15, lmbda=0.5, scaling=1, # Lambda seems to control how strong will be the exploitation
                      pull_f=pullBandit, reward_f=lambda r, **kwargs:r)

    # Let's optimize
    for i in range(10000):
        optimizer.playAndOptimize()

    total_rewards = {k : sum(v) for (k, v) in optimizer.history.items()}
    total_played  = {k : len(v) for (k, v) in optimizer.history.items()}

    print("cum. reward for each arm: ", total_rewards)
    print("pulls for each arm      : ", total_played)
    print(f"(it was expected: {expec})")


All bandits with same probs
------------- Uniformly Distributed Random pulls -------------
Probabilities for each arm:  [1. 1. 1. 1.] (the smaller the better)
cum. reward for each arm  :  [395.0, 395.0, 396.0, 368.0]
pulls for each arm        :  [2466, 2514, 2500, 2520]
------------------------ optimizing ------------------------
cum. reward for each arm:  {0: 414.0, 1: 412.0, 2: 384.0, 3: 398.0}
pulls for each arm      :  {0: 2625, 1: 2557, 2: 2406, 3: 2412}
(it was expected: similar amount of pulls for each arm)

One bandit with higher prob
------------- Uniformly Distributed Random pulls -------------
Probabilities for each arm:  [-1.   0.2  0.   1. ] (the smaller the better)
cum. reward for each arm  :  [2157.0, 1097.0, 1217.0, 384.0]
pulls for each arm        :  [2525, 2558, 2483, 2434]
------------------------ optimizing ------------------------
cum. reward for each arm:  {0: 3968.0, 1: 781.0, 2: 1031.0, 3: 202.0}
pulls for each arm      :  {0: 4760, 1: 1873, 2: 2076, 3: 1291}
(

In [3]:
# Simple test with verbose
bandits, descr, expec = (np.array([1.0, 1.0,  1.0,  1.0]), 'All bandits with same probs', 'similar amount of pulls for each arm')

def pullBandit(bandit, **kwargs):
    result = np.random.randn()
    return 1.0 if result > bandits[bandit] else 0.0

optimizer = D_MAB(4, verbose=True, policy='max_ucb',
                    delta=0.25, lmbda=10, scaling=1,
                    pull_f=pullBandit, reward_f=lambda r, **kwargs:r)

# Let's optimize
for i in range(100):
    optimizer.playAndOptimize()

total_rewards = {k : sum(v) for (k, v) in optimizer.history.items()}
total_played  = {k : len(v) for (k, v) in optimizer.history.items()}

print("cum. reward for each arm: ", total_rewards)
print("pulls for each arm      : ", total_played)
print(f"(it was expected: {expec})")

Avg. Rewards: [0. 0. 0. 0.]
UCB1 scores : [0.25 0.25 0.25 0.25]
Picked      : 0
Reward      : 1.0
Avg. Rewards: [1. 0. 0. 0.]
UCB1 scores : [1.83255461 1.17741002 1.17741002 1.17741002]
Picked      : 0
Reward      : 0.0
Avg. Rewards: [0.5 0.  0.  0. ]
UCB1 scores : [1.8558085  1.48230381 1.48230381 1.48230381]
Picked      : 0
Reward      : 0.0
Avg. Rewards: [0.33333333 0.         0.         0.        ]
UCB1 scores : [1.83255461 1.66510922 1.66510922 1.66510922]
Picked      : 0
Reward      : 0.0
Avg. Rewards: [0.25 0.   0.   0.  ]
UCB1 scores : [1.80235601 1.79412258 1.79412258 1.79412258]
Picked      : 0
Reward      : 0.0
Avg. Rewards: [0.2 0.  0.  0. ]
UCB1 scores : [1.77282156 1.89301847 1.89301847 1.89301847]
Picked      : 1
Reward      : 0.0
Avg. Rewards: [0.2 0.  0.  0. ]
UCB1 scores : [1.80537986 1.39495883 1.9727697  1.9727697 ]
Picked      : 2
Reward      : 0.0
Avg. Rewards: [0.2 0.  0.  0. ]
UCB1 scores : [1.83255461 1.44202689 1.44202689 2.03933398]
Picked      : 3
Reward    

Ok, so the D-MAB seems to work. Now let's add this MAB inside mutation to update PARAMS option and control dinamically the mutaiton probabilities during evolution.

We can import the brush estimator and replace the `_mutation` by a custom function. Ideally, to use this python MAB optimizer, we need to have an object created to keep track of the variables, and the object needs to wrap the _pull_ action, as well as evaluating the reward based on the result.

> we'll need to do a _gambiarra_ to know which mutation is used so we can correctly update `D_MAB`. All MAB logic is implemented in python, and we chose the mutation in python as well. To make sure a specific mutation was used, we force it to happen by setting others' weights to zero. this way we know exactly what happened in the C++ code

In [4]:
from brush import BrushRegressor
from deap import creator
import _brush
from deap_api import nsga2, DeapIndividual 

#prg.mutate is a convenient interface that uses the current search space to sample mutations

class BrushRegressorMod(BrushRegressor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def _mutate(self, ind1):
        # Overriding the mutation so it is wrapped with D_MAB
        
        mutation, offspring, reward = self.D_MAB_.playAndOptimize(ind1=ind1)
        
        #print(mutation, ind1.prg.get_model(), offspring.prg.get_model(), reward)
        return offspring
    
    def fit(self, X, y):

        _brush.set_params(self.get_params())

        self.data_ = self._make_data(X,y)

        # Creating a wrapper for mutation to be able to control what is happening in the C++
        # code (this should be prettier in a future implementation)
        def _pull_mutation(mutation_idx, ind1, **kwargs):
            mutations = ['point', 'insert', 'delete', 'toggle_weight']
            params = self.get_params()

            for i, m in enumerate(mutations):
                params['mutation_options'][m] = 0 if i != mutation_idx else 1.0

            _brush.set_params(params)
        
            offspring = creator.Individual(ind1.prg.mutate())

            return offspring
        
        # Given the result of a pull (the mutated offspring), how do I evaluate it?
        # I need to return if the reward was positive or negative. We make use of
        # kwargs here.
        # (here I am manually writing the multi-optimization problem nsga2 is
        # designed to solve)
        def _evaluate_reward(pulled, ind1, **kwargs):
            if True: #not ind1.fitness.valid:
                ind1.prg.fit(self.data_)
                fit = (
                    np.sum((self.data_.y- ind1.prg.predict(self.data_))**2),
                    ind1.prg.size()
                )
            
                ind1.fitness.setValues(fit)
                # ind1.fitness = fit

            # in deap, a negative weight means a minimization problem, while a 
            # positive weight is a maximization problem.

            # ind1_error, ind1_size = ind1.fitness.values
            # ind1_fitness = -1.0*ind1_error + -1.0*ind1_size

            if True: #not pulled.fitness.valid:
                pulled.prg.fit(self.data_)
                fit = (
                    np.sum((self.data_.y- pulled.prg.predict(self.data_))**2),
                    pulled.prg.size()
                )
            
                pulled.fitness.setValues(fit)
                # pulled.fitness = fit
            
            # pulled_error, pulled_size = pulled.fitness.values
            # pulled_fitness = -1.0*pulled_error + -1.0*pulled_size

            # We compare fitnesses using the deap overloaded operators
            # from the docs: When comparing fitness values that are **minimized**, ``a > b`` will
            # return :data:`True` if *a* is **smaller** than *b*.
            return 1.0 if pulled.fitness.dominates(ind1.fitness) else 0.0

            # return 0.0 if pulled.fitness.values <= ind1.fitness.values else 1.0
            
        # We have 4 different mutations
        self.D_MAB_ = D_MAB(4, verbose=False, policy='max_ucb', 
                            delta=0.15, lmbda=5, scaling=1,
                            pull_f=_pull_mutation, reward_f=_evaluate_reward)

        if isinstance(self.functions, list):
            self.functions_ = {k:1.0 for k in self.functions}
        else:
            self.functions_ = self.functions

        self.search_space_ = _brush.SearchSpace(self.data_, self.functions_)
        self.toolbox_ = self._setup_toolbox(data=self.data_)

        archive, logbook = nsga2(self.toolbox_, self.max_gen, self.pop_size, 0.9, self.verbosity)

        self.archive_ = archive
        self.best_estimator_ = self.archive_[0].prg
        total_played  = {k : len(v) for (k, v) in self.D_MAB_.history.items()}

        print(total_played)
        print(self.D_MAB_.avg_reward)
        print('best model:',self.best_estimator_.get_model())
        return self


Finally, lets use this new mutation into an ES algorithm (because this is only based on mutation) and see if it improves the performance

In [5]:
import pandas as pd

# I am getting tons of unharmful warnings
import warnings
warnings.filterwarnings("ignore")

# df = pd.read_csv('../../docs/examples/datasets/d_enc.csv')
# X = df.drop(columns='label')
# y = df['label']

df = pd.read_csv('../../docs/examples/datasets/d_2x1_subtract_3x2.csv')
X = df.drop(columns='target')
y = df['target']

# df = pd.read_csv('../../docs/examples/datasets/d_square_x1_plus_2_x1_x2_plus_square_x2.csv')
# X = df.drop(columns='target')
# y = df['target']

kwargs = {
    'pop_size'  : 160,
    'max_gen'   : 160,
    'verbosity' : 0,
    'max_depth' : 10,
    'max_size'  : 20,
    'mutation_options' : {"point":0.25, "insert": 0.25, "delete":  0.25, "toggle_weight": 0.25}
}

In [6]:

# 30 executions just to compare avg score
scores = []
for i in range(30):
    print(f"-------------------------------------- Run {i} --------------------------------------")
    est_mab = BrushRegressorMod(**kwargs)

    # use like you would a sklearn regressor
    est_mab.fit(X,y)
    y_pred = est_mab.predict(X)

    scores.append(est_mab.score(X,y))
    print('score:', scores[-1])
print(f"Score mean (30 runs): {np.mean(scores)}")
print(f"Score std (30 runs) : {np.std(scores)}")

# Single run with verbosity
kwargs['verbosity'] = 1
est_mab = BrushRegressorMod(**kwargs)

# use like you would a sklearn regressor
est_mab.fit(X,y)
y_pred = est_mab.predict(X)

print('score:', est_mab.score(X,y))

-------------------------------------- Run 0 --------------------------------------
{0: 2171, 1: 478, 2: 21518, 3: 1273}
[0.0105942  0.00627615 0.01319825 0.00942655]
best model: -4.24*x2
score: 0.651326594086648
-------------------------------------- Run 1 --------------------------------------
{0: 2849, 1: 1088, 2: 19959, 3: 1544}
[0.00807301 0.00643382 0.00971993 0.00712435]
best model: 3.68*Sin(2.74*x1)
score: 0.8675268611694
-------------------------------------- Run 2 --------------------------------------
{0: 1899, 1: 1038, 2: 18860, 3: 3643}
[0.00684571 0.00578035 0.00890774 0.00768597]
best model: Sub(-3.00*x2,-2.00*x1)
score: 0.999999999999994
-------------------------------------- Run 3 --------------------------------------
{0: 22499, 1: 198, 2: 2073, 3: 670}
[0.00634495 0.         0.00455322 0.00350263]
best model: If(x2>-0.46,-4.09*x2,3.18)
score: 0.6802750800991533
-------------------------------------- Run 4 --------------------------------------
{0: 2204, 1: 1989, 2: 1

Comparing with the original implementation

In [7]:
# 30 executions just to compare avg score

kwargs['verbosity'] = 0

scores = []
for i in range(30):

    print(f"-------------------------------------- Run {i} --------------------------------------")
    est_mab = BrushRegressor(**kwargs)

    # use like you would a sklearn regressor
    est_mab.fit(X,y)
    y_pred = est_mab.predict(X)

    scores.append(est_mab.score(X,y))
    print('score:', scores[-1])
print(f"Score mean (30 runs): {np.mean(scores)}")
print(f"Score std (30 runs) : {np.std(scores)}")

# Single run with verbosity
kwargs['verbosity'] = 1
est = BrushRegressor(**kwargs)

# use like you would a sklearn regressor
est.fit(X,y)
y_pred = est.predict(X)

print('score:', est.score(X,y))

-------------------------------------- Run 0 --------------------------------------
best model: Floor(-2.98*x2)
score: 0.6624346086007631
-------------------------------------- Run 1 --------------------------------------
best model: -4.24*x2
score: 0.651326594086648
-------------------------------------- Run 2 --------------------------------------
best model: -4.24*x2
score: 0.651326594086648
-------------------------------------- Run 3 --------------------------------------
best model: -4.24*x2
score: 0.651326594086648
-------------------------------------- Run 4 --------------------------------------
best model: -4.24*x2
score: 0.651326594086648
-------------------------------------- Run 5 --------------------------------------
best model: -4.24*x2
score: 0.651326594086648
-------------------------------------- Run 6 --------------------------------------
best model: -4.24*x2
score: 0.651326594086648
-------------------------------------- Run 7 -------------------------------------