
Add from_map function to Dask-DataFrame#8911

Merged
rjzamora merged 36 commits into dask:main from rjzamora:from_map
May 4, 2022

Conversation

@rjzamora
Member

@rjzamora rjzamora commented Apr 11, 2022

This is an exploratory PR to add a new from_map function to dask.dataframe. The purpose of this function is to mimic the built-in Python map function, but for DataFrame-collection generation. For example, a simple Parquet read could be written as:

ddf = dd.from_map(pd.read_parquet, file_list, columns=["a"], engine="fastparquet")

This allows the user to easily do things like interact with the pyarrow.dataset API directly:

import pyarrow.dataset as ds

ddf = from_map(lambda x: x.to_table().to_pandas(), ds.dataset("tmpdir").get_fragments())

(Note that the pd.read_parquet function is mapped onto every element of file_list to generate a DataFrame collection.)

The motivation for such an API is to replace from_delayed as the "suggested" mechanism for custom DataFrame generation. Although #8852 is expected to improve the performance of from_delayed, creating a distinct Delayed object for every new partition will still lead to ugliness at scale (and is unnecessary when the same callable can be used to generate every partition anyway).

Proposed API/Docstring

@insert_meta_param_description
def from_map(
    func,
    *iterables,
    args=None,
    meta=None,
    divisions=None,
    label=None,
    token=None,
    enforce_metadata=True,
    **kwargs,
):
    """Create a DataFrame collection from a custom function map

    Parameters
    ----------
    func : callable
        Function used to create each partition. If ``func`` satisfies the
        ``DataFrameIOFunction`` protocol, column projection will be enabled.
    *iterables : Iterable objects
        Iterable objects to map to each output partition. All iterables must
        be the same length. This length determines the number of partitions
        in the output collection (only one element of each iterable will
        be passed to ``func`` for each partition).
    args : list or tuple, optional
        Positional arguments to broadcast to each output partition. Note
        that these arguments will always be passed to ``func`` after the
        ``iterables`` positional arguments.
    $META
    divisions : tuple, str, optional
        Partition boundaries along the index.
        For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions
        For the string 'sorted', the mapped values will be computed to
        find the index values; this assumes that the indexes are
        mutually sorted.
        If None, index information will not be used.
    label : str, optional
        String to use as the function-name label in the output
        collection-key names.
    token : str, optional
        String to use as the "token" in the output collection-key names.
    enforce_metadata : bool, default True
        Whether to enforce at runtime that the structure of the DataFrame
        produced by ``func`` actually matches the structure of ``meta``.
        This will rename and reorder columns for each partition,
        and will raise an error if this doesn't work or types don't match.
    **kwargs :
        Keyword arguments to broadcast to each output partition. The
        same arguments will be passed to ``func`` for every output
        partition.

    Examples
    --------
    >>> import pandas as pd
    >>> import dask.dataframe as dd
    >>> func = lambda x, size=0: pd.Series([x] * size)
    >>> inputs = ["A", "B"]
    >>> dd.from_map(func, inputs, size=2).compute()
    0    A
    1    A
    0    B
    1    B
    dtype: object

    See Also
    --------
    dask.dataframe.from_delayed
    dask.layers.DataFrameIOLayer
    """

Other Alternatives

I also considered expanding the existing map_partitions function to handle the special case that an existing DataFrame object is not included in the args. However, I suspect that we lose more by complicating that API than we do by adding a new function with a simpler scope.

@ian-r-rose - Any thoughts on this? If others are supportive of this idea (or something similar), I will add tests and revise, etc.

@douglasdavis
Member

douglasdavis commented Apr 12, 2022

I like this! Independent of the extended functionality you mentioned with the pyarrow example, just abstracting away the pattern of layer --> highlevelgraph --> new_object:

layer = DataFrameIOLayer(callable, inputs, ...)
hlg = HighLevelGraph(layer, ...)
return new_dd_object(hlg, ...)

into

return from_map(callable, inputs, ...)

seems like a good improvement IMO. I've taken some inspiration and started playing with something similar at https://github.com/ContinuumIO/dask-awkward/pull/49

And I agree with your thought about extending map_partitions: the separation between this new case (prefixed with from_) and the case where a DataFrame collection already exists (and we operate on its partitions via map_partitions) seems natural.

@rjzamora
Member Author

rjzamora commented Apr 13, 2022

Thanks for the feedback @douglasdavis !

I completely agree that an API like this is a worthwhile improvement, so I will continue pushing on this PR.

I suppose it may also make sense to consider whether such an API would be useful in dask.array (cc @jrbourbeau). If so, it probably makes sense to ensure that a similar function signature will work for an array-based collection as well.

@rjzamora rjzamora marked this pull request as ready for review April 20, 2022 16:09
@rjzamora rjzamora changed the title [WIP] Add from_map function to Dask-DataFrame Add from_map function to Dask-DataFrame Apr 20, 2022
@rjzamora
Member Author

Any thoughts on this new API proposal (or interested reviewers)? :) cc @dask/dataframe

Member

@douglasdavis douglasdavis left a comment


Added a few nit-picky comments

@douglasdavis
Member

Something that is opaque to me: in from_map the HighLevelGraph.from_collections(graph, name, dependencies=[]) call always sets dependencies to an empty list. How is this possible with something like read_csv which uses dask.bytes to create delayed blocks of bytes? It doesn't seem to be an issue but it's something I'd like to better understand!
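My own reading of why dependencies=[] is valid here (an assumption on my part, not verified against the PR): each element of the iterables is embedded directly in its task, so the IO layer has no edges into other graph layers. A minimal pure-Python sketch of such a graph:

```python
# Hypothetical low-level task graph for a two-partition IO layer.
# Every task closes over its own input element (a filename here, but
# it could just as well be a delayed block of bytes treated as data).
name = "from-map-demo"
inputs = ["file0.csv", "file1.csv"]  # placeholder inputs

graph = {(name, i): (str.upper, element) for i, element in enumerate(inputs)}

# No task references a key from another layer, so the layer is a root
# of the graph and an empty dependencies list is consistent.
assert all(isinstance(task[1], str) for task in graph.values())
```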

Contributor

@bryanwweber bryanwweber left a comment


Thanks @rjzamora! This looks like a nice generalization. I saw a small typo and had a question as well.

@jsignell
Member

I am still reading through, but I am strongly in favor of adding a new top-level from_map function (another option would be to name it from_partitions). I tried to do something like this for arrays in #6294 but I was too attached to the map_blocks interface. I like how this PR deviates from map_partitions to really serve this particular use case well.

@jrbourbeau
Member

I am still reading through, but I am strongly in favor of adding a new top-level from_map function (another option would be to name it from_partitions)

Same here. I'll throw from_function in the pool of possible names too -- it seems to convey the intent of this method (at least in my mind). FWIW there's also a similar np.fromfunction method which it might be nice to be close-ish to when choosing a name.

@rjzamora
Member Author

another option would be to name it from_partitions

Same here. I'll throw from_function in the pool of possible names too -- it seems to convey the intent of this method (at least in my mind). FWIW there's also a similar np.fromfunction method which it might be nice to be close-ish to when choosing a name.

Thanks for taking a look at this @jsignell and @jrbourbeau !

I'm very open to other alternatives to the from_map name. The from_partitions name feels a bit off to me since we are actually mapping to partitions. The from_function name was originally on my list, but I actually leaned away from it because of the numpy function which seemed quite different to me (but I may just be misunderstanding what that function does). My impression was that the numpy function doesn't really allow you to "map" anything to the function besides the indices, whereas the from_map function I am proposing is much more similar to python's map function.
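A quick sketch of the distinction being drawn (a toy example of mine, not from the thread): np.fromfunction only hands the callable coordinate grids, while Python's map, like the proposed from_map, hands it arbitrary elements.

```python
import numpy as np

# np.fromfunction: the callable receives index/coordinate arrays and
# nothing else -- the "inputs" are always the grid positions.
arr = np.fromfunction(lambda i, j: i + j, (2, 3))
# arr == [[0, 1, 2], [1, 2, 3]]

# builtin map (the model for from_map): the callable receives the
# elements of an arbitrary iterable, e.g. filenames or fragments.
parts = list(map(str.upper, ["a", "b"]))
# parts == ["A", "B"]
```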

    )
    if project_after_read:
-       return df[self.columns]
+       return df[self._columns]
Member


I'm a little confused about all these columns changes. Is the goal to have self.columns always be a list?

Also I think this should be self.columns since self._columns can theoretically be None right?

Member Author


I'm a little confused about all these columns changes. Is the goal to have self.columns always be a list?

These changes just make it easier to conform to the new DataFrameIOFunction Protocol for column-projection. The subtle differences between self.columns and self.full_columns for csv made this tricky before.

Also I think this should be self.columns since self._columns can theoretically be None right?

Yep - Good call!
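For readers unfamiliar with the protocol being referenced, here is a hedged sketch of an IO callable supporting column projection. The method names (columns, project_columns) follow my reading of the DataFrameIOFunction protocol mentioned above; the real interface may differ in detail.

```python
import pandas as pd

class ReadPart:
    """Hypothetical IO function implementing column projection."""

    def __init__(self, columns=None):
        self._columns = columns

    @property
    def columns(self):
        # May be None, meaning "all columns" (the subtlety discussed above)
        return self._columns

    def project_columns(self, columns):
        # Return a NEW IO function that reads only ``columns``
        return ReadPart(columns=columns)

    def __call__(self, data):
        df = pd.DataFrame(data)
        return df[self.columns] if self.columns is not None else df

# The optimizer could swap in a projected version of the function:
func = ReadPart().project_columns(["a"])
df = func({"a": [1], "b": [2]})  # only column "a" survives
```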

Function used to create each partition. If ``func`` satisfies the
``DataFrameIOFunction`` protocol, column projection will be enabled.
*iterables : Iterable objects
Iterable objects to map to each output partition. all iterables must
Member


typo:

Suggested change
Iterable objects to map to each output partition. all iterables must
Iterable objects to map to each output partition. All iterables must

Also I was initially a little confused about the hierarchy of this arg. My current understanding is that iterables is a list of arbitrary length where each item within must have len = npartitions. So an example of an iterable would be a list of filenames, right? I think this is what the description says, but it wasn't immediately obvious to me 🤷

Member Author


I believe your understanding is correct, but I did find it difficult to come up with good wording here.

    # Input validation
    if not callable(func):
        raise ValueError("`func` argument must be `callable`")
    lengths = set()
Member


is this equivalent to npartitions? That seems like it would be a slightly more legible var name.

Member


I guess it's a set of the npartitions that the iterables suggest. I'm fine with this name, but it did take me a bit to understand the logic of this validation section.
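A minimal sketch (my own, not the PR's code) of the validation pattern under discussion: the set collapses to a single element exactly when all iterables agree on length, and that one length is the implied npartitions.

```python
def validate(*iterables):
    # Collect every iterable's length; agreement means a one-element set.
    lengths = {len(it) for it in iterables}
    if len(lengths) != 1:
        raise ValueError("All iterables must have the same length")
    return lengths.pop()  # the implied number of partitions

npartitions = validate(["a.csv", "b.csv"], [0, 1])  # both have length 2
```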

@jsignell
Member

Thinking it over again, I like from_map as the name.

For the array side, I think the biggest difference is that we need to know the block location for each output block.

@github-actions github-actions bot added the documentation (Improve or add to documentation) label Apr 29, 2022
Member

@jsignell jsignell left a comment


I'm good with this as is!

@rjzamora
Member Author

rjzamora commented May 3, 2022

Thanks to everyone who provided feedback here! (@jsignell @jrbourbeau @ian-r-rose @bryanwweber @douglasdavis )

I'd like to merge this EOD today (so it sits in main for a bit before the next release), so please do let me know if there are any remaining suggestions/concerns :)


Labels

dataframe, documentation (Improve or add to documentation), io


6 participants