
Feat/parquet notie #1

Open · wants to merge 6 commits into main from feat/parquet-notie
Conversation

@hyenal (Owner) commented Aug 23, 2023

  • Closes #xxxx
  • Tests added / passed
  • Passes pre-commit run --all-files

@hyenal force-pushed the feat/parquet-notie branch 3 times, most recently from bb6734c to 38f4be4 on August 25, 2023 at 07:51
Commits:

  • try removing bug
  • minor
  • this should work
  • use dask compute instead
  • fixing some tests
  • return something that shows success in completion
  • revert some changes
  • replace expected metadata, avoid a trick
  • this should work best
  • re-integrate saving metadata file
  • try this
  • remove comments
  • add comments
  • add comments

@rjzamora left a comment

Thanks for working on this @hyenal !

I'm not entirely sure how I feel about the Series return, but I will give it some more thought. Can you confirm that these changes fully address your original motivation (dask#10463)? The 0 result is still being reduced onto one process (the client).

dask/dataframe/io/parquet/core.py
# Convert data_write + dsk to computable collection
graph = HighLevelGraph.from_collections(final_name, dsk, dependencies=(data_write,))
out = Scalar(graph, final_name, "")
out = data_write

if compute:
    out = out.compute(**compute_kwargs)

I'm not sure how I feel about the fact that out will now be a Series when write_metadata_file is False. Maybe when compute is True we should do something like:

    if compute:
        out.compute(**compute_kwargs)
        out = None

This would avoid the breaking behavior of changing the return type of to_parquet for the default case of compute=True. However, we are still technically "breaking" the current API for any code that may be expecting to_parquet(..., compute=False) to return None.

@hyenal (Owner, Author) replied:

I left a message further down, but for the moment I have just reverted to the default behaviour when compute=True. The goal of this MR is to give more advanced users a way to release memory earlier.

I actually feel like this is something we could introduce in a tutorial (or in the DataFrame docs) on working with preemptible instances, or perhaps in the resilience section.
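
To make the intent concrete, here is a minimal sketch of the behaviour being discussed, assuming to_parquet(..., compute=False) returns a per-partition Series as proposed in this PR and that no _metadata file is requested; the paths are illustrative only:

    # Sketch only: assumes the per-partition Series return proposed in this PR.
    import dask.dataframe as dd

    ddf = dd.read_parquet("input/")

    # Default behaviour is unchanged: compute the write immediately.
    ddf.to_parquet("output/")

    # Advanced use: defer the write and get back one (tiny) entry per partition,
    # which the caller can compute and discard on their own schedule instead of
    # waiting on a single final reduction.
    writes = ddf.to_parquet("output/", compute=False)
    writes.compute()

Because each partition then has its own output key, a distributed scheduler can in principle release a partition's write result as soon as it completes (see the snippets further down).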

@hyenal (Owner, Author) commented Aug 27, 2023

Thank you for the review, I will answer the comments inline:

> Can you confirm that these changes fully address your original motivation (dask#10463)?

Yes it does :) The purpose of this MR is to give the user more control in the case compute=False.

> You can roll back this change.

Done :)

> I'm not sure how I feel about the fact that out will now be a Series when write_metadata_file is False. Maybe when compute is True we should do something like:

I made a change to keep the old behaviour in the case compute=True, since in that context the user likely wants to leave control to dask, and this avoids transferring results to the client. But in the case compute=False we can assume the user would like more control over the task.

> However, we are still technically "breaking" the current API for any code that may be expecting to_parquet(..., compute=False) to return None.

Right now I think a Scalar object is expected? If this is breaking the API we could make it an option? That may overload the arguments though :/

@rjzamora

Sorry for the delay here @hyenal - I think I support this type of change, but I'm still a bit unsure about changing the return type from Scalar to Series (and haven't had much time to investigate the possible implications).

There is still a chance that we can go in this direction. In the meantime, I wonder if the following workaround would be useful to you:

def drop_reduction(out):
    # Drop the final reduction step from a `to_parquet`
    # graph, and return a `Series` object to compute

    import pandas as pd
    from dask.utils_test import hlg_layer
    from dask.dataframe.core import new_dd_object

    graph = out.dask
    meta = pd.Series([0])
    keys = list(hlg_layer(graph, "to-parquet").keys())
    divisions = (None,) * (len(keys) + 1)
    return new_dd_object(graph, keys[0][0], meta, divisions).map_partitions(
        lambda _: 0,
        meta=meta,
        token="partition-written",
        enforce_metadata=False,
        transform_divisions=False,
        align_dataframes=False,
    )

...

out = drop_reduction(ddf.to_parquet(..., compute=False))
out.compute()

There are many different ways to do something like this, so my particular choice may not be the most concise.
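
As a further illustration (not part of the suggestion above, and only a hedged sketch), the per-partition Series returned by drop_reduction could also be evaluated incrementally on a distributed cluster, releasing each partition's result as soon as its write finishes; the cluster setup and paths here are hypothetical:

    import dask.dataframe as dd
    from dask.distributed import Client, as_completed, futures_of

    client = Client()  # hypothetical cluster
    ddf = dd.read_parquet("input/")

    # Rebuild a per-partition collection from the to_parquet graph ...
    out = drop_reduction(ddf.to_parquet("output/", compute=False))

    # ... start it on the cluster, and release each (tiny) result as it lands
    # so that nothing accumulates on the workers or the client.
    out = client.persist(out)
    for future in as_completed(futures_of(out)):
        future.result()
        future.release()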

@hyenal (Owner, Author) commented Aug 30, 2023

@rjzamora thanks a lot for the snippet, I think that would work in my case :) Although I am wondering whether compute would transfer results to the client as soon as possible (thus releasing memory)?

I feel like the change from Scalar to Series should have minimal implications, but I will let you be the judge of that.

Do you think I should open an official PR on the dask repository, so that you can accept/reject it and close the related issue? Since I have a fix I am in no rush, but it could be useful to the dask community for tracking it.

@rjzamora

> Although I am wondering whether compute would transfer results to the client as soon as possible (thus releasing memory)?

The snippet should return a result that is very similar to what to_parquet(..., compute=False) returns in your PR. So if out.compute() is effective in your PR, I'd expect it to be effective here as well.

> I feel like the change from Scalar to Series should have minimal implications, but I will let you be the judge of that.

This is probably true for a large majority of users and workflows. However, I don't feel completely confident that other dask devs would feel the same.

> Do you think I should open an official PR on the dask repository, so that you can accept/reject it and close the related issue? Since I have a fix I am in no rush, but it could be useful to the dask community for tracking it.

Yes. I do think that would be the best way to get broader feedback.

@hyenal (Owner, Author) commented Aug 30, 2023

> The snippet should return a result that is very similar to what to_parquet(..., compute=False) returns in your PR. So if out.compute() is effective in your PR, I'd expect it to be effective here as well.

Okay, in my case I still need to do what I suggested in this post:

    import dask.distributed
    from dask.distributed import as_completed

    parquet_files = ddf.to_delayed()

    client: dask.distributed.Client = dask.distributed.get_client()
    futures = client.compute(parquet_files)

    for future in as_completed(futures):
        # Release the results
        future.result()
        future.release()

So far I have not found a better way to release memory as we go; I will mention it in the PR to dask :)

@hyenal (Owner, Author) commented Aug 31, 2023

This PR has been moved here: dask#10486
