Fixed flaky test-rearrange #5977
Conversation
Puts file cleanup in the task graph, rather than relying on Partd.
@mrocklin you mentioned that you have concerns about this approach. Can you share them when you get a chance, before I put more time into finishing it off / finding an alternative?
OK, so now I see that we're still using the existing cleanup mechanism in partd, so presumably this is unlikely to be less robust. I guess I'm coming around. I do have a few comments below though.
    p.append(d, fsync=True)
except Exception:
    try:
        p.drop()
Why do we want to drop this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this matches the behavior on master for the case where something in shuffle_group_3 raises an exception. Previously, p would go out of scope and be garbage collected, which includes a call to partd.File.drop. Because we're explicitly providing a path to create the partd.File now, we're responsible for cleaning up.
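The drop-on-failure pattern discussed above can be sketched in isolation. This is a minimal, hypothetical sketch: `FakePartd` and `safe_append` are stand-in names invented here, not dask or partd code.

```python
import os
import shutil
import tempfile

class FakePartd:
    """Stand-in for partd.File: persists appended chunks under `path`."""
    def __init__(self, path):
        self.path = path
        os.makedirs(path, exist_ok=True)

    def append(self, data, fsync=False):
        if not isinstance(data, dict):
            raise TypeError("expected a dict mapping partition -> bytes")
        for k, v in data.items():
            with open(os.path.join(self.path, str(k)), "ab") as f:
                f.write(v)

    def drop(self):
        shutil.rmtree(self.path, ignore_errors=True)

def safe_append(p, d):
    try:
        p.append(d, fsync=True)
    except Exception:
        # Previously p was garbage collected on failure, which called
        # drop() for us; with an explicit path we must do it ourselves.
        try:
            p.drop()
        except Exception:
            pass
        raise

path = tempfile.mkdtemp()
p = FakePartd(path)
try:
    safe_append(p, "not-a-dict")  # malformed input triggers the cleanup
except TypeError:
    pass
print(os.path.exists(path))  # -> False
```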
path = None

if path:
    shutil.rmtree(path, ignore_errors=True)
It's low priority, but some folks might create their own PartD objects here. In the future we might want a more robust solution to finding a File object.
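One way the cleanup task could locate the on-disk state behind a possibly wrapped store is sketched below. The helper name `cleanup_partd_files` appears in the diff, but the unwrapping logic and the `_Inner`/`_Outer` stand-in classes are assumptions for illustration, not the PR's actual code.

```python
import os
import shutil
import tempfile

def cleanup_partd_files(p, keys):
    """Remove the on-disk state behind a (possibly wrapped) partd store.

    `keys` exists only so the scheduler runs this after every task that
    produced those keys; the values themselves are ignored.
    """
    # Wrapper layers (e.g. partd.Encode-style objects) typically keep the
    # inner store on a `.partd` attribute; walk down until something
    # exposes an on-disk `path`.
    while not hasattr(p, "path") and hasattr(p, "partd"):
        p = p.partd
    path = getattr(p, "path", None)
    if path and os.path.exists(path):
        shutil.rmtree(path, ignore_errors=True)

# Tiny demonstration with stand-in objects instead of real partd classes.
class _Inner:  # plays the role of partd.File
    def __init__(self, path):
        self.path = path

class _Outer:  # plays the role of a wrapper such as partd.Encode
    def __init__(self, inner):
        self.partd = inner

d = tempfile.mkdtemp()
cleanup_partd_files(_Outer(_Inner(d)), keys=[None])
print(os.path.exists(d))  # -> False
```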
dask/dataframe/shuffle.py
Outdated
}
cleanup_token = "cleanup-" + always_new_token
dsk5 = {cleanup_token: (cleanup_partd_files, p, list(dsk4))}
I think that you're bringing back all of the results into a single task with list(dsk4). If we do this approach then we might have to have an intermediary set of tasks that take in the results of dsk4 and return None for each. In this way we maintain the dependencies, but don't move the data around.
Mmm OK. Do you know, does dsk3 at https://github.com/dask/dask/pull/5977/files#diff-83ae5352ddc87bd80c831102addd9b1eL407 have a similar problem? Perhaps that result isn't as large.
I don't think so. My guess is that the output tasks in dsk2 write things to disk and then return None. The output tasks of dsk4 read from disk and return dataframes, so this is more of a concern. Instead, I think that the solution is to have a dsk4b that maps lambda x: None across the outputs of dsk4.
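The "dsk4b" idea above can be sketched in a minimal, self-contained form. The names and the tiny `execute` scheduler below are illustrative inventions, not dask internals: the point is that one cheap task per dsk4 output keeps the dependency but drops the data, so the cleanup task runs after every output without gathering the dataframes into one place.

```python
def execute(dsk, key):
    # Toy recursive scheduler standing in for dask's, for this sketch only.
    if isinstance(key, list):
        return [execute(dsk, k) for k in key]
    try:
        task = dsk[key]
    except (KeyError, TypeError):
        return key  # a literal argument, not a graph key
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        return func(*[execute(dsk, a) for a in args])
    return task

def read_partition(i):
    # Stand-in for a dsk4 task that reads a shuffled partition from disk.
    return [i] * 3  # pretend this is a large dataframe

def barrier(x):
    # Keep the dependency on the partition, drop the data.
    return None

def cleanup(barriers):
    # Runs only after every barrier, and hence every read, has finished.
    return "cleaned"

dsk4 = {("read", i): (read_partition, i) for i in range(4)}
dsk4b = {("barrier", i): (barrier, ("read", i)) for i in range(4)}
dsk5 = {"cleanup": (cleanup, list(dsk4b))}
graph = {**dsk4, **dsk4b, **dsk5}

print(execute(graph, "cleanup"))  # -> cleaned
```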
@@ -779,34 +793,6 @@ def test_compute_divisions():
    compute_divisions(c)

# TODO: Fix sporadic failure on Python 3.8 and remove this xfail mark
I've removed this test, but I can restore it if needed.
It's asserting that there are some files left around, but I'm not sure that we want that / can reliably assert that. My understanding was that we wanted them to be cleaned up automatically.
This concerns me. We will often fail at this. What if some exception happens during computation and we never get to the cleanup step?
1b704d7 attempts to make this clearer. The idea is to wrap calls using the
Gave this another look and I'm sufficiently happy with where it's at.
Thanks for your work on this @TomAugspurger
Puts file cleanup in the task graph, rather than relying on Partd to do it for us. I'm not really happy with my approach, but wanted to try this on CI a few times.
Closes #5867