Commit
Merge pull request #94 from VIDA-NYU/score_in_parallel
roquelopez committed Feb 21, 2024
2 parents e4338fc + 317b7ab commit 5bd0170
Showing 6 changed files with 99 additions and 50 deletions.
33 changes: 20 additions & 13 deletions alpha_automl/automl_api.py
@@ -12,7 +12,8 @@
from alpha_automl.visualization import plot_comparison_pipelines
from alpha_automl.pipeline_serializer import PipelineSerializer

logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout, format='%(levelname)s|%(asctime)s|%(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger(__name__)

AUTOML_NAME = 'AlphaAutoML'
@@ -23,7 +24,7 @@ class BaseAutoML():

def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bound_run=5, task=None,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
start_mode='auto', verbose=logging.INFO):
num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate a BaseAutoML object.
@@ -38,6 +39,7 @@ def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""
@@ -58,7 +60,7 @@ def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bo
self.X = None
self.y = None
self.leaderboard = None
self.automl_manager = AutoMLManager(self.output_folder, time_bound, time_bound_run, task, verbose)
self.automl_manager = AutoMLManager(self.output_folder, time_bound, time_bound_run, task, num_cpus, verbose)
self._start_method = get_start_method(start_mode)
set_start_method(self._start_method, force=True)
check_input_for_multiprocessing(self._start_method, self.scorer._score_func, 'metric')
@@ -309,7 +311,7 @@ class ClassifierBaseAutoML(BaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5, task=None,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
start_mode='auto', verbose=logging.INFO):
num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLClassifier object.
@@ -324,12 +326,13 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, start_mode, verbose)
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)

self.label_encoder = LabelEncoder()

@@ -365,7 +368,7 @@ class AutoMLClassifier(ClassifierBaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
start_mode='auto', verbose=logging.INFO):
num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLClassifier object.
@@ -379,20 +382,21 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'CLASSIFICATION'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, start_mode, verbose)
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)


class AutoMLRegressor(BaseAutoML):

def __init__(self, time_bound=15, metric='mean_absolute_error', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
start_mode='auto', verbose=logging.INFO):
num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLRegressor object.
@@ -406,19 +410,20 @@ def __init__(self, time_bound=15, metric='mean_absolute_error', split_strategy='
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'REGRESSION'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, start_mode, verbose)
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)


class AutoMLTimeSeries(BaseAutoML):
def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='timeseries', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
start_mode='auto', verbose=logging.INFO, date_column=None, target_column=None):
num_cpus=None, start_mode='auto', verbose=logging.INFO, date_column=None, target_column=None):
"""
Create/instantiate an AutoMLTimeSeries object.
@@ -432,6 +437,7 @@ def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='t
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for TimeSeriesSplit, E.g. n_splits and test_size(int).
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""
@@ -441,7 +447,7 @@ def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='t
self.target_column = target_column

super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, start_mode, verbose)
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)

def _column_parser(self, X):
cols = list(X.columns.values)
@@ -460,7 +466,7 @@ class AutoMLSemiSupervisedClassifier(ClassifierBaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
start_mode='auto', verbose=logging.INFO):
num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLSemiSupervisedClassifier object.
@@ -475,13 +481,14 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param split_strategy_kwargs: Additional arguments for splitting_strategy. In SemiSupervised case, `n_splits`
and `test_size` (test proportion from 0 to 1) can be passed to the splitter.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'SEMISUPERVISED'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, start_mode, verbose)
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)

if split_strategy_kwargs is None:
split_strategy_kwargs = {'test_size': 0.25}
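For reference, the new `num_cpus` argument flows from every public constructor above into `AutoMLManager`. A minimal usage sketch, assuming the package-root import and the usual `fit(X, y)` entry point; the CSV file and column names are placeholders, not part of this PR:

```python
# Hedged sketch of the new num_cpus argument; dataset and column names are made up.
import pandas as pd
from alpha_automl import AutoMLClassifier  # import path assumed

train_df = pd.read_csv('train.csv')                # hypothetical training data
X_train = train_df.drop(columns=['label'])
y_train = train_df['label']

# Cap pipeline scoring at 4 CPUs; leaving num_cpus=None (the default) falls back
# to multiprocessing.cpu_count(), as in automl_manager.py below.
automl = AutoMLClassifier(time_bound=15, metric='accuracy_score', num_cpus=4)
automl.fit(X_train, y_train)                       # assumed entry point
print(automl.leaderboard)                          # leaderboard attribute set by the API
```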
83 changes: 60 additions & 23 deletions alpha_automl/automl_manager.py
@@ -1,6 +1,7 @@
import logging
import time
import multiprocessing
import queue as Q
from alpha_automl.data_profiler import profile_data
from alpha_automl.scorer import make_splitter, score_pipeline
from alpha_automl.utils import sample_dataset, is_equal_splitting
@@ -14,13 +15,14 @@
NEW_PRIMITIVES = {}
SPLITTING_STRATEGY = 'holdout'
SAMPLE_SIZE = 2000
MAX_RUNNING_PROCESSES = multiprocessing.cpu_count()

logger = logging.getLogger(__name__)


class AutoMLManager():

def __init__(self, output_folder, time_bound, time_bound_run, task, verbose):
def __init__(self, output_folder, time_bound, time_bound_run, task, num_cpus, verbose):
self.output_folder = output_folder
self.time_bound = time_bound * 60
self.time_bound_run = time_bound_run * 60
@@ -29,7 +31,10 @@ def __init__(self, output_folder, time_bound, time_bound_run, task, verbose):
self.y = None
self.scoring = None
self.splitting_strategy = None
self.found_pipelines = None
self.running_processes = 1
self.verbose = verbose
self.num_cpus = num_cpus if num_cpus is not None else MAX_RUNNING_PROCESSES

def search_pipelines(self, X, y, scoring, splitting_strategy, automl_hyperparams=None):
if automl_hyperparams is None:
@@ -49,6 +54,7 @@ def _search_pipelines(self, automl_hyperparams):
metadata = profile_data(self.X)
X, y, is_sample = sample_dataset(self.X, self.y, SAMPLE_SIZE, self.task)
internal_splitting_strategy = make_splitter(SPLITTING_STRATEGY)
self.found_pipelines = 0
need_rescoring = True

if not is_sample and is_equal_splitting(internal_splitting_strategy, self.splitting_strategy):
@@ -57,46 +63,77 @@ def _search_pipelines(self, automl_hyperparams):
queue = multiprocessing.Queue()
search_process = multiprocessing.Process(target=search_pipelines_proc,
args=(X, y, self.scoring, internal_splitting_strategy, self.task,
self.time_bound, automl_hyperparams, metadata,
self.output_folder, self.verbose, queue
automl_hyperparams, metadata, self.output_folder, self.verbose,
queue
)
)

search_process.start()
found_pipelines = 0
self.running_processes += 1
num_processes = self.num_cpus - 2 # Exclude the main process and search process
scoring_pool = multiprocessing.Pool(max(1, num_processes))
pipelines_to_score = []
scoring_results = []

while True:
result = queue.get()
try:
result = queue.get(timeout=10)
except Q.Empty:
logger.debug('Reached timeout getting new pipelines')
result = None

if result == 'DONE':
search_process.terminate()
search_process.join(10)
logger.debug(f'Found {found_pipelines} pipelines')
scoring_pool.terminate()
scoring_pool.join()
logger.debug(f'Found {self.found_pipelines} pipelines')
logger.debug('Search done')
break

pipeline = result
score = pipeline.get_score()
logger.debug('Found new pipeline')
yield {'pipeline': pipeline, 'message': 'FOUND'}

if need_rescoring:
score, start_time, end_time = score_pipeline(pipeline.get_pipeline(), self.X, self.y, self.scoring,
self.splitting_strategy, self.task)
pipeline.set_score(score)
pipeline.set_start_time(start_time)
pipeline.set_end_time(end_time)

if score is not None:
logger.debug(f'Pipeline scored successfully, score={score}')
found_pipelines += 1
yield {'pipeline': pipeline, 'message': 'SCORED'}
elif result is not None:
pipeline = result
logger.debug('Found new pipeline')
yield {'pipeline': pipeline, 'message': 'FOUND'}

if need_rescoring:
pipelines_to_score.append(pipeline)
else:
logger.debug(f'Pipeline scored successfully, score={pipeline.get_score()}')
self.found_pipelines += 1
yield {'pipeline': pipeline, 'message': 'SCORED'}

if len(pipelines_to_score) > 0:
if self.running_processes < MAX_RUNNING_PROCESSES:
pipeline = pipelines_to_score.pop(0).get_pipeline()
scoring_result = scoring_pool.apply_async(
score_pipeline,
args=(pipeline, self.X, self.y, self.scoring, self.splitting_strategy, self.task, self.verbose)
)
scoring_results.append(scoring_result)
self.running_processes += 1

tmp_scoring_results = []
for scoring_result in scoring_results:
if scoring_result.ready():
self.running_processes -= 1
pipeline = scoring_result.get()
if pipeline is not None:
logger.debug(f'Pipeline scored successfully, score={pipeline.get_score()}')
self.found_pipelines += 1
yield {'pipeline': pipeline, 'message': 'SCORED'}
else:
tmp_scoring_results.append(scoring_result)

scoring_results = tmp_scoring_results

if time.time() > search_start_time + self.time_bound:
logger.debug('Reached search timeout')
search_process.terminate()
search_process.join(10)
logger.debug(f'Found {found_pipelines} pipelines')
scoring_pool.terminate()
scoring_pool.join()
logger.debug(f'Found {self.found_pipelines} pipelines')
break

def check_automl_hyperparams(self, automl_hyperparams):
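The loop above interleaves two concerns: it drains the search process's queue with a 10-second timeout, and it hands rescoring to a `multiprocessing.Pool`, polling each `AsyncResult` with `ready()` so one slow pipeline never blocks the rest. A stripped-down sketch of that pattern, with `produce` and `score_fn` standing in for the repository's real search and scoring functions:

```python
# Simplified sketch of the producer/consumer pattern used in _search_pipelines.
# 'produce' and 'score_fn' are stand-ins, not functions from this repository.
import multiprocessing
import queue as Q
import time


def produce(q):
    for item in range(5):
        q.put(item)
        time.sleep(0.5)
    q.put('DONE')                       # sentinel, as in the search worker


def score_fn(item):
    time.sleep(1)                       # pretend scoring is expensive
    return item * 10


if __name__ == '__main__':
    q = multiprocessing.Queue()
    producer = multiprocessing.Process(target=produce, args=(q,))
    producer.start()

    # Reserve two CPUs for the main and search processes, as the PR does.
    pool = multiprocessing.Pool(max(1, multiprocessing.cpu_count() - 2))
    pending = []

    while True:
        try:
            item = q.get(timeout=10)    # never block forever on a quiet producer
        except Q.Empty:
            item = None

        if item == 'DONE':
            break                       # in-flight scoring is dropped, mirroring the PR
        elif item is not None:
            pending.append(pool.apply_async(score_fn, (item,)))

        # Harvest whatever finished without waiting for the rest.
        still_pending = []
        for res in pending:
            if res.ready():
                print('scored:', res.get())
            else:
                still_pending.append(res)
        pending = still_pending

    producer.join()
    pool.terminate()
    pool.join()
```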
2 changes: 1 addition & 1 deletion alpha_automl/pipeline_synthesis/pipeline_builder.py
@@ -44,7 +44,7 @@ def change_default_hyperparams(primitive_object):
elif isinstance(primitive_object, OrdinalEncoder):
primitive_object.set_params(handle_unknown='use_encoded_value', unknown_value=-1)
elif isinstance(primitive_object, SimpleImputer):
primitive_object.set_params(strategy='most_frequent')
primitive_object.set_params(strategy='most_frequent', keep_empty_features=True)


class BaseBuilder:
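The `keep_empty_features=True` change matters when a sampled split leaves a column entirely empty: by default `SimpleImputer` silently drops all-missing columns, which changes the feature count that downstream steps expect. A small standalone check, assuming scikit-learn >= 1.2 (the first release with this flag):

```python
# Why keep_empty_features=True: all-missing columns are kept instead of dropped.
# Requires scikit-learn >= 1.2.
import numpy as np
from sklearn.impute import SimpleImputer

X = np.array([[1.0, np.nan],
              [2.0, np.nan]])          # second column is entirely missing

dropped = SimpleImputer(strategy='most_frequent').fit_transform(X)
kept = SimpleImputer(strategy='most_frequent', keep_empty_features=True).fit_transform(X)

print(dropped.shape)   # (2, 1): the empty column is removed
print(kept.shape)      # (2, 2): the empty column survives, imputed with 0
```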
15 changes: 6 additions & 9 deletions alpha_automl/pipeline_synthesis/setup_search.py
@@ -4,7 +4,6 @@
import logging
from os.path import join
from alpha_automl.grammar_loader import load_automatic_grammar, load_manual_grammar
from alpha_automl.pipeline import Pipeline
from alpha_automl.pipeline_search.Coach import Coach
from alpha_automl.pipeline_search.pipeline.NNet import NNetWrapper
from alpha_automl.pipeline_search.pipeline.PipelineGame import PipelineGame
@@ -49,8 +48,8 @@ def signal_handler(queue, signum):
sys.exit(0)


def search_pipelines(X, y, scoring, splitting_strategy, task_name, time_bound, automl_hyperparams, metadata,
output_folder, verbose, queue):
def search_pipelines(X, y, scoring, splitting_strategy, task_name, automl_hyperparams, metadata, output_folder, verbose,
queue):
signal.signal(signal.SIGTERM, lambda signum, frame: signal_handler(queue, signum))
hide_logs(verbose) # Hide logs here too, since multiprocessing has some issues with loggers

@@ -61,12 +60,10 @@ def evaluate_pipeline(primitives, origin):
score = None

if pipeline is not None:
score, start_time, end_time = score_pipeline(
pipeline, X, y, scoring, splitting_strategy, task_name
)
if score is not None:
pipeline_alphaautoml = Pipeline(pipeline, score, start_time, end_time)
queue.put(pipeline_alphaautoml) # Only send valid pipelines
alphaautoml_pipeline = score_pipeline(pipeline, X, y, scoring, splitting_strategy, task_name, verbose)
if alphaautoml_pipeline is not None:
score = alphaautoml_pipeline.get_score()
queue.put(alphaautoml_pipeline) # Only send valid pipelines

return score

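The search worker registers a SIGTERM handler so that, when the manager terminates the search process, it can still push its sentinel onto the queue before exiting (the `sys.exit(0)` above is the tail of that handler). A minimal sketch of the handshake; the handler body and timings are illustrative assumptions:

```python
# Sketch: a worker that flushes a 'DONE' sentinel when it receives SIGTERM.
# Details other than the sentinel and signal registration are illustrative.
import multiprocessing
import signal
import sys
import time


def worker(q):
    def on_sigterm(signum, frame):
        q.put('DONE')                   # let the consumer finish cleanly
        sys.exit(0)

    signal.signal(signal.SIGTERM, on_sigterm)
    while True:                         # stand-in for the pipeline search loop
        q.put('candidate')
        time.sleep(1)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    time.sleep(3)
    p.terminate()                       # delivers SIGTERM on POSIX
    p.join(10)

    items = []
    while not q.empty():                # Queue.empty() is approximate but fine here
        items.append(q.get())
    print(items)                        # ends with 'DONE' when the handler ran
```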
12 changes: 9 additions & 3 deletions alpha_automl/scorer.py
@@ -4,10 +4,11 @@
from sklearn.metrics import make_scorer as make_scorer_sk
from sklearn.model_selection._split import BaseShuffleSplit, _RepeatedSplits
from sklearn.model_selection import BaseCrossValidator, KFold, ShuffleSplit, cross_val_score, TimeSeriesSplit
from alpha_automl.utils import RANDOM_SEED
from alpha_automl.utils import RANDOM_SEED, hide_logs
from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score, recall_score,\
max_error, mean_absolute_error, mean_squared_error, mean_squared_log_error, median_absolute_error, r2_score,\
adjusted_mutual_info_score, rand_score, mutual_info_score, normalized_mutual_info_score
from alpha_automl.pipeline import Pipeline
from alpha_automl.primitive_loader import PRIMITIVE_TYPES

logger = logging.getLogger(__name__)
@@ -120,7 +121,8 @@ def make_splitter(splitting_strategy, splitting_strategy_kwargs=None):
f'instance of BaseCrossValidator, BaseShuffleSplit, RepeatedSplits.')


def score_pipeline(pipeline, X, y, scoring, splitting_strategy, task_name):
def score_pipeline(pipeline, X, y, scoring, splitting_strategy, task_name, verbose):
hide_logs(verbose) # Hide logs here too, since multiprocessing has some issues with loggers
score = None
start_time = None
end_time = None
@@ -139,7 +141,11 @@ def score_pipeline(pipeline, X, y, scoring, splitting_strategy, task_name):
logger.debug('Exception scoring a pipeline')
logger.debug('Detailed error:', exc_info=True)

return score, start_time, end_time
return None

alphaautoml_pipeline = Pipeline(pipeline, score, start_time, end_time)

return alphaautoml_pipeline


def make_str_metric(metric):
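With this change `score_pipeline` no longer returns a `(score, start_time, end_time)` tuple: it returns an alpha_automl `Pipeline` wrapper on success and `None` on failure, so every caller performs the same check. A hedged caller-side sketch; the scikit-learn estimator, toy data, and scorer construction are illustrative assumptions, not taken from this PR:

```python
# Sketch of the new score_pipeline contract (caller side); estimator, data and
# scorer construction below are assumptions for illustration only.
import logging

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import get_scorer
from sklearn.pipeline import Pipeline as SkPipeline
from sklearn.preprocessing import StandardScaler

from alpha_automl.scorer import make_splitter, score_pipeline

X, y = make_classification(n_samples=200, random_state=0)
sk_pipeline = SkPipeline([('scaler', StandardScaler()),
                          ('clf', LogisticRegression())])

result = score_pipeline(sk_pipeline, X, y, get_scorer('accuracy'),
                        make_splitter('holdout'), 'CLASSIFICATION',
                        verbose=logging.INFO)

if result is None:
    print('pipeline failed to score')        # replaces checking score is None on the old tuple
else:
    print('score:', result.get_score())      # Pipeline wrapper built inside score_pipeline
```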