Continue an optimization run. #6

Open · wants to merge 4 commits into base: development
6 changes: 6 additions & 0 deletions dehb/__init__.py
@@ -1,3 +1,9 @@
from loguru import logger
import sys

logger.configure(handlers=[{"sink": sys.stdout, "level": "INFO"}])
Collaborator commented on lines +1 to +4:
does bringing this here help? or does it clash with the configuration defined in optimizers/dehb.py?
can logger now be imported directly? or could you point me to how you're using this?



from .optimizers import DE, AsyncDE
from .optimizers import DEHB
from .utils import SHBracketManager
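For context on the review question above: loguru exposes one global logger object, so the configure(...) call in this __init__.py takes effect for anything imported afterwards, and the later configure(...) call in optimizers/dehb.py simply replaces the handler list again (last call wins). A minimal sketch of that behavior, assuming the package is importable:

import dehb  # triggers logger.configure(...) in dehb/__init__.py
from loguru import logger  # the same global logger object that dehb just configured

logger.info("emitted via the stdout sink configured in dehb/__init__.py")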
1 change: 1 addition & 0 deletions dehb/__version__.py
@@ -0,0 +1 @@
__version__ = '0.0.1'
106 changes: 97 additions & 9 deletions dehb/optimizers/dehb.py
@@ -9,9 +9,12 @@
from copy import deepcopy
from loguru import logger
from distributed import Client
import pickle
import json
from typing import Dict, Union
from pathlib import Path

from dehb.optimizers import DE, AsyncDE
from dehb.utils import SHBracketManager
from dehb import AsyncDE, SHBracketManager


logger.configure(handlers=[{"sink": sys.stdout, "level": "INFO"}])
@@ -158,7 +161,10 @@ class DEHB(DEHBBase):
def __init__(self, cs=None, f=None, dimensions=None, mutation_factor=0.5,
crossover_prob=0.5, strategy='rand1_bin', min_budget=None,
max_budget=None, eta=3, min_clip=None, max_clip=None, configspace=True,
boundary_fix_type='random', max_age=np.inf, n_workers=None, client=None, **kwargs):
boundary_fix_type='random', max_age=np.inf, n_workers=None, client=None,
client_resources: Union[Dict, None] = None,
checkpoint_file: Union[Path, None] = None, restore_checkpoint: bool = False,
**kwargs):
super().__init__(cs=cs, f=f, dimensions=dimensions, mutation_factor=mutation_factor,
crossover_prob=crossover_prob, strategy=strategy, min_budget=min_budget,
max_budget=max_budget, eta=eta, min_clip=min_clip, max_clip=max_clip,
@@ -199,6 +205,11 @@ def __init__(self, cs=None, f=None, dimensions=None, mutation_factor=0.5,
self.gpu_usage = None
self.single_node_with_gpus = None

self.client_resources = client_resources
self.checkpoint_file = checkpoint_file
if restore_checkpoint:
self.restore_checkpoint()

def __getstate__(self):
""" Allows the object to picklable while having Dask client as a class attribute.
"""
@@ -564,9 +575,11 @@ def submit_job(self, job_info, **kwargs):
if self.single_node_with_gpus:
# managing GPU allocation for the job to be submitted
job_info.update({"gpu_devices": self._get_gpu_id_with_low_load()})
self.futures.append(
self.client.submit(self._f_objective, job_info)
)

if isinstance(self.client_resources, dict) and len(self.client_resources) > 0:
self.futures.append(self.client.submit(self._f_objective, job_info, resources=self.client_resources))
else:
self.futures.append(self.client.submit(self._f_objective, job_info))
else:
# skipping scheduling to Dask worker to avoid added overheads in the synchronous case
self.futures.append(self._f_objective(job_info))
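For reference, this is how Dask's resource-restricted submission behaves: a task submitted with resources={'limit_proc': 1} consumes the worker's entire limit_proc budget while it runs, so at most one such task executes per worker at a time. A minimal sketch, assuming a worker started with --resources "limit_proc=1" and a hypothetical scheduler file:

from distributed import Client

client = Client(scheduler_file="scheduler_file.txt")  # hypothetical path

def double(x):
    return 2 * x

# Occupies the worker's full limit_proc budget, so a second such task
# waits in the scheduler until this one finishes.
future = client.submit(double, 21, resources={"limit_proc": 1})
print(future.result())  # 42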
@@ -625,10 +638,19 @@ def _fetch_results_from_workers(self):
)
# book-keeping
self._update_trackers(
traj=self.inc_score, runtime=cost, history=(
config.tolist(), float(fitness), float(cost), float(budget), info
)
traj=self.inc_score,
runtime=cost,
history=(bracket_id, config.tolist(), float(fitness), float(cost), float(budget), info)
)

new_result = json.dumps([
self.iteration_counter, bracket_id,
{'configuration': config.tolist(), 'fidelity': float(budget),
'fitness': float(fitness), 'cost': float(cost), 'info': info}], sort_keys=True
)
with open(Path(self.output_path) / 'results.json', 'a+') as fh:
fh.write(new_result + '\n')  # one JSON array per line keeps the file parseable

# remove processed future
self.futures = np.delete(self.futures, [i for i, _ in done_list]).tolist()
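Each completed evaluation now appends one JSON array per line to results.json in the output directory. A reader sketch, assuming the newline-terminated writes above and a hypothetical output path:

import json
from pathlib import Path

# Each line is a JSON array: [iteration_counter, bracket_id, record].
for line in (Path("dehb_logs") / "results.json").read_text().splitlines():
    iteration, bracket_id, record = json.loads(line)
    print(iteration, bracket_id, record["fitness"], record["fidelity"])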

@@ -706,6 +728,72 @@ def _verbosity_runtime(self, fevals, brackets, total_cost):
self.logger.info(
"{}/{} {}".format(remaining[0], remaining[1], remaining[2])
)

def save_checkpoint(self, checkpoint_file: Union[str, Path, None] = None):
""" Pickle the DE subpopulations and the optimizer state to a checkpoint file. """
assert checkpoint_file is not None or self.checkpoint_file is not None
checkpoint_file = checkpoint_file or self.checkpoint_file
checkpoint_file = Path(checkpoint_file)

checkpoint = {'de': {budget: [value.population, value.fitness, value.parent_counter]
for budget, value in self.de.items()},
'state': {'iteration_counter': self.iteration_counter,
'inc_score': self.inc_score,
'inc_config': self.inc_config,
'inc_info': self.inc_info,
'start_at': self.start,
'save_at': time.time(),
},
'trajectory': self.traj,
'history': self.history,
'run_time': self.runtime,
}

with checkpoint_file.open('wb') as fh:
pickle.dump(checkpoint, fh)

def load_checkpoint(self, checkpoint_file: Path):
""" Unpickle and return a checkpoint previously written by save_checkpoint. """
with checkpoint_file.open('rb') as fh:
checkpoint = pickle.load(fh)
return checkpoint

def restore_checkpoint(self):
""" Restore populations, incumbent state, and bracket bookkeeping from self.checkpoint_file. """
assert self.checkpoint_file is not None
checkpoint = self.load_checkpoint(self.checkpoint_file)

for budget, values in checkpoint['de'].items():
self.de[budget].population = values[0]
self.de[budget].fitness = values[1]
self.de[budget].parent_counter = values[2]

self.inc_score = checkpoint['state']['inc_score']
self.inc_config = checkpoint['state']['inc_config']
self.inc_info = checkpoint['state']['inc_info']
self.start = time.time() - (checkpoint['state']['save_at'] - checkpoint['state']['start_at'])
self.traj = checkpoint['trajectory']
self.history = checkpoint['history']
self.runtime = checkpoint['run_time']  # attribute is runtime, matching the 'run_time' entry written by save_checkpoint

already_seen_brackets = set()
for run in checkpoint['history']:
bracket_id, config, fitness, cost, budget, info = run
bracket = None

if bracket_id not in already_seen_brackets:
bracket = self._start_new_bracket()
already_seen_brackets.add(bracket_id)
else:
for cand_bracket in self.active_brackets:
if cand_bracket.bracket_id == bracket_id:
bracket = cand_bracket
break

assert bracket is not None and bracket_id == bracket.bracket_id
bracket.register_job(int(budget))
bracket.complete_job(int(budget))

# TODO: Iteration counter should match the checkpoint iteration counter after
# "replaying" the brackets
assert self.iteration_counter == checkpoint['state']['iteration_counter']

@logger.catch
def run(self, fevals=None, brackets=None, total_cost=None, single_node_with_gpus=False,
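A quick way to inspect a saved checkpoint offline, as a sketch assuming a checkpoint.pkl written by save_checkpoint above:

import pickle
from pathlib import Path

with Path("checkpoint.pkl").open("rb") as fh:  # hypothetical path
    checkpoint = pickle.load(fh)

print("iterations:", checkpoint["state"]["iteration_counter"])
print("evaluations recorded:", len(checkpoint["history"]))
for budget, (population, fitness, parent_counter) in checkpoint["de"].items():
    print("budget", budget, "->", len(population), "population members")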
83 changes: 83 additions & 0 deletions examples/04_restart_an_experiment_local.py
@@ -0,0 +1,83 @@
import time
import logging
logging.basicConfig(level=logging.INFO)

from pathlib import Path
from dehb.optimizers.dehb_with_warmstart import DEHB


from hpobench.container.benchmarks.surrogates.paramnet_benchmark import ParamNetPokerOnTimeBenchmark


def objective_function(config, budget, **kwargs):

start = time.time()
socket_id = kwargs.get('socket_id')
benchmark = ParamNetPokerOnTimeBenchmark(socket_id=socket_id)
result_dict = benchmark.objective_function(configuration=config, fidelity={'budget': int(budget)})
finish = time.time()
return {'fitness': result_dict['function_value'], 'cost': result_dict['cost'],
'info': {'res_info': result_dict['info'], 'time': float(finish - start)}}


def main(result_path: str, seed=0):

result_path = Path(result_path)
result_path.mkdir(exist_ok=True, parents=True)

checkpoint_file = result_path / 'checkpoint.pkl'

benchmark = ParamNetPokerOnTimeBenchmark()

dehb = DEHB(f=objective_function,
cs=benchmark.get_configuration_space(seed=seed),
dimensions=len(benchmark.get_configuration_space().get_hyperparameters()),
min_budget=81, # These are the budgets used by the benchmark.
max_budget=2187,
eta=3,
output_path=result_path / 'dehb_logs',
n_workers=1)

try:
traj, runtime, history = dehb.run(total_cost=20, # Let the procedure run for 20 seconds.
verbose=True,
save_intermediate=True,
# arguments below are part of **kwargs shared across workers
eta=3,
result_path=result_path,
socket_id=benchmark.socket_id)
except Exception:
# One can wrap run() in a try-except like this to save an intermediate checkpoint in case of an error.
dehb.save_checkpoint(checkpoint_file)

# Call this function to save the checkpoint to disk.
dehb.save_checkpoint(checkpoint_file)

# To restart the optimization procedure, create a new object.
# Set the parameters `checkpoint_file` and `restore_checkpoint`; the checkpoint is then loaded automatically.
dehb_2 = DEHB(f=objective_function,
cs=benchmark.get_configuration_space(seed=seed),
dimensions=len(benchmark.get_configuration_space().get_hyperparameters()),
min_budget=81,
max_budget=2187,
eta=3,
output_path=result_path / 'dehb_logs',
n_workers=1,
checkpoint_file=result_path / 'checkpoint.pkl',
restore_checkpoint=True)

# NOTE: Make sure to increase the time limit! total_cost counts from the restored start time,
# so 20 + 20 grants the continued run roughly another 20 seconds.
traj2, runtime2, history2 = dehb_2.run(total_cost=20 + 20,
verbose=True,
save_intermediate=True,
# This parameter is needed for the HPOBench-Benchmark Object.
socket_id=benchmark.socket_id)


if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--result_path', type=str)
args = parser.parse_args()

main(result_path=args.result_path, seed=0)
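A hypothetical invocation, assuming hpobench and its containerized ParamNet benchmark are installed: python 04_restart_an_experiment_local.py --result_path ./results. Because restore_checkpoint shifts the recorded start time, the second run with total_cost=20 + 20 continues for roughly another 20 seconds before stopping.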
114 changes: 114 additions & 0 deletions examples/05_restart_an_experiment_with_dask.py
@@ -0,0 +1,114 @@
"""
Before running this script, start the scheduler with

`dask-scheduler --scheduler-file <result_path>/scheduler_file.txt`

and then a worker with

`dask-worker --scheduler-file <result_path>/scheduler_file.txt --name 1 --resources "limit_proc=1" --no-nanny`

Note that we assign the worker a resource; since every task requests limit_proc=1, only a single task can execute per worker at a time.

"""

import time
import logging
logging.basicConfig(level=logging.INFO)

from distributed import Client
from pathlib import Path
from dehb.optimizers.dehb_with_warmstart import DEHB

from hpobench.container.benchmarks.surrogates.paramnet_benchmark import ParamNetPokerOnTimeBenchmark


def objective_function(config, budget, **kwargs):

start = time.time()
socket_id = kwargs.get('socket_id')
benchmark = ParamNetPokerOnTimeBenchmark(socket_id=socket_id)
result_dict = benchmark.objective_function(configuration=config, fidelity={'budget': int(budget)})
finish = time.time()
return {'fitness': result_dict['function_value'], 'cost': result_dict['cost'],
'info': {'res_info': result_dict['info'], 'time': float(finish - start)}}


def main(result_path: str, seed=0):

result_path = Path(result_path)
result_path.mkdir(exist_ok=True, parents=True)

checkpoint_file = result_path / 'checkpoint.pkl'
scheduler_file = result_path / 'scheduler_file.txt'

benchmark = ParamNetPokerOnTimeBenchmark()

client = Client(scheduler_file=scheduler_file)
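# Optional sanity check (an addition, not part of the original example): confirm
# that each connected worker advertises the limit_proc resource, since tasks
# submitted with resources={'limit_proc': 1} would otherwise never be scheduled.
for address, info in client.scheduler_info()['workers'].items():
    assert 'limit_proc' in info.get('resources', {}), \
        f'worker {address} was not started with --resources "limit_proc=1"'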

dehb = DEHB(f=objective_function,
cs=benchmark.get_configuration_space(seed=seed),
dimensions=len(benchmark.get_configuration_space().get_hyperparameters()),
min_budget=81, # These are the budgets used by the benchmark.
max_budget=2187,
eta=3,
output_path=result_path / 'dehb_logs',
client=client,
# Limit the tasks per worker by starting the worker with the same resource! See above.
client_resources={'limit_proc': 1},
)

try:
traj, runtime, history = dehb.run(total_cost=20, # Let the procedure run for 20 seconds.
verbose=True,
save_intermediate=True,
# arguments below are part of **kwargs shared across workers
eta=3,
result_path=result_path,
socket_id=benchmark.socket_id)
except Exception:
# One can wrap run() in a try-except like this to save an intermediate checkpoint in case of an error.
dehb.save_checkpoint(checkpoint_file)
raise  # re-raise, since traj/runtime/history below would otherwise be undefined

# Call this function to save the checkpoint to disk.
dehb.save_checkpoint(checkpoint_file)

dehb_2 = DEHB(f=objective_function,
cs=benchmark.get_configuration_space(seed=seed),
dimensions=len(benchmark.get_configuration_space().get_hyperparameters()),
min_budget=81,
max_budget=2187,
eta=3,
output_path=result_path / 'dehb_logs',
client=client,
client_resources={'limit_proc': 1}, # The new object also needs this resource limit.
checkpoint_file=result_path / 'checkpoint.pkl',
restore_checkpoint=True,
)

traj2, runtime2, history2 = dehb_2.run(total_cost=20 + 20,
verbose=True,
save_intermediate=True,
# arguments below are part of **kwargs shared across workers
socket_id=benchmark.socket_id)


from matplotlib import pyplot as plt
import numpy as np

plt.figure()
plt.plot(np.arange(len(traj2)), traj2, label='restarted')
plt.plot(np.arange(len(traj)), traj, label='first run')
plt.yscale('log')
plt.legend()
plt.savefig(result_path / 'traj.png')
plt.close()


if __name__ == '__main__':

import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--result_path', type=str)
args = parser.parse_args()

main(result_path=args.result_path, seed=0)