diff --git a/autosklearn/automl.py b/autosklearn/automl.py index add43d66aa..c0a0ccf650 100644 --- a/autosklearn/automl.py +++ b/autosklearn/automl.py @@ -43,8 +43,8 @@ from autosklearn.util.stopwatch import StopWatch from autosklearn.util.logging_ import ( get_logger, - LogRecordSocketReceiver, setup_logger, + start_log_server, ) from autosklearn.util import pipeline, RE_PATTERN from autosklearn.ensemble_builder import EnsembleBuilderManager @@ -230,14 +230,12 @@ def __init__(self, def _create_dask_client(self): self._is_dask_client_internally_created = True - processes = False if self._n_jobs is not None and self._n_jobs > 1: - processes = True dask.config.set({'distributed.worker.daemon': False}) self._dask_client = dask.distributed.Client( dask.distributed.LocalCluster( n_workers=self._n_jobs, - processes=processes, + processes=False, threads_per_worker=1, # We use the temporal directory to save the # dask workers, because deleting workers @@ -286,24 +284,37 @@ def _get_logger(self, name): # under the above logging configuration setting # We need to specify the logger_name so that received records # are treated under the logger_name ROOT logger setting - context = multiprocessing.get_context('fork') + context = multiprocessing.get_context('spawn') self.stop_logging_server = context.Event() - - while True: - # Loop until we find a valid port - self._logger_port = np.random.randint(10000, 65535) - try: - self.logger_tcpserver = LogRecordSocketReceiver(logname=logger_name, - port=self._logger_port, - event=self.stop_logging_server) - break - except OSError: - continue + port = context.Value('l') # be safe by using a long + port.value = -1 self.logging_server = context.Process( - target=self.logger_tcpserver.serve_until_stopped) - self.logging_server.daemon = False + target=start_log_server, + kwargs=dict( + host='localhost', + logname=logger_name, + event=self.stop_logging_server, + port=port, + output_file=os.path.join( + self._backend.temporary_directory, '%s.log' % str(logger_name) + ), + logging_config=self.logging_config, + output_dir=self._backend.temporary_directory, + ), + ) + self.logging_server.start() + + while True: + with port.get_lock(): + if port.value == -1: + time.sleep(0.01) + else: + break + + self._logger_port = int(port.value) + return get_logger(logger_name) def _clean_logger(self): @@ -322,7 +333,6 @@ def _clean_logger(self): # process. 
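The _get_logger() hunk above moves the logging server into its own process started from a 'spawn' context, and the child reports the TCP port it managed to bind back to the parent through a shared multiprocessing Value('l') initialised to -1. Below is a minimal, self-contained sketch of that handshake pattern; it is not the auto-sklearn code itself, and it binds to port 0 (letting the OS pick a free port) instead of retrying random ports the way start_log_server does.

import multiprocessing
import socket
import socketserver
import time


class EchoHandler(socketserver.BaseRequestHandler):
    def handle(self):
        self.request.sendall(self.request.recv(1024))


def serve(port):
    # Runs in the spawned child: bind, publish the bound port, serve one request.
    with socketserver.TCPServer(('localhost', 0), EchoHandler) as server:
        with port.get_lock():
            port.value = server.server_address[1]
        server.handle_request()


if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')
    port = ctx.Value('l', -1)              # -1 means "child has not bound yet"
    child = ctx.Process(target=serve, args=(port,))
    child.start()

    while True:                            # poll until the child publishes its port
        with port.get_lock():
            if port.value != -1:
                break
        time.sleep(0.01)

    with socket.create_connection(('localhost', int(port.value))) as conn:
        conn.sendall(b'ping')
        print('echoed:', conn.recv(1024))
    child.join()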
self.logging_server.join(timeout=5) self.logging_server.terminate() - del self.logger_tcpserver del self.stop_logging_server @staticmethod @@ -756,7 +766,9 @@ def fit( if len(proc_ensemble.futures) > 0: future = proc_ensemble.futures.pop() - future.cancel() + # Now we need to wait for the future to return as it cannot be cancelled while it + # is running: https://stackoverflow.com/a/49203129 + future.result() if load_models: self._load_models() diff --git a/autosklearn/ensemble_builder.py b/autosklearn/ensemble_builder.py index c69cb25b3b..2bb97d8b54 100644 --- a/autosklearn/ensemble_builder.py +++ b/autosklearn/ensemble_builder.py @@ -9,7 +9,6 @@ import pickle import re import shutil -import sys import time import traceback from typing import List, Optional, Tuple, Union @@ -150,7 +149,11 @@ def __call__( ): self.build_ensemble(smbo.tae_runner.client) - def build_ensemble(self, dask_client: dask.distributed.Client) -> None: + def build_ensemble( + self, + dask_client: dask.distributed.Client, + pynisher_context: str = 'spawn', + ) -> None: # The second criteria is elapsed time elapsed_time = time.time() - self.start_time @@ -219,6 +222,7 @@ def build_ensemble(self, dask_client: dask.distributed.Client) -> None: iteration=self.iteration, return_predictions=False, priority=100, + pynisher_context=pynisher_context, logger_port=self.logger_port, )) @@ -256,6 +260,7 @@ def fit_and_return_ensemble( end_at: float, iteration: int, return_predictions: bool, + pynisher_context: str, logger_port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT, ) -> Tuple[ List[Tuple[int, float, float, float]], @@ -309,6 +314,8 @@ def fit_and_return_ensemble( because we do not know when dask schedules the job. iteration: int The current iteration + pynisher_context: str + Context to use for multiprocessing, can be either fork, spawn or forkserver. logger_port: int The port where the logging server is listening to. @@ -337,6 +344,7 @@ def fit_and_return_ensemble( end_at=end_at, iteration=iteration, return_predictions=return_predictions, + pynisher_context=pynisher_context, ) return result @@ -540,6 +548,7 @@ def run( end_at: Optional[float] = None, time_buffer=5, return_predictions: bool = False, + pynisher_context: str = 'spawn', # only change for unit testing! ): if time_left is None and end_at is None: @@ -564,12 +573,7 @@ def run( if time_left - time_buffer < 1: break - context = multiprocessing.get_context('forkserver') - # Try to copy as many modules into the new context to reduce startup time - # http://www.bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html - # do not copy the logging module as it causes deadlocks! - preload_modules = list(filter(lambda key: 'logging' not in key, sys.modules.keys())) - context.set_forkserver_preload(preload_modules) + context = multiprocessing.get_context(pynisher_context) safe_ensemble_script = pynisher.enforce_limits( wall_time_in_s=int(time_left - time_buffer), mem_in_mb=self.memory_limit, diff --git a/autosklearn/estimators.py b/autosklearn/estimators.py index 1119e1052a..89e731a202 100644 --- a/autosklearn/estimators.py +++ b/autosklearn/estimators.py @@ -177,10 +177,6 @@ def __init__( dask_client : dask.distributed.Client, optional User-created dask client, can be used to start a dask cluster and then attach auto-sklearn to it. - - Auto-sklearn can run into a deadlock if the dask client uses threads for - parallelization, it is therefore highly recommended to use dask workers - using a single process. 
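In the fit() hunk above, a pending ensemble future is now waited on with future.result() rather than cancelled, because a future that has already started executing cannot be cancelled (see the linked StackOverflow answer). The same semantics are easy to observe with the standard library's concurrent.futures; this sketch only illustrates that behaviour, not the dask code path:

import time
from concurrent.futures import ThreadPoolExecutor


def slow_task():
    time.sleep(1.0)
    return 'done'


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(slow_task)   # starts executing immediately
        queued = pool.submit(slow_task)    # still waiting for the single worker

        time.sleep(0.1)                    # give the first task time to start
        print(running.cancel())            # False: a running future cannot be cancelled
        print(queued.cancel())             # True: a queued future can
        print(running.result())            # blocks until the running task finishes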
disable_evaluator_output: bool or list, optional (False) If True, disable model and prediction output. Cannot be used diff --git a/autosklearn/evaluation/__init__.py b/autosklearn/evaluation/__init__.py index 3e389a4478..3f699e0c7d 100644 --- a/autosklearn/evaluation/__init__.py +++ b/autosklearn/evaluation/__init__.py @@ -4,7 +4,6 @@ import math import multiprocessing from queue import Empty -import sys import time import traceback from typing import Dict, List, Optional, Tuple, Union @@ -102,7 +101,7 @@ def __init__(self, backend, autosklearn_seed, resampling_strategy, metric, run_obj='quality', par_factor=1, all_scoring_functions=False, output_y_hat_optimization=True, include=None, exclude=None, memory_limit=None, disable_file_output=False, init_params=None, - budget_type=None, ta=False, **resampling_strategy_args): + budget_type=None, ta=False, pynisher_context='spawn', **resampling_strategy_args): if resampling_strategy == 'holdout': eval_function = autosklearn.evaluation.train_evaluator.eval_holdout @@ -176,6 +175,8 @@ def __init__(self, backend, autosklearn_seed, resampling_strategy, metric, else: self._get_test_loss = False + self.pynisher_context = pynisher_context + def run_wrapper( self, run_info: RunInfo, @@ -244,12 +245,7 @@ def run( instance_specific: Optional[str] = None, ) -> Tuple[StatusType, float, float, Dict[str, Union[int, float, str, Dict, List, Tuple]]]: - context = multiprocessing.get_context('forkserver') - # Try to copy as many modules into the new context to reduce startup time - # http://www.bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html - # do not copy the logging module as it causes deadlocks! - preload_modules = list(filter(lambda key: 'logging' not in key, sys.modules.keys())) - context.set_forkserver_preload(preload_modules) + context = multiprocessing.get_context(self.pynisher_context) queue = context.Queue() if not (instance_specific is None or instance_specific == '0'): diff --git a/autosklearn/util/logging.yaml b/autosklearn/util/logging.yaml index 62db539ae2..8c8bad3243 100644 --- a/autosklearn/util/logging.yaml +++ b/autosklearn/util/logging.yaml @@ -52,7 +52,7 @@ loggers: EnsembleBuilder: level: DEBUG - propagate: no + handlers: [file_handler, console] distributed: level: DEBUG diff --git a/autosklearn/util/logging_.py b/autosklearn/util/logging_.py index 07d70b07a4..754cb124b7 100644 --- a/autosklearn/util/logging_.py +++ b/autosklearn/util/logging_.py @@ -2,8 +2,10 @@ import logging import logging.config import logging.handlers +import multiprocessing import os import pickle +import random import select import socketserver import struct @@ -49,9 +51,75 @@ def get_logger(name: str) -> 'PickableLoggerAdapter': return logger -def get_named_client_logger(name: str, host: str = 'localhost', - port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT - ) -> 'PickableLoggerAdapter': +class PickableLoggerAdapter(object): + + def __init__(self, name: str): + self.name = name + self.logger = _create_logger(name) + + def __getstate__(self) -> Dict[str, Any]: + """ + Method is called when pickle dumps an object. + + Returns + ------- + Dictionary, representing the object state to be pickled. Ignores + the self.logger field and only returns the logger name. + """ + return {'name': self.name} + + def __setstate__(self, state: Dict[str, Any]) -> None: + """ + Method is called when pickle loads an object. Retrieves the name and + creates a logger. + + Parameters + ---------- + state - dictionary, containing the logger name. 
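The logging.yaml hunk above attaches explicit handlers to the EnsembleBuilder logger instead of switching off propagation. Expressed with logging.config.dictConfig, a simplified, hypothetical equivalent looks roughly like this; note that with propagate left at its default of True, a record emitted on the named logger also travels up to the root handlers:

import logging
import logging.config

CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'simple': {'format': '[%(levelname)s] [%(name)s] %(message)s'},
    },
    'handlers': {
        'console': {'class': 'logging.StreamHandler', 'formatter': 'simple'},
    },
    'loggers': {
        # Named logger with its own handler list, analogous to the EnsembleBuilder
        # entry in logging.yaml (the handler names here are illustrative).
        'EnsembleBuilder': {'level': 'DEBUG', 'handlers': ['console']},
    },
    'root': {'level': 'INFO', 'handlers': ['console']},
}

if __name__ == '__main__':
    logging.config.dictConfig(CONFIG)
    # Emitted once by the logger's own handler and once more after propagating
    # to the root handler, because 'propagate' was not set to False.
    logging.getLogger('EnsembleBuilder').debug('ensemble iteration finished')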
+ + """ + self.name = state['name'] + self.logger = _create_logger(self.name) + + def debug(self, msg: str, *args: Any, **kwargs: Any) -> None: + self.logger.debug(msg, *args, **kwargs) + + def info(self, msg: str, *args: Any, **kwargs: Any) -> None: + self.logger.info(msg, *args, **kwargs) + + def warning(self, msg: str, *args: Any, **kwargs: Any) -> None: + self.logger.warning(msg, *args, **kwargs) + + def error(self, msg: str, *args: Any, **kwargs: Any) -> None: + self.logger.error(msg, *args, **kwargs) + + def exception(self, msg: str, *args: Any, **kwargs: Any) -> None: + self.logger.exception(msg, *args, **kwargs) + + def critical(self, msg: str, *args: Any, **kwargs: Any) -> None: + self.logger.critical(msg, *args, **kwargs) + + def log(self, level: int, msg: str, *args: Any, **kwargs: Any) -> None: + self.logger.log(level, msg, *args, **kwargs) + + def isEnabledFor(self, level: int) -> bool: + return self.logger.isEnabledFor(level) + + +def get_named_client_logger( + name: str, + host: str = 'localhost', + port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT, +) -> 'PicklableClientLogger': + logger = PicklableClientLogger(name, host, port) + return logger + + +def _get_named_client_logger( + name: str, + host: str = 'localhost', + port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT, +) -> logging.Logger: """ When working with a logging server, clients are expected to create a logger using this method. For example, the automl object will create a master that awaits @@ -77,26 +145,25 @@ def get_named_client_logger(name: str, host: str = 'localhost', # Setup the logger configuration setup_logger() - local_logger = PickableLoggerAdapter(name) + local_logger = _create_logger(name) # Remove any handler, so that the server handles # how to process the message - local_logger.logger.handlers.clear() + local_logger.handlers.clear() - socketHandler = logging.handlers.SocketHandler( - 'localhost', - port - ) - local_logger.logger.addHandler(socketHandler) + socketHandler = logging.handlers.SocketHandler(host, port) + local_logger.addHandler(socketHandler) return local_logger -class PickableLoggerAdapter(object): +class PicklableClientLogger(PickableLoggerAdapter): - def __init__(self, name: str): + def __init__(self, name: str, host: str, port: int): self.name = name - self.logger = _create_logger(name) + self.host = host + self.port = port + self.logger = _get_named_client_logger(name, host, port) def __getstate__(self) -> Dict[str, Any]: """ @@ -107,7 +174,7 @@ def __getstate__(self) -> Dict[str, Any]: Dictionary, representing the object state to be pickled. Ignores the self.logger field and only returns the logger name. 
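get_named_client_logger() now returns a PicklableClientLogger that only needs to remember (name, host, port), while _get_named_client_logger() strips local handlers and attaches a SocketHandler so every record is pickled and shipped to the logging server. A stripped-down sketch of that client side, assuming some record receiver is already listening on the target port:

import logging
import logging.handlers


def make_client_logger(name, host='localhost',
                       port=logging.handlers.DEFAULT_TCP_LOGGING_PORT):
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.handlers.clear()            # let the server decide how records are rendered
    logger.addHandler(logging.handlers.SocketHandler(host, port))
    return logger


if __name__ == '__main__':
    log = make_client_logger('AutoML(1):example')
    # The record is pickled and written to the TCP socket; nothing is printed
    # locally because the only handler is the SocketHandler.
    log.info('sent to the logging server, if one is listening')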
""" - return {'name': self.name} + return {'name': self.name, 'host': self.host, 'port': self.port} def __setstate__(self, state: Dict[str, Any]) -> None: """ @@ -120,31 +187,9 @@ def __setstate__(self, state: Dict[str, Any]) -> None: """ self.name = state['name'] - self.logger = _create_logger(self.name) - - def debug(self, msg: str, *args: Any, **kwargs: Any) -> None: - self.logger.debug(msg, *args, **kwargs) - - def info(self, msg: str, *args: Any, **kwargs: Any) -> None: - self.logger.info(msg, *args, **kwargs) - - def warning(self, msg: str, *args: Any, **kwargs: Any) -> None: - self.logger.warning(msg, *args, **kwargs) - - def error(self, msg: str, *args: Any, **kwargs: Any) -> None: - self.logger.error(msg, *args, **kwargs) - - def exception(self, msg: str, *args: Any, **kwargs: Any) -> None: - self.logger.exception(msg, *args, **kwargs) - - def critical(self, msg: str, *args: Any, **kwargs: Any) -> None: - self.logger.critical(msg, *args, **kwargs) - - def log(self, level: int, msg: str, *args: Any, **kwargs: Any) -> None: - self.logger.log(level, msg, *args, **kwargs) - - def isEnabledFor(self, level: int) -> bool: - return self.logger.isEnabledFor(level) + self.host = state['host'] + self.port = state['port'] + self.logger = _get_named_client_logger(self.name, self.host, self.port) class LogRecordStreamHandler(socketserver.StreamRequestHandler): @@ -192,6 +237,35 @@ def handleLogRecord(self, record: logging.LogRecord) -> None: logger.handle(record) +def start_log_server( + host: str, + logname: str, + event: threading.Event, + port: multiprocessing.Value, + output_file: str, + logging_config: Dict, + output_dir: str, +) -> None: + setup_logger(output_file, logging_config, output_dir) + + while True: + # Loop until we find a valid port + _port = random.randint(10000, 65535) + try: + receiver = LogRecordSocketReceiver( + host=host, + port=_port, + logname=logname, + event=event, + ) + with port.get_lock(): + port.value = _port + receiver.serve_until_stopped() + break + except OSError: + continue + + class LogRecordSocketReceiver(socketserver.ThreadingTCPServer): """ This class implement a entity that receives tcp messages on a given address @@ -201,13 +275,14 @@ class LogRecordSocketReceiver(socketserver.ThreadingTCPServer): allow_reuse_address = True - def __init__(self, - host: str = 'localhost', - port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT, - handler: Type[LogRecordStreamHandler] = LogRecordStreamHandler, - logname: Optional[str] = None, - event: threading.Event = None, - ): + def __init__( + self, + host: str = 'localhost', + port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT, + handler: Type[LogRecordStreamHandler] = LogRecordStreamHandler, + logname: Optional[str] = None, + event: threading.Event = None, + ): socketserver.ThreadingTCPServer.__init__(self, (host, port), handler) self.timeout = 1 self.logname = logname diff --git a/requirements.txt b/requirements.txt index bf8fcafa0b..f3c2c9c869 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,6 @@ pandas>=1.0 liac-arff ConfigSpace>=0.4.14,<0.5 -pynisher>=0.6.2 +pynisher>=0.6.3 pyrfr>=0.7,<0.9 smac>=0.13.1,<0.14 diff --git a/scripts/run_auto-sklearn_for_metadata_generation.py b/scripts/run_auto-sklearn_for_metadata_generation.py index f9354fb88d..9ff715db2b 100644 --- a/scripts/run_auto-sklearn_for_metadata_generation.py +++ b/scripts/run_auto-sklearn_for_metadata_generation.py @@ -1,194 +1,196 @@ -import argparse -import json -import logging -import os -import shutil -import sys -import tempfile 
- -from autosklearn.classification import AutoSklearnClassifier -from autosklearn.regression import AutoSklearnRegressor -from autosklearn.evaluation import ExecuteTaFuncWithQueue, get_cost_of_crash -from autosklearn.metrics import accuracy, balanced_accuracy, roc_auc, log_loss, r2, \ - mean_squared_error, mean_absolute_error, root_mean_squared_error - -from smac.runhistory.runhistory import RunInfo -from smac.scenario.scenario import Scenario -from smac.stats.stats import Stats -from smac.tae import StatusType - -sys.path.append('.') -from update_metadata_util import load_task - - -parser = argparse.ArgumentParser() -parser.add_argument('--working-directory', type=str, required=True) -parser.add_argument('--time-limit', type=int, required=True) -parser.add_argument('--per-run-time-limit', type=int, required=True) -parser.add_argument('--task-id', type=int, required=True) -parser.add_argument('--metric', type=str, required=True) -parser.add_argument('-s', '--seed', type=int, required=True) -parser.add_argument('--unittest', action='store_true') -args = parser.parse_args() - -working_directory = args.working_directory -time_limit = args.time_limit -per_run_time_limit = args.per_run_time_limit -task_id = args.task_id -seed = args.seed -metric = args.metric -is_test = args.unittest - -X_train, y_train, X_test, y_test, cat, task_type = load_task(task_id) - -configuration_output_dir = os.path.join(working_directory, 'configuration', - task_type) -os.makedirs(configuration_output_dir, exist_ok=True) -tmp_dir = os.path.join(configuration_output_dir, str(task_id), metric) -os.makedirs(tmp_dir, exist_ok=True) - -tempdir = tempfile.mkdtemp() -autosklearn_directory = os.path.join(tempdir, "dir") - -automl_arguments = { - 'time_left_for_this_task': time_limit, - 'per_run_time_limit': per_run_time_limit, - 'initial_configurations_via_metalearning': 0, - 'ensemble_size': 0, - 'ensemble_nbest': 0, - 'seed': seed, - 'memory_limit': 3072, - 'resampling_strategy': 'partial-cv', - 'delete_tmp_folder_after_terminate': False, - 'tmp_folder': autosklearn_directory, - 'disable_evaluator_output': True, -} - -if is_test: - automl_arguments['resampling_strategy_arguments'] = {'folds': 2} +if __name__ == '__main__': + + import argparse + import json + import logging + import os + import shutil + import sys + import tempfile + + from autosklearn.classification import AutoSklearnClassifier + from autosklearn.regression import AutoSklearnRegressor + from autosklearn.evaluation import ExecuteTaFuncWithQueue, get_cost_of_crash + from autosklearn.metrics import accuracy, balanced_accuracy, roc_auc, log_loss, r2, \ + mean_squared_error, mean_absolute_error, root_mean_squared_error + + from smac.runhistory.runhistory import RunInfo + from smac.scenario.scenario import Scenario + from smac.stats.stats import Stats + from smac.tae import StatusType + + sys.path.append('.') + from update_metadata_util import load_task + + + parser = argparse.ArgumentParser() + parser.add_argument('--working-directory', type=str, required=True) + parser.add_argument('--time-limit', type=int, required=True) + parser.add_argument('--per-run-time-limit', type=int, required=True) + parser.add_argument('--task-id', type=int, required=True) + parser.add_argument('--metric', type=str, required=True) + parser.add_argument('-s', '--seed', type=int, required=True) + parser.add_argument('--unittest', action='store_true') + args = parser.parse_args() + + working_directory = args.working_directory + time_limit = args.time_limit + per_run_time_limit = 
args.per_run_time_limit + task_id = args.task_id + seed = args.seed + metric = args.metric + is_test = args.unittest + + X_train, y_train, X_test, y_test, cat, task_type = load_task(task_id) + + configuration_output_dir = os.path.join(working_directory, 'configuration', + task_type) + os.makedirs(configuration_output_dir, exist_ok=True) + tmp_dir = os.path.join(configuration_output_dir, str(task_id), metric) + os.makedirs(tmp_dir, exist_ok=True) + + tempdir = tempfile.mkdtemp() + autosklearn_directory = os.path.join(tempdir, "dir") + + automl_arguments = { + 'time_left_for_this_task': time_limit, + 'per_run_time_limit': per_run_time_limit, + 'initial_configurations_via_metalearning': 0, + 'ensemble_size': 0, + 'ensemble_nbest': 0, + 'seed': seed, + 'memory_limit': 3072, + 'resampling_strategy': 'partial-cv', + 'delete_tmp_folder_after_terminate': False, + 'tmp_folder': autosklearn_directory, + 'disable_evaluator_output': True, + } + + if is_test: + automl_arguments['resampling_strategy_arguments'] = {'folds': 2} + if task_type == 'classification': + automl_arguments['include_estimators'] = ['sgd'] + include = {'classifier': ['sgd']} + elif task_type == 'regression': + automl_arguments['include_estimators'] = ['extra_trees'] + automl_arguments['include_preprocessors'] = ['no_preprocessing'] + include = {'regressor': ['extra_trees'], 'feature_preprocessor': ['no_preprocessing']} + else: + raise ValueError('Unsupported task type: %s' % str(task_type)) + else: + automl_arguments['resampling_strategy_arguments'] = {'folds': 10} + include = None + + metric = { + 'accuracy': accuracy, + 'balanced_accuracy': balanced_accuracy, + 'roc_auc': roc_auc, + 'logloss': log_loss, + 'r2': r2, + 'mean_squared_error': mean_squared_error, + 'root_mean_squared_error': root_mean_squared_error, + 'mean_absolute_error': mean_absolute_error, + }[metric] + automl_arguments['metric'] = metric + if task_type == 'classification': - automl_arguments['include_estimators'] = ['sgd'] - include = {'classifier': ['sgd']} + automl = AutoSklearnClassifier(**automl_arguments) elif task_type == 'regression': - automl_arguments['include_estimators'] = ['extra_trees'] - automl_arguments['include_preprocessors'] = ['no_preprocessing'] - include = {'regressor': ['extra_trees'], 'feature_preprocessor': ['no_preprocessing']} + automl = AutoSklearnRegressor(**automl_arguments) + else: + raise ValueError(task_type) + + automl.fit(X_train, y_train, dataset_name=str(task_id), + feat_type=cat, X_test=X_test, y_test=y_test) + trajectory = automl.trajectory_ + + incumbent_id_to_model = {} + incumbent_id_to_performance = {} + validated_trajectory = [] + + if is_test: + memory_limit_factor = 1 else: - raise ValueError('Unsupported task type: %s' % str(task_type)) -else: - automl_arguments['resampling_strategy_arguments'] = {'folds': 10} - include = None - -metric = { - 'accuracy': accuracy, - 'balanced_accuracy': balanced_accuracy, - 'roc_auc': roc_auc, - 'logloss': log_loss, - 'r2': r2, - 'mean_squared_error': mean_squared_error, - 'root_mean_squared_error': root_mean_squared_error, - 'mean_absolute_error': mean_absolute_error, -}[metric] -automl_arguments['metric'] = metric - -if task_type == 'classification': - automl = AutoSklearnClassifier(**automl_arguments) -elif task_type == 'regression': - automl = AutoSklearnRegressor(**automl_arguments) -else: - raise ValueError(task_type) - -automl.fit(X_train, y_train, dataset_name=str(task_id), - feat_type=cat, X_test=X_test, y_test=y_test) -trajectory = automl.trajectory_ - 
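The whole metadata-generation script is now indented under an `if __name__ == '__main__':` guard. That guard matters once the 'spawn' (or 'forkserver') start method is involved, because every child process re-imports the main module and would otherwise re-execute the unguarded top-level code. A tiny illustration, unrelated to the script itself:

import multiprocessing

# Under the 'spawn' start method this line runs again in every child process,
# because the child re-imports the main module before unpickling its target.
print('importing', __name__)


def work():
    print('child is doing the work')


if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')
    p = ctx.Process(target=work)
    p.start()
    p.join()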
-incumbent_id_to_model = {} -incumbent_id_to_performance = {} -validated_trajectory = [] - -if is_test: - memory_limit_factor = 1 -else: - memory_limit_factor = 2 - -print('Starting to validate configurations') -for i, entry in enumerate(trajectory): - print('Starting to validate configuration %d/%d' % (i + 1, len(trajectory))) - incumbent_id = entry.incumbent_id - train_performance = entry.train_perf - if incumbent_id not in incumbent_id_to_model: - config = entry.incumbent - - logger = logging.getLogger('Testing:)') - stats = Stats( - Scenario({ - 'cutoff_time': per_run_time_limit * 2, - 'run_obj': 'quality', - }) - ) - stats.start_timing() - # To avoid the output "first run crashed"... - stats.submitted_ta_runs += 1 - stats.finished_ta_runs += 1 - memory_lim = memory_limit_factor * automl_arguments['memory_limit'] - ta = ExecuteTaFuncWithQueue(backend=automl.automl_._backend, - autosklearn_seed=seed, - resampling_strategy='test', - memory_limit=memory_lim, - disable_file_output=True, - logger=logger, - stats=stats, - all_scoring_functions=True, - include=include, - metric=automl_arguments['metric'], - cost_for_crash=get_cost_of_crash(automl_arguments['metric']), - abort_on_first_run_crash=False,) - run_info, run_value = ta.run_wrapper( - RunInfo( - config=config, - instance=None, - instance_specific=None, - seed=1, - cutoff=per_run_time_limit*3, - capped=False, + memory_limit_factor = 2 + + print('Starting to validate configurations') + for i, entry in enumerate(trajectory): + print('Starting to validate configuration %d/%d' % (i + 1, len(trajectory))) + incumbent_id = entry.incumbent_id + train_performance = entry.train_perf + if incumbent_id not in incumbent_id_to_model: + config = entry.incumbent + + logger = logging.getLogger('Testing:)') + stats = Stats( + Scenario({ + 'cutoff_time': per_run_time_limit * 2, + 'run_obj': 'quality', + }) ) - ) - - if run_value.status == StatusType.SUCCESS: - assert len(run_value.additional_info) > 1, run_value.additional_info - - # print(additional_run_info) - - validated_trajectory.append(list(entry) + [task_id] + - [run_value.additional_info]) - print('Finished validating configuration %d/%d' % (i + 1, len(trajectory))) -print('Finished to validate configurations') - -print('Starting to copy data to configuration directory', flush=True) -validated_trajectory = [entry[:2] + [entry[2].get_dictionary()] + entry[3:] - for entry in validated_trajectory] -validated_trajectory_file = os.path.join(tmp_dir, 'validation_trajectory_%d.json' % seed) -with open(validated_trajectory_file, 'w') as fh: - json.dump(validated_trajectory, fh, indent=4) - - -for dirpath, dirnames, filenames in os.walk(autosklearn_directory, topdown=False): - print(dirpath, dirnames, filenames) - for filename in filenames: - if filename == 'datamanager.pkl': - os.remove(os.path.join(dirpath, filename)) - elif filename == 'configspace.pcs': - os.remove(os.path.join(dirpath, filename)) - for dirname in dirnames: - if dirname in ('models', 'cv_models'): - os.rmdir(os.path.join(dirpath, dirname)) - - -print('Going to copy the configuration directory') -shutil.copytree(autosklearn_directory, os.path.join(tmp_dir, 'auto-sklearn-output')) -print('Finished copying the configuration directory') -try: - shutil.rmtree(tempdir) -except: - pass -print('Finished configuring') + stats.start_timing() + # To avoid the output "first run crashed"... 
+ stats.submitted_ta_runs += 1 + stats.finished_ta_runs += 1 + memory_lim = memory_limit_factor * automl_arguments['memory_limit'] + ta = ExecuteTaFuncWithQueue(backend=automl.automl_._backend, + autosklearn_seed=seed, + resampling_strategy='test', + memory_limit=memory_lim, + disable_file_output=True, + logger=logger, + stats=stats, + all_scoring_functions=True, + include=include, + metric=automl_arguments['metric'], + cost_for_crash=get_cost_of_crash(automl_arguments['metric']), + abort_on_first_run_crash=False,) + run_info, run_value = ta.run_wrapper( + RunInfo( + config=config, + instance=None, + instance_specific=None, + seed=1, + cutoff=per_run_time_limit*3, + capped=False, + ) + ) + + if run_value.status == StatusType.SUCCESS: + assert len(run_value.additional_info) > 1, run_value.additional_info + + # print(additional_run_info) + + validated_trajectory.append(list(entry) + [task_id] + + [run_value.additional_info]) + print('Finished validating configuration %d/%d' % (i + 1, len(trajectory))) + print('Finished to validate configurations') + + print('Starting to copy data to configuration directory', flush=True) + validated_trajectory = [entry[:2] + [entry[2].get_dictionary()] + entry[3:] + for entry in validated_trajectory] + validated_trajectory_file = os.path.join(tmp_dir, 'validation_trajectory_%d.json' % seed) + with open(validated_trajectory_file, 'w') as fh: + json.dump(validated_trajectory, fh, indent=4) + + + for dirpath, dirnames, filenames in os.walk(autosklearn_directory, topdown=False): + print(dirpath, dirnames, filenames) + for filename in filenames: + if filename == 'datamanager.pkl': + os.remove(os.path.join(dirpath, filename)) + elif filename == 'configspace.pcs': + os.remove(os.path.join(dirpath, filename)) + for dirname in dirnames: + if dirname in ('models', 'cv_models'): + os.rmdir(os.path.join(dirpath, dirname)) + + + print('Going to copy the configuration directory') + shutil.copytree(autosklearn_directory, os.path.join(tmp_dir, 'auto-sklearn-output')) + print('Finished copying the configuration directory') + try: + shutil.rmtree(tempdir) + except: + pass + print('Finished configuring') diff --git a/test/test_ensemble_builder/test_ensemble.py b/test/test_ensemble_builder/test_ensemble.py index be51adc45e..162e246262 100644 --- a/test/test_ensemble_builder/test_ensemble.py +++ b/test/test_ensemble_builder/test_ensemble.py @@ -553,15 +553,15 @@ def mtime_mock(filename): get_logger_mock.return_value = logger_mock mtime.side_effect = mtime_mock - ensbuilder.run(time_left=1000, iteration=0) + ensbuilder.run(time_left=1000, iteration=0, pynisher_context='fork') assert os.path.exists(read_scores_file) assert not os.path.exists(read_preds_file) assert logger_mock.warning.call_count == 1 - ensbuilder.run(time_left=1000, iteration=0) + ensbuilder.run(time_left=1000, iteration=0, pynisher_context='fork') assert os.path.exists(read_scores_file) assert not os.path.exists(read_preds_file) assert logger_mock.warning.call_count == 2 - ensbuilder.run(time_left=1000, iteration=0) + ensbuilder.run(time_left=1000, iteration=0, pynisher_context='fork') assert os.path.exists(read_scores_file) assert not os.path.exists(read_preds_file) assert logger_mock.warning.call_count == 3 @@ -569,7 +569,7 @@ def mtime_mock(filename): # it should try to reduce ensemble_nbest until it also failed at 2 assert ensbuilder.ensemble_nbest == 1 - ensbuilder.run(time_left=1000, iteration=0) + ensbuilder.run(time_left=1000, iteration=0, pynisher_context='fork') assert os.path.exists(read_scores_file) 
assert not os.path.exists(read_preds_file) assert logger_mock.warning.call_count == 4 @@ -795,14 +795,15 @@ def test_ensemble_builder_nbest_remembered( max_iterations=None, ) - manager.build_ensemble(dask_client_single_worker) + # Use fork context in the next line to allow for the mock to work + manager.build_ensemble(dask_client_single_worker, 'fork') future = manager.futures[0] dask.distributed.wait([future]) # wait for the ensemble process to finish assert future.result() == ([], 5, None, None, None) file_path = os.path.join(ensemble_backend.internals_directory, 'ensemble_read_preds.pkl') assert not os.path.exists(file_path) - manager.build_ensemble(dask_client_single_worker) + manager.build_ensemble(dask_client_single_worker, 'fork') future = manager.futures[0] dask.distributed.wait([future]) # wait for the ensemble process to finish diff --git a/test/test_evaluation/test_evaluation.py b/test/test_evaluation/test_evaluation.py index 1cc098ea0b..a17c8e47b7 100644 --- a/test/test_evaluation/test_evaluation.py +++ b/test/test_evaluation/test_evaluation.py @@ -92,6 +92,7 @@ def test_eval_with_limits_holdout(self, pynisher_mock): metric=accuracy, cost_for_crash=get_cost_of_crash(accuracy), abort_on_first_run_crash=False, + pynisher_context='fork', ) info = ta.run_wrapper(RunInfo(config=config, cutoff=30, instance=None, instance_specific=None, seed=1, capped=False)) @@ -147,6 +148,7 @@ def test_eval_with_limits_holdout_fail_silent(self, pynisher_mock): metric=accuracy, cost_for_crash=get_cost_of_crash(accuracy), abort_on_first_run_crash=False, + pynisher_context='fork', ) # The following should not fail because abort on first config crashed is false @@ -187,6 +189,7 @@ def test_eval_with_limits_holdout_fail_memory_error(self, pynisher_mock): metric=log_loss, cost_for_crash=get_cost_of_crash(log_loss), abort_on_first_run_crash=False, + pynisher_context='fork', ) info = ta.run_wrapper(RunInfo(config=config, cutoff=30, instance=None, instance_specific=None, seed=1, capped=False)) @@ -298,6 +301,7 @@ def side_effect(*args, **kwargs): metric=accuracy, cost_for_crash=get_cost_of_crash(accuracy), abort_on_first_run_crash=False, + pynisher_context='fork', ) self.scenario.wallclock_limit = 180 instance = "{'subsample': 30}" @@ -321,6 +325,7 @@ def test_exception_in_target_function(self, eval_holdout_mock): metric=accuracy, cost_for_crash=get_cost_of_crash(accuracy), abort_on_first_run_crash=False, + pynisher_context='fork', ) self.stats.submitted_ta_runs += 1 info = ta.run_wrapper(RunInfo(config=config, cutoff=30, instance=None, @@ -346,6 +351,7 @@ def test_silent_exception_in_target_function(self): cost_for_crash=get_cost_of_crash(accuracy), abort_on_first_run_crash=False, iterative=False, + pynisher_context='fork', ) ta.pynisher_logger = unittest.mock.Mock() self.stats.submitted_ta_runs += 1
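The tests above pass pynisher_context='fork' explicitly because the production default is now 'spawn', and unittest.mock patches applied in the test process are only visible in a forked child (which inherits the parent's memory), not in a spawned one (which re-imports everything from scratch). A small demonstration of that difference, assuming a POSIX platform where the 'fork' start method is available:

import multiprocessing
import time
from unittest import mock


def report(queue):
    # Send back whatever time.time happens to be in this child process.
    queue.put(time.time())


if __name__ == '__main__':
    with mock.patch('time.time', return_value=-1.0):
        for method in ('fork', 'spawn'):
            ctx = multiprocessing.get_context(method)
            queue = ctx.Queue()
            child = ctx.Process(target=report, args=(queue,))
            child.start()
            value = queue.get()
            child.join()
            # 'fork' inherits the patched time.time (-1.0); 'spawn' re-imports the
            # real module and reports the actual clock value.
            print(method, value)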