
Implemented logic to add extra arguments to apply (#3256)

Merged: jcrist merged 11 commits into dask:master from gabrielelanaro:issue-3234, Mar 18, 2018

Conversation

@gabrielelanaro (Contributor) commented Mar 8, 2018

Note this change could break existing code because meta becomes just an
extra keyword with no positional meaning.

  • Tests added / passed
  • Passes flake8 dask
  • Fully documented, including docs/source/changelog.rst for all changes
    and one of the docs/source/*-api.rst files for new API

@mrocklin (Member) left a comment

This looks pretty good to me. I left a couple of minor comments. Can I also ask you to add a note to the changelog at docs/source/changelog.rst? If this is your first contribution, you will also have to add your name and a link at the bottom of that document.

return df.assign(b=df.b - df.b.mean() + c * d)

assert_eq(df.groupby('a').apply(func, 1),
          ddf.groupby('a').apply(func, 1))
Member

Can we also test the keyword argument d= in another line?

ddf = dd.from_pandas(df, npartitions=3)

pytest.raises(Exception, lambda: df.groupby('does_not_exist'))
pytest.raises(Exception, lambda: df.groupby('a').does_not_exist)
Member

Should these raise particular kinds of exceptions, like AttributeError ?
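For reference, plain pandas already raises specific exception types here, which is presumably the behavior dask would want to mirror — a sketch in pytest style, not the PR's actual test:

```python
import pandas as pd
import pytest

df = pd.DataFrame({'a': [1, 2, 3]})

# Grouping on a missing column raises KeyError in pandas,
# stricter than asserting a bare Exception.
with pytest.raises(KeyError):
    df.groupby('does_not_exist')

# Accessing a missing attribute on the groupby raises AttributeError.
with pytest.raises(AttributeError):
    df.groupby('a').does_not_exist
```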


@insert_meta_param_description(pad=12)
def apply(self, func, meta=no_default):
def apply(self, func, *args, **kwargs):
Member

Is there any reasonable way to check that args[0] is in fact a meta and give a deprecation warning? In Python 3 you can also have keyword-only arguments with defaults; that would work too to avoid API breakage.
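One possible heuristic for the args[0] check — entirely illustrative; `_looks_like_meta` and the `apply_compat` shim are not dask's actual implementation — would duck-type the first positional argument and warn:

```python
import warnings

def _looks_like_meta(obj):
    # Heuristic: pandas-like objects used as meta expose
    # ``dtypes`` (DataFrame) or ``dtype`` (Series).
    return hasattr(obj, 'dtypes') or hasattr(obj, 'dtype')

def apply_compat(func, *args, **kwargs):
    # If the first positional argument looks like a meta, move it to
    # the ``meta=`` keyword and emit a deprecation warning.
    if args and _looks_like_meta(args[0]) and 'meta' not in kwargs:
        warnings.warn("passing meta positionally is deprecated; "
                      "please use the meta= keyword", FutureWarning)
        kwargs['meta'], args = args[0], args[1:]
    return func(*args, **kwargs)
```

The obvious downside, discussed below, is that the heuristic can misfire when a genuine first argument happens to look pandas-like.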

Contributor Author

Precisely, this really bugs me.

But even with keyword-only default arguments, I can totally see people doing it this way:

df.groupby().apply(f, arg1, arg2)

And failing miserably.

Another way is to make meta a required argument.

Member

I'm fine with having things fail here, people really should be using keyword arguments by name instead of by position.
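For illustration, this minimal sketch (not the PR's code) shows both sides of the trade-off: under the new `*args` signature a positionally passed meta is silently absorbed into args, while the keyword form works as intended:

```python
# meta is keyword-only because it follows *args
def apply(func, *args, meta=None, **kwargs):
    return args, meta

# Old positional style: the intended meta is silently swallowed
# into args -- the breakage being accepted here.
args, meta = apply(len, 'fake_meta')
assert args == ('fake_meta',) and meta is None

# Keyword style works as intended.
args, meta = apply(len, meta='fake_meta')
assert args == () and meta == 'fake_meta'
```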

@gabrielelanaro (Contributor, author)

@mrocklin thanks for reviewing.

There's also an extra issue that bugs me.

Let's say we pass a Future in args or kwargs because we have a large argument. I did have some issues there, especially with meta estimation. I will add a test case to illustrate the issue, and maybe we can find a way.

@mrocklin (Member) commented Mar 8, 2018

Let's say we pass a Future in args or kwargs because we have a large argument. I did have some issues there, especially with meta estimation. I will add a test case to illustrate the issue, and maybe we can find a way.

Yes, I agree both that that is important, and that it may be challenging. There are probably two challenges here:

  1. Getting the task graph of a delayed or dd.Scalar object, and merging its task graph into the computation. This is straightforward, and has been done many other places in the codebase, but might take a bit of time if it's your first time (although you seem to be handling everything here very well :))
  2. Getting metadata. In principle you would have to run the computation, which probably isn't feasible as a default. I think that in this case we probably need to force the user to provide meta=
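Point 2 could be enforced with a sentinel check. A pure-Python sketch of that idea — `no_default` and `LazyValue` here are illustrative stand-ins for dask's sentinel and its Delayed/Scalar types, not its internals:

```python
no_default = object()  # sentinel for "meta not provided"

class LazyValue:
    """Stand-in for dask Delayed / Scalar objects in this sketch."""

def apply(func, *args, meta=no_default, **kwargs):
    has_lazy = any(isinstance(a, LazyValue)
                   for a in list(args) + list(kwargs.values()))
    if has_lazy and meta is no_default:
        # Inferring meta would require running the computation,
        # so force the user to provide it explicitly.
        raise ValueError("meta= is required when args/kwargs "
                         "contain lazy values")
    return meta
```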

@gabrielelanaro (Contributor) commented Mar 8, 2018

@mrocklin I will make the test pass; for now I'll skip the metadata issue.

c = 1
d = 2
c_scalar = dd.core.Scalar({'my-scalar': c}, 'my-scalar', int)
d_scalar = dd.core.Scalar({'my-scalar': d}, 'my-scalar', int)
Member

Rather than use internal API, I recommend that we use something like df.a.sum(). This will make the test less brittle to internal changes and easier for novice maintainers to understand.

@mrocklin (Member) commented Mar 8, 2018

Nice test!

d = 2

c_scalar = _make_scalar(c)
d_scalar = _make_scalar(d)
Member

I suggest the following instead:

c_scalar = ddf.a.sum()
d_scalar = ddf.b.mean()

@gabrielelanaro (Contributor) commented Mar 10, 2018

I'm having some issues dealing with delayed kwargs. I believe these don't get replaced in apply_and_enforce; I used a workaround, let me know if it looks OK.

@mrocklin (Member)

Have you taken a look at dask.delayed.to_task_dask?

def to_task_dask(expr):
    """Normalize a python object and merge all sub-graphs.

    - Replace ``Delayed`` with their keys
    - Convert literals to things the schedulers can handle
    - Extract dask graphs from all enclosed values

    Parameters
    ----------
    expr : object
        The object to be normalized. This function knows how to handle
        ``Delayed``s, as well as most builtin python types.

    Returns
    -------
    task : normalized task to be run
    dask : a merged dask graph that forms the dag for this task

    Examples
    --------
    >>> a = delayed(1, 'a')
    >>> b = delayed(2, 'b')
    >>> task, dask = to_task_dask([a, b, 3])
    >>> task  # doctest: +SKIP
    ['a', 'b', 3]
    >>> dict(dask)  # doctest: +SKIP
    {'a': 1, 'b': 2}

    >>> task, dask = to_task_dask({a: 1, b: 2})
    >>> task  # doctest: +SKIP
    (dict, [['a', 1], ['b', 2]])
    >>> dict(dask)  # doctest: +SKIP
    {'a': 1, 'b': 2}
    """
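To make the mechanism concrete, here is a simplified pure-Python sketch of the normalization idea the docstring describes — `Lazy` and `normalize` are illustrative names, not dask internals:

```python
class Lazy:
    # Stand-in for a Delayed-like value: a key plus its task graph.
    def __init__(self, key, graph):
        self.key, self.graph = key, graph

def normalize(expr, graph):
    # Replace lazy values with their keys, merging graphs as we go.
    if isinstance(expr, Lazy):
        graph.update(expr.graph)
        return expr.key
    if isinstance(expr, dict):
        return {k: normalize(v, graph) for k, v in expr.items()}
    if isinstance(expr, (list, tuple)):
        return type(expr)(normalize(v, graph) for v in expr)
    return expr  # plain literals pass through unchanged

a = Lazy('a', {'a': 1})
b = Lazy('b', {'b': 2})
graph = {}
task = normalize([a, b, 3], graph)
# task == ['a', 'b', 3]; graph == {'a': 1, 'b': 2}
```

Kwargs can then be handled the same way: normalize the dict, merge the resulting sub-graphs into the main graph, and pass the normalized structure into the task.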

@gabrielelanaro (Contributor) commented Mar 11, 2018

@mrocklin It's possible to replace the kwargs handling with a task using to_task_dask. There's however another test that conflicts with both strategies.

Is there any specific reason for that test, or can I edit it?

    def test_map_partitions_keeps_kwargs_in_dict():
        df = pd.DataFrame({'x': [1, 2, 3, 4], 'y': [5, 6, 7, 8]})
        a = dd.from_pandas(df, npartitions=2)
    
        def f(s, x=1):
            return s + x
    
        b = a.x.map_partitions(f, x=5)
    
>       assert "'x': 5" in str(b.dask)

@mrocklin (Member)

It seems perfectly reasonable to change that test. My guess is that the intent behind that test is that we maintain keyword arguments in the task graph directly in some easy-to-interpret way, rather than including them as closures or something else more exotic.

@gabrielelanaro gabrielelanaro changed the title WIP: Implemented logic to add extra arguments to apply Implemented logic to add extra arguments to apply Mar 11, 2018
@mrocklin (Member)

This looks good to me, but @jcrist might have a better eye for issues here. Let's give him 24 hours to see if he has time to respond (he may be out of contact for the next few days).

@gabrielelanaro (Contributor, author)

@mrocklin @jcrist any update on this PR?

@jcrist (Member) left a comment

Apologies for the delayed review, overall this looks good to me.

func : function
Function applied to each partition.
args, kwargs :
args, kwargs : Scalar, Delayed or object
Member

args and kwargs can contain these types, but aren't these types themselves. I'd remove the type note here and move it into the description below; something like: Arguments and keywords may contain ``Scalar``, ``Delayed``, or regular python objects.


Ensures the output has the same columns, even if empty."""
df = func(*args, **kwargs)
df = func(*args, **dict(kwargs))
Member

Why this change? I don't think this should be necessary.

Contributor Author

It was something I forgot to clean up; I'll remove it.

name = '{0}-{1}'.format(name, token)

from .multi import _maybe_align_partitions
args, args_dasks = _process_lazy_args(args)
Member

I think this can be moved down to where the call to to_task_dask is with no issues (since alignment also ignores Delayed objects). I'd prefer to group those together if possible.


@insert_meta_param_description(pad=12)
def apply(self, func, meta=no_default):
def apply(self, func, *args, **kwargs):
Member

I'm fine with having things fail here, people really should be using keyword arguments by name instead of by position.

@jcrist (Member) commented Mar 18, 2018

LGTM. Thanks @gabrielelanaro! Merging.

@jcrist jcrist merged commit 9e5267f into dask:master Mar 18, 2018
