# Black Box Optimization

### Data Wrangling

#### Import

###### Generalities

In [1]:
import os
import numpy as np
import pandas as pd
from pathlib import Path
#import ast

In [2]:
# Get the parent directory of the current notebook's directory
notebook_dir = os.getcwd()
parent_dir = os.path.dirname(notebook_dir)

# Join the parent directory with "initial data"
data_path = os.path.join(parent_dir, "data")

###### Initial data

In [3]:
# import initial data

impInputInit = []

for i in range(1,9):
    pth = os.path.join(data_path, "initial_data","function_"+str(i),"initial_inputs.npy")
    impInputInit.append(np.load(pth, allow_pickle=True))

impOutputInit = []

for i in range(1,9):
    pth = os.path.join(data_path, "initial_data","function_"+str(i),"initial_outputs.npy")
    impOutputInit.append(np.load(pth, allow_pickle=True))

###### Weekly data

In [4]:
# function for transforming txt strings into np arrays

def imp_txt_to_np_array(file_path,type="input"):
    if type=="input":
        dlt="), array("
        srt=7
    elif type=="output":
        dlt="), np.float64("
        srt=12
    with open(file_path) as f:
        imp = f.read()
        imp = imp.split("]\n[")
        imp=imp[-1]
        imp = imp[srt:-2].split(dlt)
        #print(imp) if type=="output" else None

    impn = []
    for i in range(len(imp)):
        if type=="input":
            imp[i]=imp[i][1:-1].split(",")
            for j in range(len(imp[i])):
                imp[i][j]=float(imp[i][j])
            impn.append(np.array(imp[i]))
        elif type=="output":
            impn.append(float(imp[i]))
    return impn

In [6]:
# import new weekly data

week_now = 3

impInputNew=[]


for j in range(1, week_now +1):
    pth=os.path.join(data_path,"incremental_data", "week_"+str(j),"inputs.txt")
    impInputNew.append(imp_txt_to_np_array(pth))


new=[[] for _ in range(len(impInputNew[0]))]

for i in range(len(impInputNew[0])):
    for n in range(week_now):
        new[i].append(impInputNew[n][i])

impInputIncr=[None for i in range(8)]


for i in range(len(new)):
    #print(len(impInputInit[i])) #check
    impInputIncr[i]=impInputInit[i]

for nw_idx, nw in enumerate(impInputNew):
    for i in range(len(new)):
        impInputIncrn = impInputIncr[i]
        impInputIncrn = np.append( impInputIncrn,[np.asarray(new[i][nw_idx])] , axis=0)

        impInputIncr[i] = impInputIncrn

#check
#print("\n")
#for i in range(len(new)):
#    print(len(impInputIncr[i]))
#    print(len(impInputIncr[i][-1]))



impOutputNew=[]


for j in range(1, week_now +1):
    pth=os.path.join(data_path,"incremental_data", "week_"+str(j),"outputs.txt")
    impOutputNew.append(imp_txt_to_np_array(pth,type="output"))


new=[[] for _ in range(len(impOutputNew[0]))]

for i in range(len(impOutputNew[0])):
    for n in range(week_now):
        new[i].append(impOutputNew[n][i])

impOutputIncr=[None for i in range(8)]


for i in range(len(new)):
    #print(len(impOutputInit[i])) #check
    impOutputIncr[i]=impOutputInit[i]

for nw_idx, nw in enumerate(impOutputNew):
    for i in range(len(new)):
        impOutputIncrn = impOutputIncr[i]
        impOutputIncrn = np.append( impOutputIncrn,[np.asarray(new[i][nw_idx])] , axis=0)

        impOutputIncr[i] = impOutputIncrn

#check
#print("\n")
#for i in range(len(new)):
#    print(len(impOutputIncr[i]))
#    print(len(impOutputIncr[i][0]))


#### Definition

In [63]:
#data

data=[]

for i in range(len(impInputIncr)):
    data.append(list(zip(impInputIncr[i], impOutputIncr[i])))

#print(data[1])

#print(list(map(lambda x: x[0], data[0])))

dimensions={0: 2, 1: 2, 2: 3, 3: 4, 4: 4, 5: 5, 6: 6, 7: 8}
positiveQ={0: False, 1: True, 2: False, 3: False, 4: True, 5: True, 6: True, 7: True}

In [65]:
#grid

def gridAll(k, sub=5):
    dim = dimensions[k]
    zero = np.zeros(dim) * sub
    one = np.ones(dim) * sub
    
    # Create a mesh grid within the space defined by (zero, one)
    grid_points = np.meshgrid(*[np.linspace(0, 1, sub+1) for _ in range(dim)])
    
    # Flatten the grid points and combine into a single array of coordinates
    grid_points_flat = np.array(grid_points).reshape(dim, -1).T
    return grid_points_flat

#print(gridAll(2, sub=2))  # Example usage for 3 dimensions with sub=2

## Training

### Gaussian Process Regressor

In [None]:
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF
#from sklearn.gaussian_process.kernels import ConstantKernel as C


In [28]:
def create_predictor(X, y):
    #kernel = C(1.0, (1e-3, 1e3)) * RBF(length_scale=1.0, length_scale_bounds=(1e-3, 1e2))# RBF()  # Radial basis function kernel
    kernel= RBF()
    gp = GaussianProcessRegressor(kernel=kernel, normalize_y=True, n_restarts_optimizer=10, alpha=1e-6)
    gp.fit(X, y)
    return gp


In [101]:
predictors=[]

for k in range(8):
    X = list(map(lambda x: x[0], data[k]))
    y = list(map(lambda x: x[1], data[k]))
    predictors.append(create_predictor(X, y))

# test GP predictor

#X_test = np.array([0.0,0.5])
#y_pred, sigma = predictors[1].predict([X_test], return_std=True)
#print(f"Predicted value: {y_pred[0]}, Standard Deviation: {sigma[0]}")

# evaluate GP predictor on the grid

def pred_eval_fun(k, sub):
    X_test = gridAll(k, sub)[3:-3]
    y_preds = []
    sigmas = []
    for x in X_test:
        y_pred, sigma = predictors[k].predict([np.array(x)], return_std=True)
        y_preds.append(y_pred[0])
        sigmas.append(sigma[0])
    return y_preds, np.array(sigmas)


# evaluate ucb acquisition function on the grid


def meanGrid(k,sub):
    return list(zip(gridAll(k,sub),pred_eval_fun(k,sub)[0]))

def stdGrid(k,sub):
    return list(zip(gridAll(k,sub),pred_eval_fun(k,sub)[-1]))

def beta(k,sub):
    p = pred_eval_fun(k,sub)
    m = p[0]
    s = p[-1]
    mm = abs(np.mean(m))
    ms = abs(np.mean(s))
    b=mm/ms
    return b


def ucbGrid(k,sub):
    p = pred_eval_fun(k,sub)
    pq = positiveQ[k]
    gr = gridAll(k,sub)
    b= beta(k,sub)
    # should take the negative of the mean when positiveQ is False
    if pq:
        m = p[0]
    else:
        m = np.array(list(map(lambda x: -x, p[0])))
    s = p[-1]
    return list(zip(gr,m+b*s))


# find the maximum point of the ucb acquisition function

print(max(ucbGrid(0,10), key=lambda item: item[1]))


print(max(ucbGrid(1,10), key=lambda item: item[1]))





(array([0., 0.]), 0.0003917558428567416)
(array([0.4, 0.9]), 0.6706035149374576)
