diff --git a/alpha_automl/automl_api.py b/alpha_automl/automl_api.py
index 0093090a..3a349fb6 100644
--- a/alpha_automl/automl_api.py
+++ b/alpha_automl/automl_api.py
@@ -12,7 +12,8 @@
 from alpha_automl.visualization import plot_comparison_pipelines
 from alpha_automl.pipeline_serializer import PipelineSerializer
 
-logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
+logging.basicConfig(level=logging.DEBUG, stream=sys.stdout, format='%(levelname)s|%(asctime)s|%(message)s',
+                    datefmt='%Y-%m-%d %H:%M:%S')
 logger = logging.getLogger(__name__)
 
 AUTOML_NAME = 'AlphaAutoML'
@@ -23,7 +24,7 @@ class BaseAutoML():
 
     def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bound_run=5, task=None,
                  score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 start_mode='auto', verbose=logging.INFO):
+                 num_cpus=None, start_mode='auto', verbose=logging.INFO):
         """
         Create/instantiate an BaseAutoML object.
 
@@ -38,6 +39,7 @@ def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bo
         :param metric_kwargs: Additional arguments for metric.
         :param split_strategy_kwargs: Additional arguments for splitting_strategy.
         :param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param num_cpus: Number of CPUs to be used.
         :param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
         :param verbose: The logs level.
         """
@@ -58,7 +60,7 @@ def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bo
         self.X = None
         self.y = None
         self.leaderboard = None
-        self.automl_manager = AutoMLManager(self.output_folder, time_bound, time_bound_run, task, verbose)
+        self.automl_manager = AutoMLManager(self.output_folder, time_bound, time_bound_run, task, num_cpus, verbose)
         self._start_method = get_start_method(start_mode)
         set_start_method(self._start_method, force=True)
         check_input_for_multiprocessing(self._start_method, self.scorer._score_func, 'metric')
@@ -309,7 +311,7 @@ class ClassifierBaseAutoML(BaseAutoML):
 
     def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5, task=None,
                  score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 start_mode='auto', verbose=logging.INFO):
+                 num_cpus=None, start_mode='auto', verbose=logging.INFO):
         """
         Create/instantiate an AutoMLClassifier object.
 
@@ -324,12 +326,13 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
         :param metric_kwargs: Additional arguments for metric.
         :param split_strategy_kwargs: Additional arguments for splitting_strategy.
         :param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param num_cpus: Number of CPUs to be used.
         :param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
         :param verbose: The logs level.
         """
 
         super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
-                         split_strategy_kwargs, output_folder, start_mode, verbose)
+                         split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
 
         self.label_encoder = LabelEncoder()
 
@@ -365,7 +368,7 @@ class AutoMLClassifier(ClassifierBaseAutoML):
 
     def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5,
                  score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 start_mode='auto', verbose=logging.INFO):
+                 num_cpus=None, start_mode='auto', verbose=logging.INFO):
         """
         Create/instantiate an AutoMLClassifier object.
 
@@ -379,20 +382,21 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
         :param metric_kwargs: Additional arguments for metric.
         :param split_strategy_kwargs: Additional arguments for splitting_strategy.
         :param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param num_cpus: Number of CPUs to be used.
         :param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
         :param verbose: The logs level.
         """
         task = 'CLASSIFICATION'
 
         super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
-                         split_strategy_kwargs, output_folder, start_mode, verbose)
+                         split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
 
 
 class AutoMLRegressor(BaseAutoML):
 
     def __init__(self, time_bound=15, metric='mean_absolute_error', split_strategy='holdout', time_bound_run=5,
                  score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 start_mode='auto', verbose=logging.INFO):
+                 num_cpus=None, start_mode='auto', verbose=logging.INFO):
         """
         Create/instantiate an AutoMLRegressor object.
 
@@ -406,19 +410,20 @@ def __init__(self, time_bound=15, metric='mean_absolute_error', split_strategy='
         :param metric_kwargs: Additional arguments for metric.
         :param split_strategy_kwargs: Additional arguments for splitting_strategy.
         :param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param num_cpus: Number of CPUs to be used.
         :param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
         :param verbose: The logs level.
         """
         task = 'REGRESSION'
 
         super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
-                         split_strategy_kwargs, output_folder, start_mode, verbose)
+                         split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
 
 
 class AutoMLTimeSeries(BaseAutoML):
     def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='timeseries', time_bound_run=5,
                  score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 start_mode='auto', verbose=logging.INFO, date_column=None, target_column=None):
+                 num_cpus=None, start_mode='auto', verbose=logging.INFO, date_column=None, target_column=None):
         """
         Create/instantiate an AutoMLTimeSeries object.
 
@@ -432,6 +437,7 @@ def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='t
         :param metric_kwargs: Additional arguments for metric.
         :param split_strategy_kwargs: Additional arguments for TimeSeriesSplit, E.g. n_splits and test_size(int).
         :param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param num_cpus: Number of CPUs to be used.
         :param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
         :param verbose: The logs level.
         """
@@ -441,7 +447,7 @@ def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='t
         self.target_column = target_column
         super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
-                         split_strategy_kwargs, output_folder, start_mode, verbose)
+                         split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
 
     def _column_parser(self, X):
         cols = list(X.columns.values)
@@ -460,7 +466,7 @@ class AutoMLSemiSupervisedClassifier(ClassifierBaseAutoML):
 
     def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5,
                  score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 start_mode='auto', verbose=logging.INFO):
+                 num_cpus=None, start_mode='auto', verbose=logging.INFO):
         """
         Create/instantiate an AutoMLSemiSupervisedClassifier object.
 
@@ -475,13 +481,14 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
         :param split_strategy_kwargs: Additional arguments for splitting_strategy. In SemiSupervised case, `n_splits`
             and `test_size`(test proportion from 0 to 1) can be pass to the splitter.
         :param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param num_cpus: Number of CPUs to be used.
         :param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
         :param verbose: The logs level.
         """
         task = 'SEMISUPERVISED'
 
         super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
-                         split_strategy_kwargs, output_folder, start_mode, verbose)
+                         split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
 
         if split_strategy_kwargs is None:
             split_strategy_kwargs = {'test_size': 0.25}
diff --git a/alpha_automl/automl_manager.py b/alpha_automl/automl_manager.py
index 2ff85b22..c0d2a448 100644
--- a/alpha_automl/automl_manager.py
+++ b/alpha_automl/automl_manager.py
@@ -1,6 +1,7 @@
 import logging
 import time
 import multiprocessing
+import queue as Q
 from alpha_automl.data_profiler import profile_data
 from alpha_automl.scorer import make_splitter, score_pipeline
 from alpha_automl.utils import sample_dataset, is_equal_splitting
@@ -14,13 +15,14 @@
 NEW_PRIMITIVES = {}
 SPLITTING_STRATEGY = 'holdout'
 SAMPLE_SIZE = 2000
+MAX_RUNNING_PROCESSES = multiprocessing.cpu_count()
 
 logger = logging.getLogger(__name__)
 
 
 class AutoMLManager():
 
-    def __init__(self, output_folder, time_bound, time_bound_run, task, verbose):
+    def __init__(self, output_folder, time_bound, time_bound_run, task, num_cpus, verbose):
         self.output_folder = output_folder
         self.time_bound = time_bound * 60
         self.time_bound_run = time_bound_run * 60
@@ -29,7 +31,10 @@ def __init__(self, output_folder, time_bound, time_bound_run, task, verbose):
         self.y = None
         self.scoring = None
         self.splitting_strategy = None
+        self.found_pipelines = None
+        self.running_processes = 1
         self.verbose = verbose
+        self.num_cpus = num_cpus if num_cpus is not None else MAX_RUNNING_PROCESSES
 
     def search_pipelines(self, X, y, scoring, splitting_strategy, automl_hyperparams=None):
         if automl_hyperparams is None:
@@ -49,6 +54,7 @@ def _search_pipelines(self, automl_hyperparams):
         metadata = profile_data(self.X)
         X, y, is_sample = sample_dataset(self.X, self.y, SAMPLE_SIZE, self.task)
         internal_splitting_strategy = make_splitter(SPLITTING_STRATEGY)
+        self.found_pipelines = 0
         need_rescoring = True
 
         if not is_sample and is_equal_splitting(internal_splitting_strategy, self.splitting_strategy):
@@ -57,46 +63,77 @@ def _search_pipelines(self, automl_hyperparams):
         queue = multiprocessing.Queue()
         search_process = multiprocessing.Process(target=search_pipelines_proc,
                                                  args=(X, y, self.scoring, internal_splitting_strategy, self.task,
-                                                       self.time_bound, automl_hyperparams, metadata,
-                                                       self.output_folder, self.verbose, queue
+                                                       automl_hyperparams, metadata, self.output_folder, self.verbose,
+                                                       queue
                                                        )
                                                  )
 
         search_process.start()
-        found_pipelines = 0
+        self.running_processes += 1
+        num_processes = self.num_cpus - 2  # Exclude the main process and search process
+        scoring_pool = multiprocessing.Pool(max(1, num_processes))
+        pipelines_to_score = []
+        scoring_results = []
 
         while True:
-            result = queue.get()
+            try:
+                result = queue.get(timeout=10)
+            except Q.Empty:
+                logger.debug('Reached timeout getting new pipelines')
+                result = None
 
             if result == 'DONE':
                 search_process.terminate()
                 search_process.join(10)
-                logger.debug(f'Found {found_pipelines} pipelines')
+                scoring_pool.terminate()
+                scoring_pool.join()
+                logger.debug(f'Found {self.found_pipelines} pipelines')
                 logger.debug('Search done')
                 break
 
-            pipeline = result
-            score = pipeline.get_score()
-            logger.debug('Found new pipeline')
-            yield {'pipeline': pipeline, 'message': 'FOUND'}
-
-            if need_rescoring:
-                score, start_time, end_time = score_pipeline(pipeline.get_pipeline(), self.X, self.y, self.scoring,
-                                                             self.splitting_strategy, self.task)
-                pipeline.set_score(score)
-                pipeline.set_start_time(start_time)
-                pipeline.set_end_time(end_time)
-
-            if score is not None:
-                logger.debug(f'Pipeline scored successfully, score={score}')
-                found_pipelines += 1
-                yield {'pipeline': pipeline, 'message': 'SCORED'}
+            elif result is not None:
+                pipeline = result
+                logger.debug('Found new pipeline')
+                yield {'pipeline': pipeline, 'message': 'FOUND'}
+
+                if need_rescoring:
+                    pipelines_to_score.append(pipeline)
+                else:
+                    logger.debug(f'Pipeline scored successfully, score={pipeline.get_score()}')
+                    self.found_pipelines += 1
+                    yield {'pipeline': pipeline, 'message': 'SCORED'}
+
+            if len(pipelines_to_score) > 0:
+                if self.running_processes < MAX_RUNNING_PROCESSES:
+                    pipeline = pipelines_to_score.pop(0).get_pipeline()
+                    scoring_result = scoring_pool.apply_async(
+                        score_pipeline,
+                        args=(pipeline, self.X, self.y, self.scoring, self.splitting_strategy, self.task, self.verbose)
+                    )
+                    scoring_results.append(scoring_result)
+                    self.running_processes += 1
+
+            tmp_scoring_results = []
+            for scoring_result in scoring_results:
+                if scoring_result.ready():
+                    self.running_processes -= 1
+                    pipeline = scoring_result.get()
+                    if pipeline is not None:
+                        logger.debug(f'Pipeline scored successfully, score={pipeline.get_score()}')
+                        self.found_pipelines += 1
+                        yield {'pipeline': pipeline, 'message': 'SCORED'}
+                else:
+                    tmp_scoring_results.append(scoring_result)
+
+            scoring_results = tmp_scoring_results
 
             if time.time() > search_start_time + self.time_bound:
                 logger.debug('Reached search timeout')
                 search_process.terminate()
                 search_process.join(10)
-                logger.debug(f'Found {found_pipelines} pipelines')
+                scoring_pool.terminate()
+                scoring_pool.join()
+                logger.debug(f'Found {self.found_pipelines} pipelines')
                 break
 
     def check_automl_hyperparams(self, automl_hyperparams):
diff --git a/alpha_automl/pipeline_synthesis/pipeline_builder.py b/alpha_automl/pipeline_synthesis/pipeline_builder.py
index faf8269b..e7d8edf3 100644
--- a/alpha_automl/pipeline_synthesis/pipeline_builder.py
+++ b/alpha_automl/pipeline_synthesis/pipeline_builder.py
@@ -44,7 +44,7 @@ def change_default_hyperparams(primitive_object):
     elif isinstance(primitive_object, OrdinalEncoder):
         primitive_object.set_params(handle_unknown='use_encoded_value', unknown_value=-1)
     elif isinstance(primitive_object, SimpleImputer):
-        primitive_object.set_params(strategy='most_frequent')
+        primitive_object.set_params(strategy='most_frequent', keep_empty_features=True)
 
 
 class BaseBuilder:
diff --git a/alpha_automl/pipeline_synthesis/setup_search.py b/alpha_automl/pipeline_synthesis/setup_search.py
index 6f1178d8..679c5283 100644
--- a/alpha_automl/pipeline_synthesis/setup_search.py
+++ b/alpha_automl/pipeline_synthesis/setup_search.py
@@ -4,7 +4,6 @@
 import logging
 from os.path import join
 from alpha_automl.grammar_loader import load_automatic_grammar, load_manual_grammar
-from alpha_automl.pipeline import Pipeline
 from alpha_automl.pipeline_search.Coach import Coach
 from alpha_automl.pipeline_search.pipeline.NNet import NNetWrapper
 from alpha_automl.pipeline_search.pipeline.PipelineGame import PipelineGame
@@ -49,8 +48,8 @@ def signal_handler(queue, signum):
     sys.exit(0)
 
 
-def search_pipelines(X, y, scoring, splitting_strategy, task_name, time_bound, automl_hyperparams, metadata,
-                     output_folder, verbose, queue):
+def search_pipelines(X, y, scoring, splitting_strategy, task_name, automl_hyperparams, metadata, output_folder, verbose,
+                     queue):
     signal.signal(signal.SIGTERM, lambda signum, frame: signal_handler(queue, signum))
     hide_logs(verbose)  # Hide logs here too, since multiprocessing has some issues with loggers
 
@@ -61,12 +60,10 @@ def evaluate_pipeline(primitives, origin):
 
         score = None
         if pipeline is not None:
-            score, start_time, end_time = score_pipeline(
-                pipeline, X, y, scoring, splitting_strategy, task_name
-            )
-            if score is not None:
-                pipeline_alphaautoml = Pipeline(pipeline, score, start_time, end_time)
-                queue.put(pipeline_alphaautoml)  # Only send valid pipelines
+            alphaautoml_pipeline = score_pipeline(pipeline, X, y, scoring, splitting_strategy, task_name, verbose)
+            if alphaautoml_pipeline is not None:
+                score = alphaautoml_pipeline.get_score()
+                queue.put(alphaautoml_pipeline)  # Only send valid pipelines
 
         return score
 
diff --git a/alpha_automl/scorer.py b/alpha_automl/scorer.py
index 7e9b19f3..760adff1 100644
--- a/alpha_automl/scorer.py
+++ b/alpha_automl/scorer.py
@@ -4,10 +4,11 @@
 from sklearn.metrics import make_scorer as make_scorer_sk
 from sklearn.model_selection._split import BaseShuffleSplit, _RepeatedSplits
 from sklearn.model_selection import BaseCrossValidator, KFold, ShuffleSplit, cross_val_score, TimeSeriesSplit
-from alpha_automl.utils import RANDOM_SEED
+from alpha_automl.utils import RANDOM_SEED, hide_logs
 from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score, recall_score,\
     max_error, mean_absolute_error, mean_squared_error, mean_squared_log_error, median_absolute_error, r2_score,\
     adjusted_mutual_info_score, rand_score, mutual_info_score, normalized_mutual_info_score
+from alpha_automl.pipeline import Pipeline
 from alpha_automl.primitive_loader import PRIMITIVE_TYPES
 
 logger = logging.getLogger(__name__)
@@ -120,7 +121,8 @@ def make_splitter(splitting_strategy, splitting_strategy_kwargs=None):
                          f'instance of BaseCrossValidator, BaseShuffleSplit, RepeatedSplits.')
 
 
-def score_pipeline(pipeline, X, y, scoring, splitting_strategy, task_name):
+def score_pipeline(pipeline, X, y, scoring, splitting_strategy, task_name, verbose):
+    hide_logs(verbose)  # Hide logs here too, since multiprocessing has some issues with loggers
     score = None
     start_time = None
     end_time = None
@@ -139,7 +141,11 @@ def score_pipeline(pipeline, X, y, scoring, splitting_strategy, task_name):
         logger.debug('Exception scoring a pipeline')
         logger.debug('Detailed error:', exc_info=True)
 
-    return score, start_time, end_time
+        return None
+
+    alphaautoml_pipeline = Pipeline(pipeline, score, start_time, end_time)
+
+    return alphaautoml_pipeline
 
 
 def make_str_metric(metric):
diff --git a/scripts/amlb/user_config/extensions/Alpha-AutoML/exec.py b/scripts/amlb/user_config/extensions/Alpha-AutoML/exec.py
index 5fb34cec..153c6f84 100644
--- a/scripts/amlb/user_config/extensions/Alpha-AutoML/exec.py
+++ b/scripts/amlb/user_config/extensions/Alpha-AutoML/exec.py
@@ -38,6 +38,7 @@ def run(dataset, config):
     target_name = dataset.target.name
     output_path = config.output_dir
     time_bound = int(config.max_runtime_seconds/60)
+    cores = config.cores
 
     log.info(f'Received parameters:\n'
              f'train_dataset: {train_dataset_path}\n'
@@ -45,10 +46,11 @@ def run(dataset, config):
              f'target_name: {target_name}\n'
              f'time_bound: {time_bound}\n'
              f'metric: {metric}\n'
+             f'cores: {cores}'
              )
 
     automl = AutoMLClassifier(time_bound=time_bound, metric=metrics_mapping[metric], time_bound_run=15,
-                              output_folder=output_path, verbose=logging.DEBUG)
+                              output_folder=output_path, num_cpus=cores, verbose=logging.DEBUG)
 
     train_dataset = pd.read_csv(train_dataset_path)
     test_dataset = pd.read_csv(test_dataset_path)