# Pain-Informed Bayes' Opt

In [None]:
from painModels import (
    instant_pain, relative_pain, add_relative_pain, memory_pain, 
    pain_recall, calculate_pains, MAX_PAIN
)
import numpy as np
import pandas as pd
import plotly.express as px

param_names = ["Amplitude (nA)", "Pulse Width (ms)"]
target_name = "Reported Pain"

from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error

## Refining Aquisiton

Goal: Pain-Informed acquisition
- generate a graph exploit(waveform_params) = uncertainty
downweight uncertainty by the following graphs:
- generate a graph min_inst_pain(waveform_params) = inst_pain_weight
- generate a graph min_rpd(last_waveform, waveform_params) = rpd_pain_weight
- generate a graph min_end(curr_trial_num, max_trial_num, inst_pain(waveform_params)) = memory_pain_weight
>Note: in the simplest case, waveform_params is a (float, float)=(amplitude, pulse width)

In [17]:
def ideal_pain_curve(trial_num, max_trial_num):  # helper for ideal pain curve
    """ -(4B/A^2) * x(x-A) :: A is the max#trials, B is the ideal height aka pain,
                                C is roughly where we want to start our experiment """
    x = trial_num
    height_mod = 2
    A = max_trial_num
    B = MAX_PAIN*0.9 - height_mod  # * Note: in ideal we only want 90% of our max pain?
    return (-4*B * x * (x-A)) / (A**2) + height_mod
# testing
data = []
max_trials = 40
for trial in np.linspace(0, max_trials, num=500, endpoint=True):
    noise = np.random.normal(0, 0.5, 1)[0]
    fx = ideal_pain_curve(trial, max_trials) + noise
    data.append((trial, fx))
df = pd.DataFrame(data, columns=["Trial #", "Ideal Pain"])
px.scatter(df, x="Trial #", y="Ideal Pain")

In [None]:
# should be known
max_params=[5, 50]
weights=[5,3]
# Implementation: 
def weight_ppain(params, last_params, trial_num, max_trial_num, w=[0.25, 0.2]):
    """ returns a weight to the uncertainty with a max deviating percentage of sum(w) 
    ie, at the highest pain possible, we are disencentivizing sampling by (sum(w)*100)%"""
    # inst pain
    ipain = instant_pain(params, max_params, weights)
    if len(last_params) == 0:  # first stim event => no baseline pain
        lpain = 0
    else:
        lpain = instant_pain(last_params, max_params, weights)
    # * Note: include_lower incentivizes larger decreases in pain-scale which may inadvertantly incentivize higher jumps
    # relative pain
    ppain = add_relative_pain([lpain, ipain], 2, include_lower=False)[-1]
    modifier = w[0] * ppain/sum(weights)  # Note: weights only account for ipain, not relative pain
    # memory of pain
    idpain = ideal_pain_curve(trial_num, max_trial_num)
    if ppain > idpain:
        modifier += w[1] * (abs(ppain - idpain)/MAX_PAIN)
    else:
        modifier += (w[1]/4) * (abs(ppain - idpain)/MAX_PAIN)
    return (1-modifier)

## Testing Optimization

In [None]:
# Ground truth objective function
def objective(x): 
    return -(np.sin(x) + 0.1 * x ** 2)

# HELPERS
def unnorm(pts, data):
    """ returns a list of points that is originally normalized with respect to array-like data """
    # norm(x) = x-min/(max-min)
    # norm_inv(x) = x(max-min)+min
    mi = min(data)
    ma = max(data)
    return np.array([(p*(ma-mi) + mi) for p in pts])

def my_norm(points, data):
    """ returns a list representative of points but normalized with respect to data """
    mi = min(data)
    ma = max(data)
    return [(x-mi)/(ma-mi) for x in points]

# vis helper
def plot_BO(x, gt, means, pts, it):
    df = pd.DataFrame([[x[i], means[i], gt[i]] for i in range(len(means))], columns=["X", "GP Predicted", "Ground-Truth"])
    fig = px.line(df, x="X", y=["GP Predicted", "Ground-Truth"], title=f"Bayes Opt's Prediction vs. Reality for Iteration {it}")
    fig.add_trace(px.scatter(x=pts.keys(), y=pts.values()).data[0])
    fig.show()

def sample_initial_points(num_init=10):
    """ Randomly sample 10 initial data points in the range of [-10, 10] """
    diff = high-low
    return np.random.rand(num_init)*diff - high

def acquisition_function(means, std, best, explore=0.01):
    """ Expected Improvement (EI) acquisition function """
    improv = [(m-best-explore) for m in means]  # elem wise subtract & divide
    z = improv / std  
    ei = improv * norm.cdf(z) * norm.pdf(z)
    best_ratio = np.argmax(ei)/(len(ei)-1)  # location-ratio of best new val
    new_best = best_ratio*(high-low) + low  # converting ratio to point on our scale
    return new_best

def BO_loop(opt_trials=10, add_graph=True, verbose=True):
    # Define the GP model
    kernel = RBF(length_scale=1.0, length_scale_bounds=(1e-2, 1e3))
    gp = GaussianProcessRegressor(kernel=kernel, optimizer=None) 

    starting_pts = sample_initial_points()
    eval_pts = dict([[pt, objective(pt)] for pt in starting_pts])
    acc_hist = []
    last_gt = []
    for it in range(opt_trials+1):
        # calculate latent
        y_pred = [] # my_norm(eval_pts.values())
        x_vals = []
        for key, val in eval_pts.items():
            y_pred = y_pred + my_norm([val], eval_pts.values())
            x_vals.append(key)
        gp.fit(np.array(x_vals).reshape(-1,1), y_pred)
        # define latent further
        x = np.arange(low, high, 1/10)
        m, s = gp.predict(x.reshape(-1, 1), return_std=True)
        means = unnorm(m, eval_pts.values())
        std = unnorm(s, eval_pts.values())
        # evaluation
        gt = [objective(pt) for pt in x]
        last_gt = [[x[i], gt[i]] for i in range(len(gt))]
        dev_perc = 100*abs(max(means)-max(gt))/(max(means)-min(means))  # %range of deviation from max's
        acc = [round(mean_squared_error(gt, means), 3), round(dev_perc, 3)]
        acc_hist.append(acc)
        if verbose:
            print("It", it, "-", len(eval_pts), "points with [MSE, deviation_perc] =", acc)
        if add_graph:
            plot_BO(x, gt, means, eval_pts, it)  # show the resulting graph
        # set up next iteration
        if it < opt_trials:
            # max([[k,v] for k, v in eval_pts.items()], key=lambda x: x[1])
            bp = max(list(eval_pts.values()))
            next_point = acquisition_function(means, std, bp)
            eval_pts[next_point] = objective(next_point)
            print("Adding point x =", next_point)
    return last_gt, acc_hist, eval_pts, gp
hist, accuracies, _, _ = BO_loop()