Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: prioritizes fitting by score in model selection #527

Open
wants to merge 68 commits into
base: main
Choose a base branch
from

Conversation

stsievert
Copy link
Member

What does this PR implement?
This PR prioritizes model training/scoring based on the most recent score in model selection.

@stsievert
Copy link
Member Author

This matters most when IncrementalSearchCV is fit in parallel. If only one IncrementalSearchCV instance is being run, there's no real advantage because the scores are only recorded after all models are finished training.

I've run some simulations on this for scipy-conference/scipy_proceedings#464. I've run these simulations both before and after this PR (respectively via the prioritization random and high-scores). I've run Hyperband 20 times in both cases with the same random state.

Screen Shot 2019-06-22 at 6 18 27 PM

The shaded regions correspond to the 25% to 75% percentile of scores.

Each run has the same random state, so both random and high-scores have the same parameters and validation set.

@stsievert
Copy link
Member Author

stsievert commented Jun 23, 2019

The comparison in #527 (comment) isn't valid: it uses different train datasets and different random states for each model.

Here's a comparison that runs one model. It have the same parameters, the same train data, the same validation data, and both models have the same random state.

Screen Shot 2019-06-22 at 9 51 33 PM

This is run with 4 Dask workers. The prioritization makes provides with the most gain in serial environments (aka 4 Dask workers).

@TomAugspurger
Copy link
Member

Are you able to reproduce the CI failure locally?

@stsievert
Copy link
Member Author

stsievert commented Jun 24, 2019

Are you able to reproduce the CI failure locally?

The timeout errors on test_incremental.py#test_basic? Yes, but they seem to happen randomly (with about 50% probability). I'm not sure why they happen, but here's the relevant output from pytest (which also appears on the CI logs):

	================================= FAILURES ==================================
	________________________________ test_basic _________________________________
    def test_func():
        del _global_workers[:]
        _global_clients.clear()
        Comm._instances.clear()
        active_threads_start = set(threading._active)

        reset_config()

        dask.config.set({"distributed.comm.timeouts.connect": "5s"})
        # Restore default logging levels
        # XXX use pytest hooks/fixtures instead?
        for name, level in logging_levels.items():
            logging.getLogger(name).setLevel(level)

        result = None
        workers = []

        with pristine_loop() as loop:
            with check_active_rpc(loop, active_rpc_timeout):

                @gen.coroutine
                def coro():
                    with dask.config.set(config):
                        s = False
                        for i in range(5):
                            try:
                                s, ws = yield start_cluster(
                                    ncores,
                                    scheduler,
                                    loop,
                                    security=security,
                                    Worker=Worker,
                                    scheduler_kwargs=scheduler_kwargs,
                                    worker_kwargs=worker_kwargs,
                                )
                            except Exception as e:
                                logger.error(
                                    "Failed to start gen_cluster, retrying",
                                    exc_info=True,
                                )
                            else:
                                workers[:] = ws
                                args = [s] + workers
                                break
                        if s is False:
                            raise Exception("Could not start cluster")
                        if client:
                            c = yield Client(
                                s.address,
                                loop=loop,
                                security=security,
                                asynchronous=True,
                                **client_kwargs
                            )
                            args = [c] + args
                        try:
                            future = func(*args)
                            if timeout:
                                future = gen.with_timeout(
                                    timedelta(seconds=timeout), future
                                )
                            result = yield future
                            if s.validate:
                                s.validate_state()
                        finally:
                            if client and c.status not in ("closing", "closed"):
                                yield c._close(fast=s.status == "closed")
                            yield end_cluster(s, workers)
                            yield gen.with_timeout(
                                timedelta(seconds=1), cleanup_global_workers()
                            )

                        try:
                            c = yield default_client()
                        except ValueError:
                            pass
                        else:
                            yield c._close(fast=True)

                        for i in range(5):
                            if all(c.closed() for c in Comm._instances):
                                break
                            else:
                                yield gen.sleep(0.05)
                        else:
                            L = [c for c in Comm._instances if not c.closed()]
                            Comm._instances.clear()
                            # raise ValueError("Unclosed Comms", L)
                            print("Unclosed Comms", L)

                        raise gen.Return(result)

                result = loop.run_sync(
>                   coro, timeout=timeout * 2 if timeout else timeout
                )

/Users/scott/anaconda3/lib/python3.6/site-packages/distributed/utils_test.py:1019:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/ioloop.py:581: in run_sync
    return future_cell[0].result()
/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/gen.py:1113: in run
    yielded = self.gen.send(value)
/Users/scott/anaconda3/lib/python3.6/site-packages/distributed/utils_test.py:987: in coro
    result = yield future
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <tornado.gen.Runner object at 0x1c25fc4240>

    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    try:
>                       value = future.result()
E                       tornado.util.TimeoutError: Timeout

/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/gen.py:1099: TimeoutError

@stsievert
Copy link
Member Author

Prioritizing models with high score helps find models with high score quicker on this iteration, but it can also mean a longer time to solution (i.e., the lowest scoring model will be trained after all other models are finished training). I'll reshuffle the priorities a bit to protect against this.

stsievert added a commit to stsievert/scipy_proceedings that referenced this pull request Jun 26, 2019
@stsievert
Copy link
Member Author

stsievert commented Jun 26, 2019

I've made some changes to the prioritization scheme. I don't fully prioritize by score now, only partially. The last num_workers models all have the same score, np.median(low_scores). Here's the resulting figure with the same setup as in #527 (comment):

Screen Shot 2019-06-25 at 10 04 10 PM

@stsievert
Copy link
Member Author

stsievert commented Jun 26, 2019

I'm also getting a TimeoutError on my local machine for model_selection/test_incremental.py#test_basic. I only get it on this branch, not on the master branch.

The CI also has 9 failing tests on preprocessing/test_encoders.py#test_basic_array. I think those are unrelated. Here's some output on that CI error:

        else:
            expected = a.fit_transform(X)
>           result = b.fit_transform(dX)

/usr/share/miniconda/envs/dask-ml-test/lib/python3.7/site-packages/dask/array/core.py:3234:   ValueError
/usr/share/miniconda/envs/dask-ml-test/lib/python3.7/site-packages/sklearn/preprocessing/_encoders.py:631: in fit_transform
    return self.fit(X).transform(X)
dask_ml/preprocessing/_encoders.py:209: in transform
    return self._transform(X)
dask_ml/preprocessing/_encoders.py:237: in _transform
    X = da.concatenate(Xs, axis=1)
>       meta = np.concatenate([meta_from_array(s) for s in seq], axis=axis)
E       ValueError: zero-dimensional arrays cannot be concatenated

/usr/share/miniconda/envs/dask-ml-test/lib/python3.7/site-packages/dask/array/core.py:3234: ValueError

@TomAugspurger
Copy link
Member

Looking into the CI now, over in #529

@stsievert
Copy link
Member Author

I'm getting CI errors on Azure on tests/model_selection/test_incremental.py::test_basic that I can't reproduce locally (it throws a tornado.util.TimeoutError). This only happens on some tests (linux linux and linux earliest).

Any ideas on what might be throwing this error?

@TomAugspurger
Copy link
Member

TomAugspurger commented Oct 21, 2019 via email

@stsievert
Copy link
Member Author

stsievert commented Nov 28, 2019

I've figured out what the CI failure is. I can reproduce the timeout on tests/model_selection/test_incremental.py::test_basic (at least, it hangs and does not complete). The test passes sometimes, and fails randomly with maybe 20% probability.

It looks like this the test is failing because the cluster isn't cleaning up nicely. This diff makes the test pass:

-     while c.futures or s.tasks:  # Cleans up cleanly after running
-         yield gen.sleep(0.01)
+     start = time()
+     while c.futures or s.tasks:  # Cleans up cleanly after running
+         yield gen.sleep(0.01)
+        if time() - start > 5:
+            break

Here's part of the diff from this PR that could play a role:

# def _fit(...):
-       d_partial_fit = dask.delayed(_partial_fit)
+       def d_partial_fit(*args, **kwargs):
+           return client.submit(_partial_fit, *args, **kwargs)

@stsievert
Copy link
Member Author

It looks like this has gone a bit stale. @stsievert @TomAugspurger is this still worth pursuing?

I'd like to see this PR merged. I've mentioned this as a feature of Dask-ML in talks/papers, though perhaps it's not relevant until #677 is complete and merged.

I've resolved the merge conflicts. Hopefully the CI passes; almost all of 413ba24 passed.

@TomAugspurger
Copy link
Member

Seem to be some test failures. I didn't look closely, but sklearn did have a release today.

Also perhaps a linting issue.

@stsievert
Copy link
Member Author

stsievert commented Aug 4, 2020

Some sklearn-dev tests are failing; sklearn issues a FutureWarning for FeatureUnion with message "Using None as a transformer is deprecated in version 0.22 and will be removed in version 0.24. Please use 'drop' instead."

I need to look more closely at the tests again; looks like there's a couple failing tests on timeout error again.

@TomAugspurger
Copy link
Member

@stsievert if you merge master the unrelated CI failures should be fixed.

@stsievert
Copy link
Member Author

Thanks, I've merged master. I made the test_incremental.py#test_basic weaker by adding a pytest.xfail for TimeoutErrors. The tests pass on my own machine w/o timeouts. They sometimes fail on Travis CI; I haven't been able to detect any pattern with machine OS/Python version (though windows does seem common).

@TomAugspurger
Copy link
Member

Still some timeout errors. The tests are running on azure pipelines, so that's what would need to be updated.

That said, I worry about just xfailing these. Do you think that it's truly just an issue on CI, or is indicating some fundamental issue?

@mrocklin
Copy link
Member

mrocklin commented Aug 5, 2020

test_basic is genuinely hanging locally for me

@stsievert
Copy link
Member Author

It's hanging for me too, not deterministically: test_basic completed in about a second in the 2 for 3 runs I've done this morning. Let me do some more debugging.

@stsievert
Copy link
Member Author

stsievert commented Aug 5, 2020

I've added this diff to test_basic to make the test pass locally:

- while c.futures or s.tasks:  # Make sure cleans up cleanly after running
-     await asyncio.sleep(0.1)
+ _start = time()
+ while c.futures or s.tasks:  # Make sure cleans up cleanly after running
+     await asyncio.sleep(0.1)
+     if time() - _start >= 5:
+         assert c.futures == {}
+         assert all(task.state == "released" for task in s.tasks.values())
+         break

Is this an acceptable test modification? My justification is that "released" tasks are soon to be "forgotten" (source).

@TomAugspurger
Copy link
Member

TomAugspurger commented Aug 5, 2020

IIUC, the tests are typically written with some sort of deadline for cleanup to complete. If that deadline passes without the condition then we error the test.

Something like

deadline = time() + 5
while c.futures:
    if time() > deadline:
        raise ValueError("Failed to clean up")
    await asyncio.sleep(0.1)

assert c.futures == {}
assert all(task.state == "released" for task in s.tasks.values())

@stsievert
Copy link
Member Author

Hm... I've pushed that change, which involved this diff:

- while c.futures or s.tasks:
+ while c.futures:

Is that appropriate? test_client.py#L400-L435 seems to be testing how well the client cleans up, and only uses c.futures.

@TomAugspurger
Copy link
Member

That seems right to me if I understand the expected behavior. I might be missing some context around the test though.


What's the status here vs. master? Is it fair to say that there's a legitimate timeout issue on both master and this PR? Does this PR exacerbate that issue?

@stsievert
Copy link
Member Author

It appears to be the state on master, or at least in other PRs. Here's some commits that have raised TimeoutErrors in the model selection tests:

All other tests pass (besides linting). This PR has seen the following failures on 30fa47d:

  • test_incremental.py::test_basic on linux38
  • test_incremental.py::test_basic on sklearnDev
  • test_hyperband.py::test_hyperband_patience, test_incremental.py::test_basic, test_incremental.py::test_search_basic on win64

This might be a testing problem; I just ran smoothly some examples with SciKeras models.

@stsievert
Copy link
Member Author

stsievert commented Aug 6, 2020

@mrocklin I think you implemented the test in #527 (comment). Does the diff in that comment look okay to you? Is there more context behind that condition that's being missed?

while c.futures or s.tasks: # Make sure cleans up cleanly after running
# Make sure cleans up quickly after running
deadline = time() + 5
while c.futures:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What tasks are hanging around? Printing out s.tasks and seeing what is sticking around and why would probably help isolate the problem here pretty quickly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like calls to partial_fit are staying in state "memory" instead of moving to state "released." I couldn't find the reason why in my poking around. Here's some more context for one specific task:

Here's some of s.tasks when the ValueError is raised:

{"('array-5c6c0aea4775b68f8400b686cdf361ee', 0)": <Task "('array-5c6c0aea4775b68f8400b686cdf361ee', 0)" released>,
 "('astype-getitem-a0e460089e0b0c5d0baf1ca408aaff5b', 0)": <Task "('astype-getitem-a0e460089e0b0c5d0baf1ca408aaff5b', 0)" released>,
 "('astype-getitem-eb01c7dcea1f24ea913fd665b30d03a4', 0)": <Task "('astype-getitem-eb01c7dcea1f24ea913fd665b30d03a4', 0)" released>,
 "('getitem-5342f25119d203e5d67f88228f1e00b5', 0, 0)": <Task "('getitem-5342f25119d203e5d67f88228f1e00b5', 0, 0)" released>,
 "('getitem-99834584206c72f1f1a79495c4c62593', 0, 0)": <Task "('getitem-99834584206c72f1f1a79495c4c62593', 0, 0)" released>,
 "('getitem-a0e460089e0b0c5d0baf1ca408aaff5b', 0)": <Task "('getitem-a0e460089e0b0c5d0baf1ca408aaff5b', 0)" released>,
 "('getitem-c0cd079c7dd295b1df3a8ceb8158f96c', 0, 0)": <Task "('getitem-c0cd079c7dd295b1df3a8ceb8158f96c', 0, 0)" released>,
 "('getitem-c0cd079c7dd295b1df3a8ceb8158f96c', 1, 0)": <Task "('getitem-c0cd079c7dd295b1df3a8ceb8158f96c', 1, 0)" released>,
 "('getitem-eb01c7dcea1f24ea913fd665b30d03a4', 0)": <Task "('getitem-eb01c7dcea1f24ea913fd665b30d03a4', 0)" released>,
 "('normal-71ffc5749897310d732642de69ab3045', 0, 0)": <Task "('normal-71ffc5749897310d732642de69ab3045', 0, 0)" released>,
 "('normal-71ffc5749897310d732642de69ab3045', 1, 0)": <Task "('normal-71ffc5749897310d732642de69ab3045', 1, 0)" released>,
 "('normal-getitem-5342f25119d203e5d67f88228f1e00b5', 0, 0)": <Task "('normal-getitem-5342f25119d203e5d67f88228f1e00b5', 0, 0)" released>,
 "('normal-getitem-c0cd079c7dd295b1df3a8ceb8158f96c', 0, 0)": <Task "('normal-getitem-c0cd079c7dd295b1df3a8ceb8158f96c', 0, 0)" released>,
 "('random_sample-d4e3c66fd779378c4e00049a86312ef1', 0)": <Task "('random_sample-d4e3c66fd779378c4e00049a86312ef1', 0)" released>,
 "('random_sample-d4e3c66fd779378c4e00049a86312ef1', 1)": <Task "('random_sample-d4e3c66fd779378c4e00049a86312ef1', 1)" released>,
 "('sum-aggregate-eb653519b96897acbc044f0da8455aec', 0)": <Task "('sum-aggregate-eb653519b96897acbc044f0da8455aec', 0)" released>,
 "('sum-aggregate-eb653519b96897acbc044f0da8455aec', 1)": <Task "('sum-aggregate-eb653519b96897acbc044f0da8455aec', 1)" released>,
 "('sum-sum-aggregate-eb653519b96897acbc044f0da8455aec', 0)": <Task "('sum-sum-aggregate-eb653519b96897acbc044f0da8455aec', 0)" released>,
 "('sum-sum-aggregate-eb653519b96897acbc044f0da8455aec', 1)": <Task "('sum-sum-aggregate-eb653519b96897acbc044f0da8455aec', 1)" released>,
 '_create_model-204466dbc69f6ce91aae7a8f90e696f8': <Task '_create_model-204466dbc69f6ce91aae7a8f90e696f8' released>,
 '_create_model-3499389710d7a55287c8e1fd9d3e8a86': <Task '_create_model-3499389710d7a55287c8e1fd9d3e8a86' released>,
 '_partial_fit-20faea901cc3feebc39428be644dce28': <Task '_partial_fit-20faea901cc3feebc39428be644dce28' released>,
 '_partial_fit-36af93d60c2f4fcd5e15cbbceef5b79f': <Task '_partial_fit-36af93d60c2f4fcd5e15cbbceef5b79f' memory>,
 '_partial_fit-593a8e400151a3dea752eab563d2c641': <Task '_partial_fit-593a8e400151a3dea752eab563d2c641' memory>,
 '_partial_fit-5c386d4ccf33082fcec2b1c3aecb9e9a': <Task '_partial_fit-5c386d4ccf33082fcec2b1c3aecb9e9a' released>,
 '_partial_fit-6ab6fa94ce10d5e2977cb93ec6b585c3': <Task '_partial_fit-6ab6fa94ce10d5e2977cb93ec6b585c3' released>,
 '_partial_fit-842952cf6460cfad15896c7e8d7a673d': <Task '_partial_fit-842952cf6460cfad15896c7e8d7a673d' memory>,
 '_partial_fit-95daf36e4defa195bab22b49f709efd7': <Task '_partial_fit-95daf36e4defa195bab22b49f709efd7' released>,
 '_partial_fit-99f97f6b969121825fd2b90aad30381c': <Task '_partial_fit-99f97f6b969121825fd2b90aad30381c' released>,
 '_partial_fit-a20889336cd81797af3da7d22321b34f': <Task '_partial_fit-a20889336cd81797af3da7d22321b34f' memory>,
 '_partial_fit-ab249a7357580aa18fe2eba60be72bcb': <Task '_partial_fit-ab249a7357580aa18fe2eba60be72bcb' released>,
 '_score-15cae06b53fe052a9f7ffbc524357d5f': <Task '_score-15cae06b53fe052a9f7ffbc524357d5f' memory>,
 '_score-b79d83e626689a3b49e0dbf115a80ec3': <Task '_score-b79d83e626689a3b49e0dbf115a80ec3' memory>,
 'finalize-285f1af18fd76cac82cce5d7778d8044': <Task 'finalize-285f1af18fd76cac82cce5d7778d8044' released>,
 'finalize-e5aafd4559bab198c2561658595bfd3e': <Task 'finalize-e5aafd4559bab198c2561658595bfd3e' released>}

If I assign key = "_partial_fit-593a8e400151a3dea752eab563d2c641", then

>>> task = s.tasks[key]
>>> task
<Task '_partial_fit-593a8e400151a3dea752eab563d2c641' memory>
>>> task.who_has
{<Worker 'tcp://127.0.0.1:60747', name: 1, memory: 4, processing: 0>}
>>> task.waiters
set()
>>> task.dependents
set()
>>> task.dependencies
{<Task '_partial_fit-a20889336cd81797af3da7d22321b34f' memory>,
 <Task "('getitem-99834584206c72f1f1a79495c4c62593', 0, 0)" released>,
 <Task "('getitem-eb01c7dcea1f24ea913fd665b30d03a4', 0)" released>}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might take a look at s.clients to view the Scheduler's perspective of all of the clients and what they want.

  1. Is there a phantom client lying around maybe holding onto a future stopping it from being released?
  2. Does the scheduler's perspective of ClientState.wants_what match the state of Client.futures? If not, what is misaligned? What are the stories of those tasks (see the s.story method.

I'm hoping that this is interesting/educational for you. It seems like a nice opportunity to dive into the scheduler a bit. If you're very frustrated at this point let me know and I can dive in instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's useful I'd also be happy to jump on a 30 minute screenshare call. That might make debugging this sort of thing easier.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for checking in @mrocklin. I've been busy improving scikeras this week; I'd be happy to hop on a call after I spend a little longer on this PR.

Base automatically changed from master to main February 2, 2021 03:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants