In [1]:
#from purecmaes import fmin
import torch
from torchvision.datasets import CIFAR10,MNIST
from torchvision.models import wide_resnet101_2, resnet18
from torch.utils.data import DataLoader
from torchvision.transforms import ToTensor, Lambda, Compose
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
@torch.no_grad()
def extract_features(loader):
    model = resnet18(pretrained=True).to('cuda')
    outputs = []
    def hook(module, input, output):
        outputs.append(output)
    model.avgpool.register_forward_hook(hook)
    model.eval()
    labels = []
    features = []
    for data in loader:
        img, label = data
        labels.append(label.cpu().detach().numpy())
        _ = model(img.to('cuda'))
        features.append(outputs[0].cpu())
        outputs = []
    features = torch.cat(features,0).squeeze().numpy()
    labels = np.concatenate(labels)
    return features, labels

def get_data():
    trf = Compose([ToTensor(), Lambda(lambda x: x.repeat(3, 1, 1)) ])
    loader = DataLoader(MNIST(root='/content/dataset', download=True, train=True,transform=trf), batch_size=32, shuffle=False)
    Xtrain, ytrain = extract_features(loader)
    loader = DataLoader(MNIST(root='/content/dataset', download=True, train=False,transform=trf), batch_size=32, shuffle=False)
    Xtest, ytest = extract_features(loader)
    return Xtrain, ytrain, Xtest, ytest

class SGDTuning():
    def __init__(self):
        Xtrain, ytrain, self.Xtest, self.ytest = get_data()
        self.Xtrain,  self.Xval, self.ytrain, self.yval  = train_test_split(Xtrain, ytrain, test_size=0.33, random_state=42)
        
    def __call__(self, trial, params=[]):
        if trial != None:
            alpha = trial.suggest_float('learning_rate', 0, 1)
            eta0 = trial.suggest_float('batch_size', 0, 10e-5)
        else:
            alpha, eta0 = params
        alpha, eta0 =  np.abs(alpha), np.abs(eta0) 
        estimator = SGDClassifier(alpha=alpha,eta0=eta0, learning_rate='invscaling')
        estimator.fit(self.Xtrain, self.ytrain)
        pred = estimator.predict(self.Xval)
        acc = accuracy_score(self.yval, pred)
        return 100 * acc

    def test(self, params):
        alpha, eta0 = params
        alpha, eta0 =  np.abs(alpha), np.abs(eta0) 
        estimator = SGDClassifier(alpha=alpha,eta0=eta0, learning_rate='invscaling')

        estimator.fit(self.Xtrain,self.ytrain)
        pred = estimator.predict(self.Xtest)
        acc = accuracy_score(self.ytest, pred)
        return estimator, acc

# Search the best parameters with CMA-ES

In [None]:
func = SGDTuning()

In [5]:
import time
import optuna
import gym
import numpy as np

from gym import spaces
from bayes_opt import BayesianOptimization
from stable_baselines3 import SAC, PPO

In [6]:
class optimEnv(gym.Env):
    metadata = {"render.modes": []}

    def __init__(self, objective, render_mode=None, iterations=10, direction='maximize'):
        super().__init__()

        self.observation_space = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
        self.action_space = spaces.Box(low=np.array([0, 10e-5]), high=np.array([1, 1]), dtype=np.float32)

        self.objective = objective
        self.iterations = iterations
        self.run = []

    def _get_obs(self):
        call = self.objective(self._agent_location)
        self.run.append(call)
        return call

    def _get_info(self):
        return {"current_location" : self._agent_location}
    
    def reset(self):
        self.current_step = 0

        # Choose the agent's location uniformly at random
        self._agent_location = self.action_space.sample()
        self.last_observation = self._get_obs()
        
        self.max_observation = self.last_observation
        self.max_location = self._agent_location

        return self.last_observation

    def step(self, action):
        self._agent_location = action
        # An episode is done iff current_step == iterations

        observation = self._get_obs()
        info = self._get_info()

        self.current_step += 1
        terminated = self.current_step == self.iterations

        reward = observation - self.last_observation
        if (observation > self.max_observation) :
          self.max_observation = observation
          self.max_location = action

        print("Best : ", self.max_observation, " at ", self.max_location)

        return observation, reward, terminated, info

    def render(self, mode="human"):
        pass

    def close(self):
        pass

In [13]:
# make data (#runs, #calls)

# Optuna
def get_run_optuna(study):

    run = []
    trials = study.get_trials()
    for trial in trials:
        value = trial.value
        if value == None:
            run.append(run[-1])
        else:
            run.append(value)
    
    return run

# BO
def get_run_BO(optimizer):
    run = []
    for res in optimizer.res:
        run.append(res['target'])
    print("run : ", run)
    return run

# RL
def get_run_RL(env):
    return env.run
    

# PSO
from random import random
from random import uniform

#--- MAIN ---------------------------------------------------------------------+

class Particle:
    def __init__(self, x0):
        self.position_i=[]          # particle position
        self.velocity_i=[]          # particle velocity
        self.pos_best_i=[]          # best position individual
        self.err_best_i=-1          # best error individual
        self.err_i=-1               # error individual

        for i in range(0,num_dimensions):
            self.velocity_i.append(uniform(-1,1))
            self.position_i.append(x0[i])

    # evaluate current fitness
    def evaluate(self,costFunc):
        self.err_i=costFunc(self.position_i)

        # check to see if the current position is an individual best
        if self.err_i<self.err_best_i or self.err_best_i==-1:
            self.pos_best_i=self.position_i.copy()
            self.err_best_i=self.err_i
                    
    # update new particle velocity
    def update_velocity(self,pos_best_g):
        w=0.5       # constant inertia weight (how much to weigh the previous velocity)
        c1=1        # cognative constant
        c2=2        # social constant
        
        for i in range(0,num_dimensions):
            r1=random()
            r2=random()
            
            vel_cognitive=c1*r1*(self.pos_best_i[i]-self.position_i[i])
            vel_social=c2*r2*(pos_best_g[i]-self.position_i[i])
            self.velocity_i[i]=w*self.velocity_i[i]+vel_cognitive+vel_social

    # update the particle position based off new velocity updates
    def update_position(self,bounds):
        for i in range(0,num_dimensions):
            self.position_i[i]=self.position_i[i]+self.velocity_i[i]
            
            # adjust maximum position if necessary
            if self.position_i[i]>bounds[i][1]:
                self.position_i[i]=bounds[i][1]

            # adjust minimum position if neseccary
            if self.position_i[i]<bounds[i][0]:
                self.position_i[i]=bounds[i][0]

def pso_run(costFunc, x0, bounds, num_particles, maxiter, verbose=False):
    global num_dimensions

    num_dimensions=len(x0)
    err_best_g=-1                   # best error for group
    pos_best_g=[]                   # best position for group

    # establish the swarm
    swarm=[]
    for i in range(0,num_particles):
        swarm.append(Particle(x0))

    run = []

    # begin optimization loop
    i=0
    while i<maxiter:
        if verbose: print(f'iter: {i:>4d}, best solution: {err_best_g:10.6f}')
            
        # cycle through particles in swarm and evaluate fitness
        for j in range(0,num_particles):
            swarm[j].evaluate(costFunc)

            run.append(-float(swarm[j].err_i))
            print("run", run)

            # determine if current particle is the best (globally)
            if swarm[j].err_i<err_best_g or err_best_g==-1:
                pos_best_g=list(swarm[j].position_i)
                err_best_g=float(swarm[j].err_i)
        
        # cycle through swarm and update velocities and position
        for j in range(0,num_particles):
            swarm[j].update_velocity(pos_best_g)
            swarm[j].update_position(bounds)
        i+=1

    # print final results
    if verbose:
        print('\nFINAL SOLUTION:')
        print(f'   > {pos_best_g}')
        print(f'   > {err_best_g}\n')

    return run

In [26]:
def benchmark_optuna(n_runs, objective, sampler, name_sampler, n_calls=50) : 
    runs = []

    for _ in range(n_runs) :
        study = optuna.create_study(direction="maximize", sampler=sampler)
        study.optimize(lambda t : objective(t), n_trials=n_calls, timeout=None)
        run = get_run_optuna(study)
        runs.append(run)
    
    with open(f"{name_sampler}_runs{n_runs}_calls{n_calls}_PB1", 'wb') as f:
        np.save(f, np.array(runs))
   
    return np.array(runs)

def benchmark_bo(n_runs, objective, n_calls=50):
    runs = []
    pbounds={'x':(0,1),'y':(10e-5,1)}
    for _ in range(n_runs) :
        optimizer = BayesianOptimization(f=lambda x, y : objective(None, [x, y]), pbounds=pbounds, random_state=int(time.time()))
        optimizer.maximize(init_points=5, n_iter=45)
        run = get_run_BO(optimizer)
        runs.append(run)

    with open(f"BO_runs{n_runs}_calls{n_calls}_PB1", 'wb') as f:
        np.save(f, np.array(runs))

    return np.array(runs)

def benchmark_pso(n_runs, objective, n_calls=50):
    runs = []
    initial=[np.random.random(), np.random.random()]
    bounds=[(0,1),(10e-5,1)]
    for _ in range(n_runs) :
        run = pso_run(lambda x : -objective(None, x), initial, bounds, 4, 8, False)
        runs.append(run)

    with open(f"PSO_runs{n_runs}_calls{n_calls}_PB1", 'wb') as f:
        np.save(f, np.array(runs))

    return np.array(runs)

def benchmark_rl(n_runs, objective, n_calls=50):
    runs = []
    for _ in range(n_runs) :
        env = optimEnv(lambda x : objective(None, x), iterations=4)
        model = PPO("MlpPolicy", env, verbose=1)
        model.learn(total_timesteps=40, log_interval=4)
        run = get_run_RL(env)
        runs.append(run)

    with open(f"RL_runs{n_runs}_calls{n_calls}_PB1", 'wb') as f:
        np.save(f, np.array(runs))

    return np.array(runs)

In [None]:
runs_RL = benchmark_rl(10, func)