Add automation support including hyper-parameters optimization
allegroai committed May 22, 2020
1 parent b457b9a commit 95105cb
Showing 16 changed files with 1,921 additions and 94 deletions.
2 changes: 1 addition & 1 deletion examples/automl/toy_base_task.py
@@ -16,4 +16,4 @@
task.connect(params)

# Print the value to demonstrate it is set by the initiating task.
print ("Example_Param is", params['Example_Param'])
print("Example_Param is", params['Example_Param'])
1 change: 1 addition & 0 deletions examples/manual_reporting.py
@@ -75,6 +75,7 @@
},
index=['falcon', 'dog', 'spider', 'fish']
)
df.index.name = 'id'
logger.report_table("test table pd", "PD with index", 1, table_plot=df)

# Report table - CSV from path
3 changes: 3 additions & 0 deletions trains/automation/__init__.py
@@ -0,0 +1,3 @@
from .parameters import UniformParameterRange, DiscreteParameterRange, UniformIntegerParameterRange, ParameterSet
from .optimization import GridSearch, RandomSearch, HyperParameterOptimizer, Objective
from .job import TrainsJob
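As context for the new module, here is a hedged sketch of driving one of the exported strategies directly. The keyword arguments mirror the SearchStrategy arguments visible in bandster.py further down; it assumes RandomSearch forwards the same constructor keywords and that Objective takes a metric title and series, with the task ID, queue, and metric names being placeholders:

from trains.automation import (
    RandomSearch, UniformParameterRange, DiscreteParameterRange, Objective)

# Sketch only: constructor keywords assumed to match SearchStrategy (see bandster.py below)
optimizer = RandomSearch(
    base_task_id='<base_task_id>',   # template experiment to clone (placeholder)
    hyper_parameters=[
        UniformParameterRange('learning_rate', min_value=1e-4, max_value=1e-1),
        DiscreteParameterRange('batch_size', values=[32, 64, 128]),
    ],
    objective_metric=Objective(title='validation', series='accuracy'),  # assumed signature
    execution_queue='default',
    num_concurrent_workers=2,
    total_max_jobs=20,
)
optimizer.start()  # assumes the base SearchStrategy exposes start(), which OptimizerBOHB overrides below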
1 change: 1 addition & 0 deletions trains/automation/hpbandster/__init__.py
@@ -0,0 +1 @@
from .bandster import OptimizerBOHB
240 changes: 240 additions & 0 deletions trains/automation/hpbandster/bandster.py
@@ -0,0 +1,240 @@
from time import sleep, time
from ..parameters import DiscreteParameterRange, UniformParameterRange, RandomSeed, UniformIntegerParameterRange
from ..optimization import Objective, SearchStrategy
from ...task import Task

try:
from hpbandster.core.worker import Worker
from hpbandster.optimizers import BOHB
import hpbandster.core.nameserver as hpns
import ConfigSpace as CS
import ConfigSpace.hyperparameters as CSH
Task.add_requirements('hpbandster')
except ImportError:
raise ValueError("OptimizerBOHB requires the 'hpbandster' package, which was not found.\n"
"Install with: pip install hpbandster")


class TrainsBandsterWorker(Worker):
def __init__(self, *args, optimizer, base_task_id, queue_name, objective,
sleep_interval=0, budget_iteration_scale=1., **kwargs):
super(TrainsBandsterWorker, self).__init__(*args, **kwargs)
self.optimizer = optimizer
self.base_task_id = base_task_id
self.queue_name = queue_name
self.objective = objective
self.sleep_interval = sleep_interval
self.budget_iteration_scale = budget_iteration_scale
self._current_job = None

def compute(self, config, budget, **kwargs):
"""
Simple example for a compute function
The loss is just a the config + some noise (that decreases with the budget)
For dramatization, the function can sleep for a given interval to emphasizes
the speed ups achievable with parallel workers.
Args:
config: dictionary containing the sampled configurations by the optimizer
budget: (float) amount of time/epochs/etc. the model can use to train.
We assume budget is iteration, as time might not be stable from machine to machine.
Returns:
dictionary with mandatory fields:
'loss' (scalar)
'info' (dict)
"""
self._current_job = self.optimizer.helper_create_job(self.base_task_id, parameter_override=config)
self.optimizer._current_jobs.append(self._current_job)
self._current_job.launch(self.queue_name)
iteration_value = None
while not self._current_job.is_stopped():
iteration_value = self.optimizer._objective_metric.get_current_raw_objective(self._current_job)
if iteration_value and iteration_value[0] >= self.budget_iteration_scale * budget:
self._current_job.abort()
break
sleep(self.sleep_interval)

result = {
# this is a mandatory field to run hyperband
# remember: HpBandSter always minimizes!
'loss': float(self.objective.get_normalized_objective(self._current_job)*-1.),
# can be used for any user-defined information - also mandatory
'info': self._current_job.task_id()
}
print('TrainsBandsterWorker result {}, iteration {}'.format(result, iteration_value))
self.optimizer._current_jobs.remove(self._current_job)
return result


class OptimizerBOHB(SearchStrategy, RandomSeed):
def __init__(self, base_task_id, hyper_parameters, objective_metric,
execution_queue, num_concurrent_workers, min_iteration_per_job, max_iteration_per_job, total_max_jobs,
pool_period_min=2.0, max_job_execution_minutes=None, **bohb_kargs):
"""
Initialize a search strategy optimizer
:param str base_task_id: ID of the Task (experiment) to be used as the template
:param list hyper_parameters: list of Parameter objects to optimize over
:param Objective objective_metric: Objective metric to maximize / minimize
:param str execution_queue: execution queue to use for launching Tasks (experiments)
:param int num_concurrent_workers: maximum number of concurrently running workers
:param int min_iteration_per_job: minimum number of iterations per job (the smallest budget BOHB will assign)
:param int max_iteration_per_job: maximum number of iterations per job (iteration counts are scaled to this value)
:param int total_max_jobs: total maximum number of jobs for the optimization process
:param float pool_period_min: time in minutes between two consecutive polls
:param float max_job_execution_minutes: maximum time per single job in minutes; if exceeded, the job is aborted
:param ** bohb_kargs: arguments passed directly to the BOHB object
"""
super(OptimizerBOHB, self).__init__(
base_task_id=base_task_id, hyper_parameters=hyper_parameters, objective_metric=objective_metric,
execution_queue=execution_queue, num_concurrent_workers=num_concurrent_workers,
pool_period_min=pool_period_min, max_job_execution_minutes=max_job_execution_minutes,
total_max_jobs=total_max_jobs)
self._max_iteration_per_job = max_iteration_per_job
self._min_iteration_per_job = min_iteration_per_job
self._bohb_kwargs = bohb_kargs or {}
self._param_iterator = None
self._namespace = None
self._bohb = None
self._res = None

def set_optimization_args(self, eta=3, min_budget=None, max_budget=None,
min_points_in_model=None, top_n_percent=15,
num_samples=None, random_fraction=1/3., bandwidth_factor=3,
min_bandwidth=1e-3):
"""
Defaults copied from BOHB constructor, see details in BOHB.__init__
BOHB performs robust and efficient hyperparameter optimization
at scale by combining the speed of Hyperband searches with the
guidance and guarantees of convergence of Bayesian
Optimization. Instead of sampling new configurations at random,
BOHB uses kernel density estimators to select promising candidates.
.. highlight:: none
For reference: ::
@InProceedings{falkner-icml-18,
title = {{BOHB}: Robust and Efficient Hyperparameter Optimization at Scale},
author = {Falkner, Stefan and Klein, Aaron and Hutter, Frank},
booktitle = {Proceedings of the 35th International Conference on Machine Learning},
pages = {1436--1445},
year = {2018},
}
Parameters
----------
eta : float (3)
In each iteration, a complete run of sequential halving is executed. In it,
after evaluating each configuration on the same subset size, only a fraction of
1/eta of them 'advances' to the next round.
Must be greater or equal to 2.
min_budget : float (0.01)
The smallest budget to consider. Needs to be positive!
max_budget : float (1)
The largest budget to consider. Needs to be larger than min_budget!
The budgets will be geometrically distributed
:math:`\sim \eta^k` for :math:`k \in [0, 1, ... , num\_subsets - 1]`.
min_points_in_model: int (None)
number of observations to start building a KDE. Default 'None' means
dim+1, the bare minimum.
top_n_percent: int (15)
percentage (between 1 and 99, default 15) of the observations that are considered good.
num_samples: int (64)
number of samples to optimize EI (default 64)
random_fraction: float (1/3.)
fraction of purely random configurations that are sampled from the
prior without the model.
bandwidth_factor: float (3.)
to encourage diversity, the points proposed to optimize EI are sampled
from a 'widened' KDE where the bandwidth is multiplied by this factor (default: 3)
min_bandwidth: float (1e-3)
to keep diversity, even when all (good) samples have the same value for one of the parameters,
a minimum bandwidth (Default: 1e-3) is used instead of zero.
"""
if min_budget:
self._bohb_kwargs['min_budget'] = min_budget
if max_budget:
self._bohb_kwargs['max_budget'] = max_budget
if num_samples:
self._bohb_kwargs['num_samples'] = num_samples
self._bohb_kwargs['eta'] = eta
self._bohb_kwargs['min_points_in_model'] = min_points_in_model
self._bohb_kwargs['top_n_percent'] = top_n_percent
self._bohb_kwargs['random_fraction'] = random_fraction
self._bohb_kwargs['bandwidth_factor'] = bandwidth_factor
self._bohb_kwargs['min_bandwidth'] = min_bandwidth
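# Worked illustration of the budget geometry described in the docstring above: with
# eta=3 and min_budget=1/9., max_budget=1.0, HpBandSter evaluates budgets spaced
# geometrically by eta (roughly 1/9., 1/3., 1.0), and each successive-halving round
# advances only about 1/eta of the configurations to the next, larger budget.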

def start(self):
# Step 1: Start a nameserver
fake_run_id = 'OptimizerBOHB_{}'.format(time())
self._namespace = hpns.NameServer(run_id=fake_run_id, host='127.0.0.1', port=None)
self._namespace.start()

# we have to scale the budget to the iterations per job, otherwise numbers might be too high
budget_iteration_scale = self._max_iteration_per_job

# Step 2: Start the workers
workers = []
for i in range(self._num_concurrent_workers):
w = TrainsBandsterWorker(optimizer=self,
sleep_interval=int(self.pool_period_minutes*60),
budget_iteration_scale=budget_iteration_scale,
base_task_id=self._base_task_id,
objective=self._objective_metric,
queue_name=self._execution_queue,
nameserver='127.0.0.1', run_id=fake_run_id, id=i)
w.run(background=True)
workers.append(w)

# Step 3: Run an optimizer
self._bohb = BOHB(configspace=self.convert_hyper_parameters_to_cs(),
run_id=fake_run_id,
num_samples=self.total_max_jobs,
min_budget=float(self._min_iteration_per_job)/float(self._max_iteration_per_job),
**self._bohb_kwargs)
self._res = self._bohb.run(n_iterations=self.total_max_jobs, min_n_workers=self._num_concurrent_workers)

# Step 4: if we get here, Shutdown
self.stop()

def stop(self):
# After the optimizer run, we must shutdown the master and the nameserver.
self._bohb.shutdown(shutdown_workers=True)
self._namespace.shutdown()

if not self._res:
return

# Step 5: Analysis
id2config = self._res.get_id2config_mapping()
incumbent = self._res.get_incumbent_id()
all_runs = self._res.get_all_runs()

# Step 6: Print Analysis
print('Best found configuration:', id2config[incumbent]['config'])
print('A total of {} unique configurations were sampled.'.format(len(id2config.keys())))
print('A total of {} runs were executed.'.format(len(self._res.get_all_runs())))
print('Total budget corresponds to {:.1f} full function evaluations.'.format(
sum([r.budget for r in all_runs]) / self._bohb_kwargs.get('max_budget', 1.0)))
print('The run took {:.1f} seconds to complete.'.format(
all_runs[-1].time_stamps['finished'] - all_runs[0].time_stamps['started']))

def convert_hyper_parameters_to_cs(self):
cs = CS.ConfigurationSpace(seed=self._seed)
for p in self._hyper_parameters:
if isinstance(p, UniformParameterRange):
hp = CSH.UniformFloatHyperparameter(
p.name, lower=p.min_value, upper=p.max_value, log=False, q=p.step_size)
elif isinstance(p, UniformIntegerParameterRange):
hp = CSH.UniformIntegerHyperparameter(
p.name, lower=p.min_value, upper=p.max_value, log=False, q=p.step_size)
elif isinstance(p, DiscreteParameterRange):
hp = CSH.CategoricalHyperparameter(p.name, choices=p.values)
else:
raise ValueError("HyperParameter type {} not supported yet with OptimizerBOHB".format(type(p)))
cs.add_hyperparameter(hp)

return cs
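To tie the new pieces together, a hedged usage sketch of OptimizerBOHB built from the constructor and methods shown above; the Objective(title, series) form, the metric and queue names, and the task ID are assumptions or placeholders:

from trains.automation import Objective, UniformParameterRange, DiscreteParameterRange
from trains.automation.hpbandster import OptimizerBOHB

# Sketch only: concrete values and the Objective signature are illustrative assumptions.
bohb = OptimizerBOHB(
    base_task_id='<base_task_id>',   # experiment to clone (placeholder)
    hyper_parameters=[
        UniformParameterRange('learning_rate', min_value=1e-4, max_value=1e-1),
        DiscreteParameterRange('batch_size', values=[32, 64, 128]),
    ],
    objective_metric=Objective(title='validation', series='accuracy'),
    execution_queue='default',
    num_concurrent_workers=2,
    min_iteration_per_job=100,
    max_iteration_per_job=1000,
    total_max_jobs=20,
)
bohb.set_optimization_args(eta=3)  # optional BOHB-specific tuning, see set_optimization_args above
bohb.start()                       # runs the BOHB loop, then calls stop() and prints the analysis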
