Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[task scheduling] avoid relative module path problem with subcription keys #12227

Merged
merged 10 commits into from Mar 12, 2024

Conversation

zzstoatzz
Copy link
Contributor

@zzstoatzz zzstoatzz commented Mar 9, 2024

previously (as discussed here) we relied on the task_key as set by Task.__init__ to match task runs on TaskRuns to those a TaskServer was given at start-up, but this had a sneaky problem

say you setup your tasks and serve them:

from prefect import task
from prefect.task_server import serve

@task
def thing():
    print('hi')

if __name__ == '__main__':
    serve(thing)

Here: task_key == '__main__.thing'

but if you import and then submit that task somewhere else:

from myapp.tasks import thing

# ...

await thing.submit()

you end up with thing.task_key == 'myapp.tasks.thing'

so the TaskServer would never pick these up


This PR makes the subscription keys a hash of the task's name and the absolute path of where the task was defined, to ensure we don't have key mismatches related to relative module paths, without requiring a somewhat unintuitive thing like from . import tasks

Copy link

netlify bot commented Mar 9, 2024

Deploy Preview for prefect-docs-preview ready!

Name Link
🔨 Latest commit d156203
🔍 Latest deploy log https://app.netlify.com/sites/prefect-docs-preview/deploys/65f067ac7a95f90008693480
😎 Deploy Preview https://deploy-preview-12227--prefect-docs-preview.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify site configuration.

@@ -2966,6 +2966,15 @@ async def create_autonomous_task_run(task: Task, parameters: Dict[str, Any]) ->
factory = await ResultFactory.from_autonomous_task(task, client=client)
await factory.store_parameters(parameters_id, parameters)

if not task.task_origin_hash:
Copy link
Contributor Author

@zzstoatzz zzstoatzz Mar 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only way we should get here is if we cannot get the source file of the function directly - I'm not actually sure in what real world case that would be (i can make it happen by passing something like math.sqrt), but trying indiscriminately to get the source file we failed some of the test_tasks.py cases when a task is instantiated from a callable object like

class A:
    def __call__(self, *_args: Any, **_kwargs: Any) -> Any:
        return "hello"
        
task_instance = Task(fn=A)

even though I seem to have no problem submitting runs of this task instance 🤔

should be an extreme edge case but just flagging in case anyone has thoughts

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. @zzstoatzz Are you saying that the example of a callable object doesn't work?

Copy link
Contributor Author

@zzstoatzz zzstoatzz Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it actually does!

even though I seem to have no problem submitting runs of this task instance 🤔

but I'm saying without the try/except in Task.__init__ which handles failures by setting this hash to None, we fail some of the test_tasks.py tests where we are testing things about tasks instantiated in this way

I tried to reproduce the failure I saw in the test by creating a task from this callable class A and then submitting it, but it actually worked. So it wasn't the most satisfying resolution but wanted to flag in case someone had a better idea of what was happening

@zzstoatzz zzstoatzz marked this pull request as ready for review March 11, 2024 13:42
@zzstoatzz zzstoatzz requested a review from a team as a code owner March 11, 2024 13:42
@zzstoatzz zzstoatzz self-assigned this Mar 11, 2024
" `TaskServer` instance is capable of running the task."
)

task.task_key = task.task_origin_hash
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, won't this obscure the task key for all functions? I thought we only wanted to do this for ones that fell into the __main__ trap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm yeah it does, updated in 7507ba3

Copy link
Collaborator

@chrisguidry chrisguidry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dig it!

src/prefect/task_server.py Outdated Show resolved Hide resolved
@zzstoatzz zzstoatzz merged commit 71721ce into main Mar 12, 2024
45 checks passed
@zzstoatzz zzstoatzz deleted the task-scheduling-avoid-task-key-trap branch March 12, 2024 15:01
@discdiver
Copy link
Contributor

With 2.16.4 the import is still required in the example below.

task_server.py:

from prefect import task
from prefect.task_server import serve


@task
def my_b_task(name: str):
    print(f"Hello, {name}!")
    return f"Hello, {name}!"


if __name__ == "__main__":
    from task_server import my_b_task  # this is required for the task to run 

    serve(my_b_task)

task_submitter.py

from task_server import my_b_task
import asyncio

if __name__ == "__main__":
    val = my_b_task.submit("Agrajag")
    print(val)

However, with a FastAPI server it is not if you just hit the API endpoint

first_fastapi.py

from fastapi import FastAPI
from prefect import task
from ff_prefect_task_server import my_fastapi_task

app = FastAPI()


@task
def my_b_task(name: str):
    print(f"Hello, {name}!")
    return f"Hello, {name}!"


@app.get("/ptask")
async def prefect_task():
    val = my_fastapi_task.submit(name="Marvin")
    return {"message": f"Prefect Task submitted: {val}"}

ff_prefect_task_server.py

from prefect import task
from prefect.task_server import serve


@task
def my_fastapi_task(name: str):
    print(f"Hello, {name}!")


if __name__ == "__main__":
    serve(my_fastapi_task)

@zzstoatzz
Copy link
Contributor Author

hey @discdiver - thanks, I'll look at this today

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants