Databricks task instances cannot be mapped #4958

Closed
baldwint opened this issue Sep 11, 2021 · 0 comments

Comments

Description

The Databricks task classes are not thread safe and therefore cannot be mapped over when using a non-sequential executor like LocalDaskExecutor. This is because they store the run ID of the Databricks job they are polling as an attribute on the task instance. If run() is called on the same task instance before a prior call has finished, the attribute is overwritten and the concurrently running tasks return the wrong result.

Expected Behavior

When mapping N-fold over a DatabricksSubmitRun or DatabricksRunNow task, I expect to get N distinct Databricks job run IDs in the result. Instead I get N copies of the same job run ID (whichever one started last).

Reproduction

This simplified version of the code illustrates the problem:

from random import random
from time import sleep

import prefect
from prefect.executors import LocalDaskExecutor


class MyTask(prefect.Task):
    def run(self, person: str = None) -> str:

        # submit to API and get a run id
        self.run_id = person

        # poll until job is done
        sleep(random())

        # return run id, but since it was stored as an attribute,
        # other threads may have changed it while polling!
        return self.run_id


my_task = MyTask()


@prefect.task
def reducer(result):
    return sorted(result)


with prefect.Flow("my_flow", executor=LocalDaskExecutor()) as flow:
    mapped_result = my_task.map(person=["arthur", "ford", "marvin"])
    reduced_result = reducer(mapped_result)

state = flow.run()

assert state.result[reduced_result].result == sorted(["arthur", "ford", "marvin"])

This will fail because the result is ["marvin", "marvin", "marvin"] or similar.

MyTask is just a mockup, but both DatabricksSubmitRun and DatabricksRunNow have this same structure (mutating task instance attributes in the run method).

Workaround

This led to some unexpected results in our application. We have worked around it either by discontinuing our use of LocalDaskExecutor, or by making sure to instantiate the Databricks task class instances in a scope where they cannot be shared (e.g., inside of a FunctionTask).
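The second workaround can be sketched in plain Python, without Prefect. The `PollingTask` class below is a hypothetical stand-in for the Databricks tasks (it has the same stores-run_id-on-self structure as the MyTask mockup above); the point is that constructing a fresh instance per call means the attribute is never shared between threads:

```python
from concurrent.futures import ThreadPoolExecutor
from random import random
from time import sleep


class PollingTask:
    """Hypothetical stand-in for a Databricks task that stores run_id on self."""

    def run(self, person: str) -> str:
        self.run_id = person   # shared mutable state on the instance
        sleep(random() / 20)   # simulate polling the API
        return self.run_id     # may have been overwritten by another thread


def run_isolated(person: str) -> str:
    # A fresh instance per call, so the run_id attribute is
    # private to this call and cannot be clobbered concurrently.
    return PollingTask().run(person)


people = ["arthur", "ford", "marvin"]
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(run_isolated, people))

assert sorted(results) == ["arthur", "ford", "marvin"]
```

This mirrors what wrapping the instantiation inside a FunctionTask achieves: each mapped run gets its own task object, so the race disappears even though the class itself is unchanged.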

I think the long-term fix would be to use a local variable for run_id instead of assigning to attributes of self in the run method of these task classes.
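As a minimal sketch of that fix (pure Python, no Prefect; `FixedTask` is an illustrative name, not the actual class), keeping the run ID in a local variable makes even a single shared instance safe to map over concurrently:

```python
from concurrent.futures import ThreadPoolExecutor
from random import random
from time import sleep


class FixedTask:
    """Sketch of the proposed fix: run_id lives in a local variable."""

    def run(self, person: str) -> str:
        run_id = person        # local to this call's stack frame
        sleep(random() / 20)   # simulate polling the API
        return run_id          # other threads cannot overwrite a local


people = ["arthur", "ford", "marvin"]
task = FixedTask()             # one shared instance is now safe
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(task.run, people))

assert sorted(results) == ["arthur", "ford", "marvin"]
```

Because each thread's `run()` call gets its own stack frame, the locals are isolated per invocation, which is exactly the property the instance attribute lacked.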

anna-geller added a commit to anna-geller/prefect that referenced this issue Oct 6, 2021
to make it work with mapped tasks. Details in the issue: PrefectHQ#4958
zanieb pushed a commit that referenced this issue Oct 8, 2021
* Prevent run_id to be modified outside of run() method

to make it work with mapped tasks. Details in the issue: #4958

* Create issue4958.yaml

* Update test_databricks.py
@zanieb zanieb closed this as completed Oct 8, 2021
@zanieb zanieb mentioned this issue Oct 21, 2021
lance0805 pushed a commit to hyl2015/prefect that referenced this issue Aug 2, 2022
* Prevent run_id to be modified outside of run() method

to make it work with mapped tasks. Details in the issue: PrefectHQ#4958

* Create issue4958.yaml

* Update test_databricks.py