
[AIP-31] Add lazy way to get Airflow context from a wrapped function #8058

Closed
casassg opened this issue Apr 2, 2020 · 9 comments · Fixed by #9631
Labels
AIP-31 Task Flow API for nicer DAG definition kind:feature Feature Requests

Comments

casassg (Contributor) commented Apr 2, 2020

Description

Currently the only way to access the context object from a PythonOperator-wrapped function is by setting provide_context=True on the operator, which loads the context into the function's kwargs. This makes the function signature difficult to reason about.

Adding a way to lazily get the context while the function is executing may be better, similar to Flask's flask.request, which is populated only while a request is being handled.

Use case / motivation

Use the Airflow context in an arbitrary function while keeping the function's signature stable and easy to reason about.

An idea of implementation would be:

from airflow.task import context

@task
def context_task(...):
    return context['run_date']

This could be implemented with a context manager in PythonOperator.execute that populates the field before the callable runs (similar to how DagContext works) and removes it afterwards.

@casassg casassg added the kind:feature Feature Requests label Apr 2, 2020
kaxil (Member) commented Apr 2, 2020

Just a comment: provide_context is no longer required in master: https://github.com/apache/airflow/pull/5990/files/a5d99ed3f3c7b1a815bd2617d3b935bed499f3f7

@turbaszek turbaszek added the AIP-31 Task Flow API for nicer DAG definition label Apr 2, 2020
casassg (Contributor, Author) commented Apr 2, 2020

I see. I still think we should implement this lazily resolved context. Using the current approach from master means that signatures can mismatch op_args/op_kwargs, which makes the function signature difficult to reason about:

@task
def simple_task(text: str, execution_date: datetime):
    print(text)
    print(execution_date)

simple_task("hello")

As you can see, this executes correctly even though the function signature has an extra argument. In my opinion this leads to confusion, hence my proposed solution.
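For illustration, the reason a call with a missing argument can still succeed is that the decorator can fill any unbound parameter from the context by name. This is a hedged sketch, not Airflow's actual decorator; `task` and `fake_context` are stand-ins:

```python
import functools
import inspect

# Stand-in for the real Airflow context dict.
fake_context = {"execution_date": "2020-04-02T00:00:00"}

def task(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        sig = inspect.signature(func)
        # Bind whatever the caller supplied...
        bound = sig.bind_partial(*args, **kwargs)
        # ...and fill the remaining parameters from the context by name.
        for name in sig.parameters:
            if name not in bound.arguments and name in fake_context:
                bound.arguments[name] = fake_context[name]
        return func(*bound.args, **bound.kwargs)
    return wrapper

@task
def simple_task(text: str, execution_date: str):
    return text, execution_date

# simple_task("hello") succeeds; execution_date is injected from the context,
# which is exactly the signature mismatch being discussed.
```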

kaxil (Member) commented Apr 2, 2020

> I see, I still think we should implement this lazy resolved context. […]

Yup, agreed. I just wanted to make it clear that we don't need provide_context anymore.

jonathanshir (Contributor) commented:

My suggested changes:
The issue proposes implementing context similarly to flask.request:

from airflow.task import context

@task
def context_task(...):
    return context['run_date']

I think this is a bit of an over-complication; our use case is much simpler than that.
We have a single execution context per task instance, so my intended implementation is more direct:

from airflow.current import current_context

@task
def context_task(...):
    context = current_context()
    return context['run_date']

The actual implementation will contain a simple stack data structure that is updated before op.execute is called.
Thoughts?
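A minimal sketch of that stack, assuming a thread-local list so concurrent task instances in one process don't interfere (all names here are illustrative, not the final Airflow API):

```python
import threading

# Thread-local storage so each worker thread sees its own stack.
_local = threading.local()

def _stack():
    if not hasattr(_local, "stack"):
        _local.stack = []
    return _local.stack

def current_context():
    stack = _stack()
    if not stack:
        raise RuntimeError("current_context() called outside of task execution")
    return stack[-1]

def run_with_context(context, execute):
    # Push right before op.execute is called, pop right after.
    _stack().append(context)
    try:
        return execute()
    finally:
        _stack().pop()
```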

turbaszek (Member) commented:

> We simply have a single execute context for each task instance, so my intended implementation will be a bit simpler: […]

+1 for starting simple. What would you say to making this context a namedtuple? Referencing values is much easier with a namedtuple than with a dictionary (no need to remember key names).
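For illustration, a namedtuple-based context could look like the following. The field names here are hypothetical, not the full set of Airflow context keys:

```python
from collections import namedtuple

# Hypothetical subset of the Airflow context as a namedtuple.
Context = namedtuple("Context", ["execution_date", "run_id", "ti"])

ctx = Context(execution_date="2020-04-02", run_id="manual__1", ti=None)

# Attribute access instead of string keys; a typo such as
# ctx.execution_dtae fails loudly with an AttributeError.
print(ctx.execution_date)
```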

jonathanshir (Contributor) commented Apr 26, 2020

Sounds great, but does this imply we also change the context being provided the old way?

@provide_context
def task(**context):
    print(type(context)) # == NamedTuple?

turbaszek (Member) commented Apr 26, 2020

Sounds great, but does this imply we also change the context being provided the old way?

No. If someone uses **context, that implicitly suggests it will be a dict. I would do this only for the "new" context. Also, we can override the namedtuple's __getitem__ to support the old, dict-like behaviour.
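A sketch of that compatibility idea: a namedtuple subclass whose `__getitem__` accepts string keys in addition to integer indices, so old `context["execution_date"]` code keeps working alongside the new attribute access (field names are illustrative):

```python
from collections import namedtuple

_ContextBase = namedtuple("_ContextBase", ["execution_date", "run_id"])

class Context(_ContextBase):
    def __getitem__(self, key):
        if isinstance(key, str):
            # Old, dict-like access: context["run_id"]
            return getattr(self, key)
        # Normal tuple indexing: context[0]
        return super().__getitem__(key)

ctx = Context(execution_date="2020-04-02", run_id="manual__1")
```

Both `ctx["run_id"]` and `ctx.run_id` then resolve to the same value.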

casassg (Contributor, Author) commented Apr 26, 2020

Saw it on the PR, lgtm as well on making it a function 😄

casassg (Contributor, Author) commented Jun 10, 2020

Any updates on this?
