New conditional api #2381

Closed
jcrist opened this issue Apr 21, 2020 · 4 comments · Fixed by #2443
Labels
enhancement An improvement of an existing feature


@jcrist

jcrist commented Apr 21, 2020

Background

The current conditional api (switch, ifelse, and merge) works, but has a few problems. Using ifelse as an example:

ifelse(cond : Task, true_task : Task, false_task : Task) -> None
  • If cond evaluates to True, true_task is run and false_task is skipped. However, tasks upstream of false_task aren't skipped. For example:

    ifelse(
        some_condition(),
        load(transform(extract())),
        some_other_task(),
    )

    If some_condition evaluates to False, only the load task is skipped; transform and extract still execute.

  • If a task's result depends on which branch ran, you need to call merge explicitly. We tried making ifelse/switch return the merge implicitly, but had to revert due to issues with implicitly creating terminal tasks in the graph (Control flow tasks return executed branch #2310, Revert "Control flow tasks return executed branch" #2379).

    on_true = do_something()
    on_false = do_something_else()
    cond = condition()
    ifelse(cond, on_true, on_false)
    result = merge(on_true, on_false)

    This approach works, but can be tricky to read.
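The first problem can be made concrete with a toy simulation. This is plain Python, not Prefect's scheduler, and it assumes (as described above) that ifelse only wires the condition to the task it is handed for each branch, and that a skip propagates to downstream tasks but never upstream. `simulate` and `branch_heads` are hypothetical names used only for illustration.

```python
# Toy model of the existing ifelse behaviour (not Prefect's implementation).
# Assumption: ifelse only attaches the condition to the task it is given for
# each branch, and a skip propagates downstream but never upstream.

def simulate(order, upstream, branch_heads, cond_value):
    """Return a {task: "Success" | "Skipped"} map for one run.

    order:        task names in topological order
    upstream:     {task: [upstream task names]}
    branch_heads: {task: branch value} for the tasks passed to ifelse
    cond_value:   what the condition task evaluated to
    """
    state = {}
    for task in order:
        wrong_branch = task in branch_heads and branch_heads[task] != cond_value
        upstream_skipped = any(state[u] == "Skipped" for u in upstream.get(task, []))
        state[task] = "Skipped" if wrong_branch or upstream_skipped else "Success"
    return state


# ifelse(some_condition(), load(transform(extract())), some_other_task())
order = ["some_condition", "extract", "transform", "load", "some_other_task"]
upstream = {
    "transform": ["extract"],
    "load": ["transform", "some_condition"],
    "some_other_task": ["some_condition"],
}
branch_heads = {"load": True, "some_other_task": False}

state = simulate(order, upstream, branch_heads, cond_value=False)
for task in order:
    print(f"{task:16} {state[task]}")
```

With the condition False, only load is marked Skipped in this model; extract and transform sit upstream of the branch head, so nothing ever tells them to skip.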

Proposal

I propose we add a context-manager api for expressing conditional flows. The existing switch/ifelse/merge api will remain (and may be used internally by the context-manager api).

The api consists of two new functions:

  • case: used to express branches in logic
  • var: used to express merging outputs (this could equally be a class Variable)

A few examples to illustrate use:

Example 1

Here we use case to express an ifelse branch:

with Flow("example-1") as flow:
    cond = some_condition()
    
    with case(cond, True):
        a = do_a()
        b = do_b()
        do_c(a, b)

    with case(cond, False):
        do_c(1, 2)

If some_condition is True, do_a, do_b, and do_c are run. If it's False, do_c is run with some constant inputs. For each case block, a Condition task is created. If a task is created inside the case block and has no upstream dependencies that were also created inside that case block, the condition is set as an upstream dependency. In this example, a and b would have the Condition for True as an upstream dependency, while do_c would depend only on a and b. We could equally make the condition an immediate upstream of every task created inside the block, but that makes the graph visually complicated.
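The wiring rule can be sketched with a toy graph builder in plain Python. `ToyFlow`, `add_task`, and the `case(...)` naming of Condition tasks are hypothetical and not Prefect's API; the sketch assumes tasks are identified by string names and only records which upstream edges the rule would create.

```python
from contextlib import contextmanager


class ToyFlow:
    """Toy graph builder: records each task's upstream dependencies."""

    def __init__(self):
        self.upstream = {}      # task name -> set of upstream task names
        self._open_cases = []   # stack of (condition task name, tasks created in block)

    def add_task(self, name, *inputs):
        deps = set(inputs)
        if self._open_cases:
            cond_task, created = self._open_cases[-1]
            # Rule from the proposal: only tasks with no upstream dependency
            # created inside the same case block get the condition directly.
            if not deps & created:
                deps.add(cond_task)
            created.add(name)
        self.upstream[name] = deps
        return name

    @contextmanager
    def case(self, cond, value):
        # Create the Condition task (it depends on the task being tested),
        # then treat every task added inside the block as part of this case.
        cond_task = self.add_task(f"case({cond}, {value})", cond)
        self._open_cases.append((cond_task, set()))
        try:
            yield cond_task
        finally:
            self._open_cases.pop()


# Example 1, with unique names for the two do_c calls.
flow = ToyFlow()
cond = flow.add_task("some_condition")
with flow.case(cond, True):
    a = flow.add_task("do_a")
    b = flow.add_task("do_b")
    flow.add_task("do_c[true]", a, b)
with flow.case(cond, False):
    flow.add_task("do_c[false]")

for task, deps in flow.upstream.items():
    print(task, "<-", sorted(deps))
```

In this model do_c[true] gets edges only from do_a and do_b, so it is still gated by the True condition, just transitively rather than through a direct edge.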

Example 2

Here we illustrate merging tasks with var. Let's say we want to capture the output of do_c and use it later on. In normal Python code we might write this as:

def example_2():
    if some_condition():
        a = do_a()
        b = do_b()
        c = do_c(a, b)
    else:
        c = do_c(1, 2)

    do_d(c)

Using this proposal, an equivalent flow would be:

with Flow("example-2") as flow:
    cond = some_condition()
    
    c = var("c")

    with case(cond, True):
        a = do_a()
        b = do_b()
        c.set(do_c(a, b))

    with case(cond, False):
        c.set(do_c(1, 2))

    do_d(c)

Each call to c.set binds the value of c inside its corresponding case block. Behind the scenes this builds up a merge task branch-by-branch (note: we may not use merge directly, but the general structure will be the same). Since c is a Task itself, it can then be fed to downstream tasks.

Example 3

These context managers should work fine even with nested conditional logic. For example, the following function:

def example_3():
    if some_condition():
        if other_condition():
            a = do_a()
        else:
            a = 1
        b = do_b()
        do_c(a, b)
    else:
        d = do_d()
        do_e(d)

corresponds to an equivalent flow of:

with Flow("example-3") as flow:
    cond = some_condition()

    with case(cond, True):

        cond2 = other_condition()

        a = var("a")
        with case(cond2, True):
            a.set(do_a())
        with case(cond2, False):
            a.set(1)

        b = do_b()
        do_c(a, b)

    with case(cond, False):
        d = do_d()
        do_e(d)

Note that in this case, the Condition task created by calling case on cond2 will have the Condition created for cond as a direct upstream task (following the logic described above: since it has no upstream tasks created inside this case context, the corresponding condition is set as a direct upstream task).
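Skip propagation through the nested graph can be checked with a small toy simulator. Assumptions, also noted in the code: the edge list gives the outer Condition as an upstream of both other_condition (cond2) and the inner Condition tasks, as described above; a Condition task skips when an input skipped or its tested value differs from the expected one; the var a behaves like merge and skips only when every candidate skipped; any other task skips when an upstream skipped. The labels outer_true, inner_false, etc. are hypothetical names for the Condition tasks, and a=1 stands for the constant branch of a.

```python
# Toy skip-propagation check for Example 3 (not Prefect's implementation).
# (name, kind, upstream tasks, expected value for condition tasks), in
# topological order. For condition tasks, upstream[0] is the tested task.
GRAPH = [
    ("cond",        "task",      [],                      None),
    ("outer_true",  "condition", ["cond"],                True),
    ("outer_false", "condition", ["cond"],                False),
    ("cond2",       "task",      ["outer_true"],          None),
    ("inner_true",  "condition", ["cond2", "outer_true"], True),
    ("inner_false", "condition", ["cond2", "outer_true"], False),
    ("do_a",        "task",      ["inner_true"],          None),
    ("a=1",         "task",      ["inner_false"],         None),  # constant branch of var a
    ("a",           "merge",     ["do_a", "a=1"],         None),  # var a, merge-like
    ("do_b",        "task",      ["outer_true"],          None),
    ("do_c",        "task",      ["a", "do_b"],           None),
    ("do_d",        "task",      ["outer_false"],         None),
    ("do_e",        "task",      ["do_d"],                None),
]


def simulate(values):
    """values: {tested task: result}. Returns the set of tasks that ran."""
    ran = set()
    for name, kind, upstream, expected in GRAPH:
        if kind == "merge":
            # A merge runs if at least one candidate branch ran.
            runs = any(u in ran for u in upstream)
        else:
            # Any other task skips if an upstream task skipped.
            runs = all(u in ran for u in upstream)
            if kind == "condition":
                runs = runs and values[upstream[0]] == expected
        if runs:
            ran.add(name)
    return ran


for cond, cond2 in [(True, True), (True, False), (False, True)]:
    ran = simulate({"cond": cond, "cond2": cond2})
    print(cond, cond2, sorted(n for n in ran if n.startswith("do_")))
```

The outcomes in this model are the same if inner_true and inner_false list only cond2 as upstream, because cond2 is itself gated by outer_true; the direct edge mainly makes the nesting visible in the graph.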


I think implementing the api as described above is fairly doable. It might require #2298 though to properly handle complex branching/merging logic (not sure yet).

@jcrist jcrist added the enhancement An improvement of an existing feature label Apr 21, 2020
@joshmeek

Should this be more formalized as a PIN? Doesn't have to be very long and this seems like really cool functionality 😄

@jcrist
Author

jcrist commented Apr 22, 2020

Maybe? I'd be happy to write one up if that's the route we should go.

@lauralorenz

@jcrist just for posterity from talking in core release planning: the temperature of the group was to carry on with implementation without a dedicated PIN, given that the new API is additive and does not change the existing API. If implementation does tread into breaking-change territory for ifelse/switch/merge (for example, if any logic shared between the two APIs in the branching pipeline must be changed to add the new API), it sounded like we were of the opinion that we should pursue a PIN in that case and, furthermore, determine a release schedule in core planning.

@jcrist jcrist mentioned this issue Apr 28, 2020
@jcrist
Author

jcrist commented Apr 28, 2020

I put a quick initial draft up in #2443.

abrookins pushed a commit that referenced this issue Jul 27, 2022