import copy
import json
import logging.handlers
import typing
import ConfigSpace
from ConfigSpace.configuration_space import Configuration
import dask.distributed
from smac.facade.smac_ac_facade import SMAC4AC
from smac.intensification.hyperband import Hyperband
from smac.runhistory.runhistory import RunHistory
from smac.runhistory.runhistory2epm import RunHistory2EPM4LogCost
from smac.scenario.scenario import Scenario
from smac.tae.dask_runner import DaskParallelRunner
from smac.tae.serial_runner import SerialRunner
from smac.utils.io.traj_logging import TrajEntry
from autoPyTorch.automl_common.common.utils.backend import Backend
from autoPyTorch.datasets.base_dataset import BaseDataset
from autoPyTorch.datasets.resampling_strategy import (
CrossValTypes,
DEFAULT_RESAMPLING_PARAMETERS,
HoldoutValTypes,
)
from autoPyTorch.ensemble.ensemble_builder import EnsembleBuilderManager
from autoPyTorch.evaluation.tae import ExecuteTaFuncWithQueue, get_cost_of_crash
from autoPyTorch.optimizer.utils import read_return_initial_configurations
from autoPyTorch.pipeline.components.training.metrics.base import autoPyTorchMetric
from autoPyTorch.utils.hyperparameter_search_space_update import HyperparameterSearchSpaceUpdates
from autoPyTorch.utils.logging_ import get_named_client_logger
from autoPyTorch.utils.stopwatch import StopWatch
def get_smac_object(
scenario_dict: typing.Dict[str, typing.Any],
seed: int,
ta: typing.Callable,
ta_kwargs: typing.Dict[str, typing.Any],
n_jobs: int,
initial_budget: int,
max_budget: int,
dask_client: typing.Optional[dask.distributed.Client],
initial_configurations: typing.Optional[typing.List[Configuration]] = None,
) -> SMAC4AC:
"""
This function returns an SMAC object that is gonna be used as
optimizer of pipelines
Args:
scenario_dict (typing.Dict[str, typing.Any]): constrain on how to run
the jobs
seed (int): to make the job deterministic
ta (typing.Callable): the function to be intensifier by smac
ta_kwargs (typing.Dict[str, typing.Any]): Arguments to the above ta
n_jobs (int): Amount of cores to use for this task
dask_client (dask.distributed.Client): User provided scheduler
initial_configurations (typing.List[Configuration]): List of initial
configurations which smac will run before starting the search process
Returns:
(SMAC4AC): sequential model algorithm configuration object
"""
intensifier = Hyperband
rh2EPM = RunHistory2EPM4LogCost
return SMAC4AC(
scenario=Scenario(scenario_dict),
rng=seed,
runhistory2epm=rh2EPM,
tae_runner=ta,
tae_runner_kwargs=ta_kwargs,
initial_configurations=initial_configurations,
run_id=seed,
intensifier=intensifier,
intensifier_kwargs={'initial_budget': initial_budget, 'max_budget': max_budget,
'eta': 3, 'min_chall': 1, 'instance_order': 'shuffle_once'},
dask_client=dask_client,
n_jobs=n_jobs,
)
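

# A minimal usage sketch of `get_smac_object` (illustrative only: the scenario
# keys and budget values below are assumptions, not requirements of this
# module; `run_smbo` further down shows the keys AutoPyTorch actually sets,
# and `ta_kwargs` would be built as it is there):
#
#     scenario_dict = {'cs': config_space, 'run_obj': 'quality',
#                      'wallclock_limit': 300, 'deterministic': 'true',
#                      'abort_on_first_run_crash': False}
#     smac = get_smac_object(scenario_dict=scenario_dict, seed=1,
#                            ta=ExecuteTaFuncWithQueue, ta_kwargs=ta_kwargs,
#                            n_jobs=1, initial_budget=5, max_budget=50,
#                            dask_client=None, initial_configurations=None)
#     smac.optimize()
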
class AutoMLSMBO(object):
def __init__(self,
config_space: ConfigSpace.ConfigurationSpace,
dataset_name: str,
backend: Backend,
total_walltime_limit: float,
func_eval_time_limit_secs: float,
memory_limit: typing.Optional[int],
metric: autoPyTorchMetric,
watcher: StopWatch,
n_jobs: int,
dask_client: typing.Optional[dask.distributed.Client],
pipeline_config: typing.Dict[str, typing.Any],
start_num_run: int = 1,
seed: int = 1,
resampling_strategy: typing.Union[HoldoutValTypes, CrossValTypes] = HoldoutValTypes.holdout_validation,
resampling_strategy_args: typing.Optional[typing.Dict[str, typing.Any]] = None,
include: typing.Optional[typing.Dict[str, typing.Any]] = None,
exclude: typing.Optional[typing.Dict[str, typing.Any]] = None,
disable_file_output: typing.List = [],
smac_scenario_args: typing.Optional[typing.Dict[str, typing.Any]] = None,
get_smac_object_callback: typing.Optional[typing.Callable] = None,
all_supported_metrics: bool = True,
ensemble_callback: typing.Optional[EnsembleBuilderManager] = None,
logger_port: typing.Optional[int] = None,
search_space_updates: typing.Optional[HyperparameterSearchSpaceUpdates] = None,
portfolio_selection: typing.Optional[str] = None,
pynisher_context: str = 'spawn',
):
"""
Interface to SMAC. This method calls the SMAC optimize method, and allows
to pass a callback (ensemble_callback) to make launch task at the end of each
optimize() algorithm. The later is needed due to the nature of blocking long running
tasks in Dask.
Args:
config_space (ConfigSpace.ConfigurationSpac):
The configuration space of the whole process
dataset_name (str):
The name of the dataset, used to identify the current job
backend (Backend):
An interface with disk
total_walltime_limit (float):
The maximum allowed time for this job
func_eval_time_limit_secs (float):
How much each individual task is allowed to last
memory_limit (typing.Optional[int]):
Maximum allowed CPU memory this task can use
metric (autoPyTorchMetric):
An scorer object to evaluate the performance of each jon
watcher (StopWatch):
A stopwatch object to debug time consumption
n_jobs (int):
How many workers are allowed in each task
dask_client (typing.Optional[dask.distributed.Client]):
An user provided scheduler. Else smac will create its own.
start_num_run (int):
The ID index to start runs
seed (int):
To make the run deterministic
resampling_strategy (str):
What strategy to use for performance validation
resampling_strategy_args (typing.Optional[typing.Dict[str, typing.Any]]):
Arguments to the resampling strategy -- like number of folds
include (typing.Optional[typing.Dict[str, typing.Any]] = None):
Optimal Configuration space modifiers
exclude (typing.Optional[typing.Dict[str, typing.Any]] = None):
Optimal Configuration space modifiers
disable_file_output List:
Support to disable file output to disk -- to reduce space
smac_scenario_args (typing.Optional[typing.Dict[str, typing.Any]]):
Additional arguments to the smac scenario
get_smac_object_callback (typing.Optional[typing.Callable]):
Allows to create a user specified SMAC object
pynisher_context (str):
A string indicating the multiprocessing context to use
ensemble_callback (typing.Optional[EnsembleBuilderManager]):
A callback used in this scenario to start ensemble building subtasks
portfolio_selection (str), (default=None):
This argument controls the initial configurations that
AutoPyTorch uses to warm start SMAC for hyperparameter
optimization. By default, no warm-starting happens.
The user can provide a path to a json file containing
configurations, similar to (autoPyTorch/configs/greedy_portfolio.json).
Additionally, the keyword 'greedy' is supported,
which would use the default portfolio from
`AutoPyTorch Tabular <https://arxiv.org/abs/2006.13799>`
"""
super(AutoMLSMBO, self).__init__()
# data related
self.dataset_name = dataset_name
self.datamanager: typing.Optional[BaseDataset] = None
self.metric = metric
self.task: typing.Optional[str] = None
self.backend = backend
self.all_supported_metrics = all_supported_metrics
self.pipeline_config = pipeline_config
# the configuration space
self.config_space = config_space
# the number of parallel workers/jobs
self.n_jobs = n_jobs
self.dask_client = dask_client
# Evaluation
self.resampling_strategy = resampling_strategy
if resampling_strategy_args is None:
resampling_strategy_args = DEFAULT_RESAMPLING_PARAMETERS[resampling_strategy]
self.resampling_strategy_args = resampling_strategy_args
# and a bunch of useful limits
self.worst_possible_result = get_cost_of_crash(self.metric)
self.total_walltime_limit = int(total_walltime_limit)
self.func_eval_time_limit_secs = int(func_eval_time_limit_secs)
self.memory_limit = memory_limit
self.watcher = watcher
self.seed = seed
self.start_num_run = start_num_run
self.include = include
self.exclude = exclude
self.disable_file_output = disable_file_output
self.smac_scenario_args = smac_scenario_args
self.get_smac_object_callback = get_smac_object_callback
self.pynisher_context = pynisher_context
self.ensemble_callback = ensemble_callback
self.search_space_updates = search_space_updates
dataset_name_ = "" if dataset_name is None else dataset_name
if logger_port is None:
self.logger_port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
else:
self.logger_port = logger_port
        logger_name = '%s(%d):%s' % (self.__class__.__name__, self.seed, dataset_name_)
self.logger = get_named_client_logger(name=logger_name,
port=self.logger_port)
self.logger.info("initialised {}".format(self.__class__.__name__))
self.initial_configurations: typing.Optional[typing.List[Configuration]] = None
if portfolio_selection is not None:
self.initial_configurations = read_return_initial_configurations(config_space=config_space,
portfolio_selection=portfolio_selection)
def reset_data_manager(self) -> None:
if self.datamanager is not None:
del self.datamanager
self.datamanager = self.backend.load_datamanager()
if self.datamanager is not None and self.datamanager.task_type is not None:
self.task = self.datamanager.task_type
def run_smbo(self, func: typing.Optional[typing.Callable] = None
) -> typing.Tuple[RunHistory, typing.List[TrajEntry], str]:
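        """Run the SMBO loop and collect its results.

        Args:
            func (typing.Optional[typing.Callable]): An optional target
                function, forwarded to the target algorithm runner.

        Returns:
            (RunHistory, typing.List[TrajEntry], str): The SMAC run history,
            the incumbent trajectory and the budget type used by the target
            algorithm runner.
        """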
self.watcher.start_task('SMBO')
self.logger.info("Started run of SMBO")
# == first things first: load the datamanager
self.reset_data_manager()
# == Initialize non-SMBO stuff
# first create a scenario
seed = self.seed
self.config_space.seed(seed)
# allocate a run history
num_run = self.start_num_run
# Initialize some SMAC dependencies
if isinstance(self.resampling_strategy, CrossValTypes):
num_splits = self.resampling_strategy_args['num_splits']
instances = [[json.dumps({'task_id': self.dataset_name,
'fold': fold_number})]
for fold_number in range(num_splits)]
else:
instances = [[json.dumps({'task_id': self.dataset_name})]]
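        # For example, `instances` for 3-fold cross-validation looks like
        #   [['{"task_id": "<dataset>", "fold": 0}'],
        #    ['{"task_id": "<dataset>", "fold": 1}'],
        #    ['{"task_id": "<dataset>", "fold": 2}']]
        # while holdout validation yields the single instance
        #   [['{"task_id": "<dataset>"}']].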
        # TODO rebuild target algorithm to be its own target algorithm
# evaluator, which takes into account that a run can be killed prior
# to the model being fully fitted; thus putting intermediate results
# into a queue and querying them once the time is over
ta_kwargs = dict(
backend=copy.deepcopy(self.backend),
seed=seed,
initial_num_run=num_run,
include=self.include if self.include is not None else dict(),
exclude=self.exclude if self.exclude is not None else dict(),
metric=self.metric,
memory_limit=self.memory_limit,
disable_file_output=self.disable_file_output,
ta=func,
logger_port=self.logger_port,
all_supported_metrics=self.all_supported_metrics,
pipeline_config=self.pipeline_config,
search_space_updates=self.search_space_updates,
pynisher_context=self.pynisher_context,
)
ta = ExecuteTaFuncWithQueue
self.logger.info("Created TA")
startup_time = self.watcher.wall_elapsed(self.dataset_name)
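        # Reserve a small buffer (5 seconds) so SMAC itself can finish within
        # the overall walltime limit despite the startup overhead.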
total_walltime_limit = self.total_walltime_limit - startup_time - 5
scenario_dict = {
'abort_on_first_run_crash': False,
'cs': self.config_space,
'cutoff_time': self.func_eval_time_limit_secs,
'deterministic': 'true',
'instances': instances,
'memory_limit': self.memory_limit,
'output-dir': self.backend.get_smac_output_directory(),
'run_obj': 'quality',
'wallclock_limit': total_walltime_limit,
'cost_for_crash': self.worst_possible_result,
}
if self.smac_scenario_args is not None:
for arg in [
'abort_on_first_run_crash',
'cs',
'deterministic',
'instances',
'output-dir',
'run_obj',
'shared-model',
'cost_for_crash',
]:
if arg in self.smac_scenario_args:
self.logger.warning('Cannot override scenario argument %s, '
'will ignore this.', arg)
del self.smac_scenario_args[arg]
for arg in [
'cutoff_time',
'memory_limit',
'wallclock_limit',
]:
if arg in self.smac_scenario_args:
self.logger.warning(
'Overriding scenario argument %s: %s with value %s',
arg,
scenario_dict[arg],
self.smac_scenario_args[arg]
)
scenario_dict.update(self.smac_scenario_args)
initial_budget = self.pipeline_config['min_epochs']
max_budget = self.pipeline_config['epochs']
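        # With the Hyperband intensifier, budgets are interpreted as training
        # epochs: configurations start at `min_epochs` and the best-performing
        # ones are promoted up to the full `epochs` budget.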
if self.get_smac_object_callback is not None:
smac = self.get_smac_object_callback(scenario_dict=scenario_dict,
seed=seed,
ta=ta,
ta_kwargs=ta_kwargs,
n_jobs=self.n_jobs,
initial_budget=initial_budget,
max_budget=max_budget,
dask_client=self.dask_client,
initial_configurations=self.initial_configurations)
else:
smac = get_smac_object(scenario_dict=scenario_dict,
seed=seed,
ta=ta,
ta_kwargs=ta_kwargs,
n_jobs=self.n_jobs,
initial_budget=initial_budget,
max_budget=max_budget,
dask_client=self.dask_client,
initial_configurations=self.initial_configurations)
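        # Registering the ensemble callback lets SMAC trigger ensemble
        # building subtasks as optimization runs finish.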
if self.ensemble_callback is not None:
smac.register_callback(self.ensemble_callback)
self.logger.info("initialised SMBO, running SMBO.optimize()")
smac.optimize()
self.logger.info("finished SMBO.optimize()")
self.runhistory = smac.solver.runhistory
self.trajectory = smac.solver.intensifier.traj_logger.trajectory
if isinstance(smac.solver.tae_runner, DaskParallelRunner):
self._budget_type = smac.solver.tae_runner.single_worker.budget_type
elif isinstance(smac.solver.tae_runner, SerialRunner):
self._budget_type = smac.solver.tae_runner.budget_type
else:
raise NotImplementedError(type(smac.solver.tae_runner))
return self.runhistory, self.trajectory, self._budget_type
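

# A minimal usage sketch of AutoMLSMBO (illustrative only: `backend`, `metric`,
# `watcher` and `pipeline_config` must come from the surrounding AutoPyTorch
# machinery, and the limits below are assumptions, not defaults):
#
#     smbo = AutoMLSMBO(config_space=config_space, dataset_name='my_dataset',
#                       backend=backend, total_walltime_limit=300,
#                       func_eval_time_limit_secs=60, memory_limit=4096,
#                       metric=metric, watcher=watcher, n_jobs=1,
#                       dask_client=None, pipeline_config=pipeline_config)
#     runhistory, trajectory, budget_type = smbo.run_smbo()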