Proof of Concept: Separating the Engine from AutoMLSearch #1975
Conversation
Codecov Report
@@           Coverage Diff            @@
##             main    #1975    +/-   ##
=========================================
+ Coverage   100.0%   100.0%   +0.1%
=========================================
  Files         288      291      +3
  Lines       23428    23698    +270
=========================================
+ Hits        23418    23688    +270
  Misses         10       10
Continue to review full report at Codecov.
evalml/automl/engine/ray_engine.py
Outdated
@@ -0,0 +1,60 @@
from evalml.automl.engine.engine_base import (
Are we planning on keeping this then? I see a lot of lines not covered by codecov 👀
I'll get rid of it before merge!
@@ -562,14 +577,7 @@ def _should_continue(self):
    if self._interrupted:
        return False

    # for add_to_rankings
These lines were added for add_to_rankings and baseline evaluation because the engine used to call the _should_continue_callback. The engine doesn't do that anymore, so we can delete these lines!
Performance tests are here!
* Added DaskEngine unit tests
* Added AutoMLSearch unit tests utilizing Dask Engine
… 1914-engine-redesign
interesting stuff. left some comments based on a quick review
evalml/automl/automl_search.py
Outdated
    else:
        self._engine = engine

    self.automl_data = AutoMLData(self.ensembling_indices,
take this with a grain of salt since i'm only doing a quick pass, but i wonder if AutoMLConfig or something like that might be a better name for this. When I see "data" I assume it is referring to X and y.
Agreed!
evalml/automl/automl_search.py
Outdated
@@ -538,8 +556,8 @@ def _find_best_pipeline(self):
    train_indices = self.data_splitter.transform_sample(X_train, y_train)
    X_train = X_train.iloc[train_indices]
    y_train = y_train.iloc[train_indices]
    best_pipeline = self._engine.train_pipeline(best_pipeline, X_train, y_train,
                                                self.optimize_thresholds, self.objective)
    best_pipeline = train_pipeline(best_pipeline, X_train, y_train,
i wonder if we want this to be called through the engine?
a use case to imagine is that you are running evalml on your laptop and you've connected it to a remote dask cluster that has plenty of compute (perhaps the X and y are references to dask dataframes where the data is remote too). you want all the training to happen on the remote dask cluster.
if i understand the code correctly, right now the search phase will happen on the remote cluster, but the final training of the pipeline would happen locally.
Good call! That was my intention but I think I forgot to actually make the change hehe.
    self._pre_evaluation_callback(pipeline)
    computation = self._engine.submit_evaluation_job(self.automl_data, pipeline, self.X_train, self.y_train)
    computations.append(computation)
while self._should_continue() and len(computations) > 0:
what is the behavior of this when using the dask engine and all pipelines in a batch are currently computing? does it just iterate as fast as it can over and over trying to find whatever computation has completed?
I wonder if this code can be simplified using a method like: https://docs.dask.org/en/latest/futures.html#distributed.as_completed
You're right about the current behavior! I thought about using/writing our own as_completed, but the one tricky thing is that we want to stop the search as soon as one of our stopping criteria is met, and as_completed will only pull off futures once they're done. I think this will make it hard to respect time-based stopping rules.
For example, let's say the user has a 2-hour stop limit and fires off four jobs. The first three finish in 90 minutes but the fourth one won't finish for 6 hours.
If we do this:

for computation in as_completed(computations):
    if not self._should_continue():
        break

we actually won't reach the break until hour 6, because that's when the final future finishes.
I think we can definitely explore ways to consolidate/simplify the logic for pulling futures off of our queue, but I'd rather leave that improvement for a separate PR.
i wonder if we need some sort of sleep call in the loop then. if you have a bunch of pipelines training for a while with none finishing, i don't think you want this loop iterating as fast as it can.
Great point Max. Yes, a short sleep here is probably a good idea for now.
Good call about distributed.as_completed here. I was going to suggest asyncio. Eventually we can make the whole automl API non-blocking (#1047), but that is definitely out of scope for the current work.
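For illustration, a minimal sketch of the polling-with-sleep approach being described; the helper and argument names below are placeholders, not the actual AutoMLSearch code:

import time

def poll_computations(computations, should_continue, handle_result, poll_interval=0.1):
    """Sketch only (names are placeholders, not the PR implementation).

    Poll outstanding engine computations, re-checking the stopping criteria
    between passes so time-based limits are respected while jobs are still running.
    """
    while should_continue() and computations:
        computation = computations.pop(0)
        if computation.done():
            handle_result(computation.get_result())
        else:
            # Not finished yet: put it back and sleep briefly so the loop
            # doesn't spin at full speed while every pipeline is still training.
            computations.append(computation)
            time.sleep(poll_interval)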
Awesome work, Freddy. This is a huge step in the right direction toward separating out this code from AutoMLSearch. Take or leave the comment I left about refactoring those similar while loops. I think they might be a little different, and if the refactor looks less clear, then forget it.
evalml/automl/automl_search.py
Outdated
@@ -541,8 +556,8 @@ def _find_best_pipeline(self):
    train_indices = self.data_splitter.transform_sample(X_train, y_train)
    X_train = X_train.iloc[train_indices]
    y_train = y_train.iloc[train_indices]
    best_pipeline = self._engine.train_pipeline(best_pipeline, X_train, y_train,
                                                self.optimize_thresholds, self.objective)
    best_pipeline = train_pipeline(best_pipeline, X_train, y_train,
Is this taken away from the engine because it probably just makes sense to do it locally since it's one training computation?
This is my mistake. I think the right thing to do is have the engine train all pipelines but I missed this spot hehe.
    loop_interrupted = False
except KeyboardInterrupt:
    loop_interrupted = True
    if self._handle_keyboard_interrupt():
        break
        self._interrupted = True
Have you tried testing the keyboard interrupt manually? Just curious. It seems like the probability of being able to keyboard interrupt the parallel evaluation will be low as you'd have to interrupt in the middle of the loop that's submitting the jobs. Once those futures are sent off to be evaluated, I don't think we can interrupt them anymore. I'm not stressing it that much. We might have to implement a different type of keyboard interrupt.
Yep I think you're right. There is a small window of time where if someone does a Ctrl-C, it won't be caught by this except, if that's what you're referring to. It doesn't bother me too much either.
@freddyaboulton it would be ideal if we could cancel the futures here when the user opts to exit.
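Something along these lines could work; this is just a sketch, and it assumes the engine computation wrapper gains a hypothetical cancel() method that delegates to the underlying dask Future.cancel() for the DaskEngine:

def cancel_outstanding(computations, logger):
    """Sketch only: best-effort cancellation of unfinished work when the user
    chooses to exit. Assumes a cancel() method on the computation wrapper,
    which is not part of this PR yet."""
    for computation in computations:
        if not computation.done():
            try:
                computation.cancel()
            except Exception as e:
                logger.info(f"Could not cancel computation: {e}")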
while computations:
    computation = computations.pop(0)
    if computation.done():
        try:
            fitted_pipeline = computation.get_result()
            fitted_pipelines[fitted_pipeline.name] = fitted_pipeline
        except Exception as e:
            logger.error(f'Train error for {pipeline.name}: {str(e)}')
            tb = traceback.format_tb(sys.exc_info()[2])
            logger.error("Traceback:")
            logger.error("\n".join(tb))
    else:
        computations.append(computation)
Is this chunk worth pulling into a function? I think you use it at least 3 times.
It's not exactly the same because the thing you get back from get_result is not the same between training and scoring, and the logic for handling exceptions is not the same either.
This might be a good spot for @kmax12 's suggestion to use as_completed or something like it.
If we do that, I think this will look like:

for computation in as_completed(computations):
    try:
        fitted_pipeline = computation.get_result()
        fitted_pipelines[fitted_pipeline.name] = fitted_pipeline
    except Exception as e:
        logger.error(f'Train error for {pipeline.name}: {str(e)}')
        tb = traceback.format_tb(sys.exc_info()[2])
        logger.error("Traceback:")
        logger.error("\n".join(tb))

I think at that point we're not reusing the while-loop logic, so the differences between the two methods might be clearer.
If you're on board I'll file an issue after merge!
Yes please do! This proposal looks great. I am still confused how keyboard interrupt would fit with that example, but I like that time.sleep isn't needed.
Also FWIW, we do have the option to stop supporting keyboard interrupt if it is significantly hard to maintain. It's cool, but not worth too much of our time IMO.
Filed #2096 to track consolidating these two while loops!
        return new_pipeline_ids

    def done(self) -> bool:
        """Is the computation done?"""
        return True
Is this definitely true, though?
# Top-level replacement for AutoML object to supply data for testing purposes.
def err_call(*args, **kwargs):
Sorry - this is my bad...might want to drop an ignore in here for the time being.
I think providing just a docstring will make codecov happy so I'll push that up now.
X, y = X_y_binary

mock_input.side_effect = user_input
mock_future_get_result.side_effect = KeyboardInterruptOnKthPipeline(k=when_to_interrupt, starting_index=1)
OK, cool, I like this test a lot. That's really clever, and it allays my suspicions about when the keyboard interrupt does its job 👍
else:
    correct_output = automl._pipelines_per_batch
assert output.count(f"Batch {batch_number}: ") == correct_output
assert output.count("Batch Number") == max_batches
clever
Looks good to me! Left two comments about renaming and adding docstrings, but the tests are very solid! Love the comparison of performance between parallel and sequential.
evalml/automl/engine/engine_base.py
Outdated
    return {'cv_data': cv_data, 'training_time': training_time, 'cv_scores': cv_scores, 'cv_score_mean': cv_score_mean}, cv_pipeline, logger


def evaluate_pipeline(pipeline, automl_data, X, y, logger):
can we add a docstring for this and for score_pipeline?
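For example, something along these lines for evaluate_pipeline; the argument descriptions below are my guesses from how the function is used in this PR, so adjust as needed:

def evaluate_pipeline(pipeline, automl_data, X, y, logger):
    """Fit and score a pipeline with cross-validation during AutoML search.

    Arguments:
        pipeline: Pipeline instance to evaluate.
        automl_data: Search-wide configuration shared with the engine
            (data splitter, objective, problem type, etc.).
        X: Feature data.
        y: Target data.
        logger: Logger used to record evaluation progress.

    Returns:
        Evaluation results for the pipeline (cross-validation data and scores).
    """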
evalml/automl/utils.py
Outdated
@@ -110,3 +112,8 @@ def check_all_pipeline_names_unique(pipelines):
    plural, tense = ("s", "were") if len(duplicate_names) > 1 else ("", "was")
    duplicates = ", ".join([f"'{name}'" for name in sorted(duplicate_names)])
    raise ValueError(f"All pipeline names must be unique. The name{plural} {duplicates} {tense} repeated.")


AutoMLData = namedtuple("AutoMLData", ["ensembling_indices", "data_splitter", "problem_type",
Agree with Max about naming here
Yep I also prefer AutoMLConfig
@freddyaboulton this is amazing! So many good changes :D The dask engine code looks 💯. The EngineComputation abstraction is super helpful. Well done to you and to @chukarsten 😁
I don't have many blocking comments. I agree with what Max said about adding a sleep in the main loop. I left a couple comments about the keyboard interrupt stuff in the main loop(s). I do think we should call that config object AutoMLConfig as suggested. And I left a couple comments on the test utils.
Also, I know dask is included in evalml as a core requirement through featuretools, but for clarity I think we should list dask in our core-requirements.txt after featuretools.
Since this is a significant change, please run the performance tests once more before merging as a protective measure.
@@ -83,7 +89,8 @@ def __init__(self,
                 train_best_pipeline=True,
                 pipeline_parameters=None,
                 _ensembling_split_size=0.2,
                 _pipelines_per_batch=5):
                 _pipelines_per_batch=5,
                 engine=None):
+1! If it's time to make this method "public", let's doc it.
evalml/automl/automl_search.py
Outdated
self._pre_evaluation_callback(baseline)
logger.info(f"Evaluating Baseline Pipeline: {baseline.name}")
computation = self._engine.submit_evaluation_job(self.automl_data, baseline, self.X_train, self.y_train)
data, pipeline, job_log = computation.get_result()
Sweet
For future-proofing, could we do
result = computation.get_result()
data = result.get('data')
pipeline = result.get('pipeline')
job_log = result.get('job_log')
Then we can add or reorder fields in the results in the future without making a change that breaks the API.
Done!
additional_objectives=additional_objectives,
optimize_thresholds=optimize_thresholds,
error_callback=error_callback,
random_seed=random_seed)
To avoid strange errors arising from accidentally mutating global state: could you please create this stuff in a test fixture, which runs once per test, instead of once at import-time as things stand currently?
You could even move it to evalml/tests/conftest.py.
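For example, a minimal sketch of what that fixture could look like in evalml/tests/conftest.py; the fixture and helper names below are placeholders, not the actual test code:

import pytest

@pytest.fixture
def dask_engine_test_data(X_y_binary):
    """Hypothetical fixture: rebuild the test objects for every test instead of
    once at import time, so no test can see state mutated by another."""
    X, y = X_y_binary
    # build_engine_test_objects is a placeholder for the setup currently done at module level.
    return build_engine_test_objects(X, y)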
@@ -82,7 +82,7 @@ def add_result_callback(results, trained_pipeline, automl_obj, counts=counts):
    add_result_callback=add_result_callback, n_jobs=1)
    automl.search()

    assert counts["start_iteration_callback"] == max_iterations
    assert counts["start_iteration_callback"] == len(get_estimators("regression")) + 1
Interesting, why did this have to change? Were we not calling this for the baseline pipeline before? That seems like a positive change to me.
Correct!
class TestDaskEngine(unittest.TestCase):

    @classmethod
    def setUpClass(cls) -> None:
Remove the -> None?
    @classmethod
    def setUpClass(cls) -> None:
        cls.client = Client()
Nice. This starts a local dask cluster, once for all of the tests in this class, correct? Any idea how many cores it'll take, how many workers it'll have?
@dsherry instantiating with no arguments creates a Client() based on a LocalCluster(), with the number of workers determined by the number of available cores on the machine running it.
    @classmethod
    def tearDownClass(cls) -> None:
        cls.client.close()
Very nice. I suggest moving this to be at the top near setup so it's obvious to new readers what's going on.
    pre_evaluation_callback=self._pre_evaluation_callback,
    post_evaluation_callback=self._post_evaluation_callback)
if not engine:
    self._engine = SequentialEngine()
I am thinking that sooner or later we should switch this to default to DaskEngine with a local dask cluster. What would we need to do in order to make that happen? By my accounting: make sure the team feels good about this move, list dask in our requirements, update our documentation to explain what's going on and how to configure a remote cluster, and confirm that the perf testing we've done is adequate to make this change.
I was kind of thinking the same thing working through all this. It seems that, in the grand scheme of things, a DaskEngine with 1 worker eliminates the need for a SequentialEngine. And yeah, that sounds about right, but we'd need proper documentation for how to limit the DaskEngine to a certain number of workers, and perhaps an expectation of how many workers there should be by default.
So currently AutoMLSearch is designed to take in an engine argument. The EngineBase object expects a client argument in the case of a DaskEngine, and the client object, when it's not run as a default instance, takes a cluster argument. The cluster is where you lay out all the details about the resources and amount of parallelism you want to use. As such, I think we need to answer two questions: 1) what default worker settings do we want for the DaskEngine? and 2) what default worker settings do we want for AutoMLSearch? We could either try to make it as fast as possible for the user by default or protect their resources by default. Whichever of the two philosophies (faster vs. resource-conservative) we choose, we should probably be consistent between DaskEngine and AutoMLSearch.
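To make the trade-off concrete, here's a rough sketch of how a user could cap parallelism themselves by building the client and handing it to the engine. The argument names follow this PR (engine= on AutoMLSearch, client= on DaskEngine), and the DaskEngine import path is assumed from this PR's layout, so both may differ in the final API:

from dask.distributed import Client, LocalCluster

from evalml.automl import AutoMLSearch
from evalml.automl.engine.dask_engine import DaskEngine  # assumed path; may differ
from evalml.demos import load_breast_cancer

# Resource-conservative setup: a small local cluster instead of letting
# Client() default to one worker per available core.
cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)

X, y = load_breast_cancer()
automl = AutoMLSearch(X_train=X, y_train=y,
                      problem_type="binary",
                      engine=DaskEngine(client=client))  # engine argument added in this PR
automl.search()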
…and sleep in automl search loop.
Updated perf test results are here. Same performance improvements after making the requested changes, so I think this is good to merge!
…skEngine`` #1975.
- Added optional ``engine`` argument to ``AutoMLSearch`` #1975
- Added a warning about how time series support is still in beta when a user passes in a time series problem to ``AutoMLSearch`` #2118
- Added ``NaturalLanguageNaNDataCheck`` data check #2122
- Added ValueError to ``partial_dependence`` to prevent users from computing partial dependence on columns with all NaNs #2120
- Added standard deviation of cv scores to rankings table #2154
- Fixed ``BalancedClassificationDataCVSplit``, ``BalancedClassificationDataTVSplit``, and ``BalancedClassificationSampler`` to use ``minority:majority`` ratio instead of ``majority:minority`` #2077
- Fixed bug where two-way partial dependence plots with categorical variables were not working correctly #2117
- Fixed bug where ``hyperparameters`` were not displaying properly for pipelines with a list ``component_graph`` and duplicate components #2133
- Fixed bug where ``pipeline_parameters`` argument in ``AutoMLSearch`` was not applied to pipelines passed in as ``allowed_pipelines`` #2133
- Fixed bug where ``AutoMLSearch`` was not applying custom hyperparameters to pipelines with a list ``component_graph`` and duplicate components #2133
- Removed ``hyperparameter_ranges`` from Undersampler and renamed ``balanced_ratio`` to ``sampling_ratio`` for samplers #2113
- Renamed ``TARGET_BINARY_NOT_TWO_EXAMPLES_PER_CLASS`` data check message code to ``TARGET_MULTICLASS_NOT_TWO_EXAMPLES_PER_CLASS`` #2126
- Modified one-way partial dependence plots of categorical features to display data with a bar plot #2117
- Renamed ``score`` column for ``automl.rankings`` as ``mean_cv_score`` #2135
- Fixed ``conf.py`` file #2112
- Added a sentence to the automl user guide stating that our support for time series problems is still in beta. #2118
- Fixed documentation demos #2139
- Update test badge in README to use GitHub Actions #2150
- Fixed ``test_describe_pipeline`` for ``pandas`` ``v1.2.4`` #2129
- Added a GitHub Action for building the conda package #1870 #2148

.. warning::

- Renamed ``balanced_ratio`` to ``sampling_ratio`` for the ``BalancedClassificationDataCVSplit``, ``BalancedClassificationDataTVSplit``, ``BalancedClassficationSampler``, and Undersampler #2113
- Deleted the "errors" key from automl results #1975
- Deleted the ``raise_and_save_error_callback`` and the ``log_and_save_error_callback`` #1975
- Fixed ``BalancedClassificationDataCVSplit``, ``BalancedClassificationDataTVSplit``, and ``BalancedClassificationSampler`` to use minority:majority ratio instead of majority:minority #2077
Pull Request Description
Fixes #1914. Fixes #1972
I think there are good reasons to go this route:
Demo with dask
Cons (strategically placed at the bottom 😂):
Performance comparison: About 70% faster on AutoML
After creating the pull request: in order to pass the release_notes_updated check you will need to update the "Future Release" section of docs/source/release_notes.rst to include this pull request by adding :pr:123.