
[AIRFLOW-8058] Add configurable execution context #8651

Closed

Conversation

@jonathanshir (Contributor) commented Apr 30, 2020

This PR is part of [AIP-31] and closes #8058.
The goal of this issue is to detach the execution context from the executing method's signature.
Before, the only way to retrieve the execution context was via **kwargs in the function's signature.
Now it is possible to retrieve it with a simple external function call (this only works while the function runs within the execute context).
This develops AIP-31 to allow a more functional way of writing code, without being coupled to Airflow's current implementation.
Collaborated with: @turbaszek @casassg @evgenyshulman
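
For illustration, a minimal sketch of the usage this change enables. The helper name and import path follow this PR's airflow/task/context/current.py; the final API may differ:

# Before: the execution context is only reachable through the callable's signature.
def my_task_with_kwargs(**kwargs):
    print(kwargs["execution_date"])

# After: the context can be fetched from anywhere that runs under the scope
# of BaseOperator.execute, without declaring **kwargs.
from airflow.task.context.current import get_current_context

def my_task():
    context = get_current_context()  # fails if called outside the execute scope
    print(context["execution_date"])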


Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit test coverage for changes (not needed for documentation changes)
  • Target GitHub issue in description, if one exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated, including usage instructions
  • I will engage committers as explained in the Contribution Workflow Example

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@BasPH (Contributor) commented Apr 30, 2020

I don't really understand it yet. Could you provide a practical example?

Plus, please don't place code in utils unless absolutely necessary. I'm sure we can think of a better-named folder :-)

@turbaszek turbaszek added the AIP-31 Task Flow API for nicer DAG definition label May 5, 2020
@jonathanshir (Contributor, Author) commented May 5, 2020

@BasPH

> I don't really understand it yet. Could you provide a practical example?

Let's start with what exists today to give users more visibility into and control over Airflow's flow. The easiest way to interject into Airflow's compile-time code is the "task policy".
In airflow.settings.policy you can see that users can define their own policy to be called for every task object inside every executing DAG. This gives users the option to alter task objects at compile time.
Keeping this in mind, there is currently no real way to alter Airflow variables during execution.
Our use case at databand.ai is validating and running checks on every single task before it is executed, without changing every single Airflow operator!
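
For context, a minimal sketch of such a task policy (the body is a hypothetical example); Airflow calls policy(task) from airflow_local_settings.py for every task at DAG parse time:

# airflow_local_settings.py
def policy(task):
    # Inspect or alter any task attribute at parse time, e.g. route all
    # PythonOperator tasks to a dedicated queue (hypothetical rule).
    if task.task_type == "PythonOperator":
        task.queue = "python_queue"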
@mik-laj
It is important to mention that some confusion may have arisen from the bad name in the configuration:
this feature only allows you to define a contextmanager that is called before a task is executed. It does not alter Airflow's context in any way; it only allows the user to do so. We have seen no clear use case for "replacing" Airflow's context, so this is only an addition. Hence we will rename the feature to additional_execute_contextmanager instead of user_defined_execute_context.
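
For illustration, a sketch of what such a user-supplied contextmanager could look like (the (task_instance, context) signature follows the examples later in this thread, and the airflow.cfg key in the comment is the name discussed above, not necessarily the final one):

import contextlib

@contextlib.contextmanager
def my_execute_contextmanager(task_instance, context):
    # Run checks/validations just before every task executes...
    print(f"validating {task_instance.task_id}")
    yield
    # ...and perform any cleanup once execution finishes.

# Wired up in airflow.cfg, e.g.:
# [core]
# additional_execute_contextmanager = my_pkg.my_module.my_execute_contextmanager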

@jonathanshir jonathanshir marked this pull request as ready for review May 5, 2020 16:17
@evgenyshulman (Contributor) commented May 5, 2020

Continuing on this discussion: actually you have a really great use case for this feature - #8432.
This feature will enable providing the GCP context at runtime without changing each and every operator (right now you have to change Bash and Python, but what about all the others?).

A useful example of such a contextmanager could be something like this (it could be part of the Airflow library, or just defined by the user in place according to their requirements):

import contextlib
from tempfile import TemporaryDirectory

from airflow.configuration import conf
from airflow.exceptions import AirflowException


@contextlib.contextmanager
def gcp_context(task_instance, context):
    # Some condition on config that provides the GCP project to use,
    # e.g. conf.get('gcp', 'project_id', fallback=None)
    if not conf.get('gcp', 'project_id', fallback=None):
        yield None
        return
    gcp_delegate_to = ...
    gcp_conn_id = ...
    try:
        from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
    except ImportError:
        raise AirflowException(
            'Additional GCP packages are not installed. Please install them to use the '
            'gcp_conn_id parameter. For more information, please look at: '
            f'{DOC_BASE_URL}/installation'
        )
    with TemporaryDirectory(prefix='airflowtmp'):
        with GoogleBaseHook(
            gcp_conn_id=gcp_conn_id, delegate_to=gcp_delegate_to
        ).provide_authorized_gcloud():
            yield None  # the yielded value will not be in use

(based on the #8432 implementation)

Resolved review comments (now outdated) on:
  • airflow/config_templates/config.yml
  • airflow/models/taskinstance.py
  • airflow/task/context/current.py
  • docs/howto/use-additional-execute-contextmanager.rst (4 threads)
  • tests/task/context/test_current_context.py
@turbaszek (Member):
@kaxil @mik-laj what are your thoughts?

@mik-laj (Member) commented May 21, 2020

I still don't understand why the additional execution context is configured as a global option. In my opinion, each task should be able to have its own separate contexts. In other words, the additional context should be a task parameter, not an option in the global file. This will allow tasks to have different contexts and then use an SSH tunnel or gcloud authorization. Such contexts could be parameterized, so that e.g. one task could use an SSH tunnel to connect to server A while another task connects to server B.

If someone wants one global context, that is also possible by defining a cluster policy:
https://airflow.readthedocs.io/en/latest/concepts.html#cluster-policy

I propose that the context be defined at the task level.

An example context with parameters would look like this:

from contextlib import contextmanager

def ssh_tunnel(local_port, remote_port):
    @contextmanager
    def ssh_tunnel_context():
        from sshtunnel import SSHTunnelForwarder

        server = SSHTunnelForwarder(
            'pahaz.urfuclub.ru',
            ssh_username="pahaz",
            ssh_password="secret",
            remote_bind_address=('127.0.0.1', remote_port),
            local_bind_address=('127.0.0.1', local_port)
        )
        try:
            server.start()
            yield
        finally:
            server.stop()
    return ssh_tunnel_context

An example task would look like this:

task_a = MySQLExecuteQueryOperator(
    task_id='execute_query',
    execution_contexts=[
        ssh_tunnel(3306, 3307),
    ]
)

If you want to define a context that will be used by all operators, you can define a cluster policy as follows:

def my_task_policy(task):
    task.execution_contexts.append(my_global_task_context())

What do you think about my proposal? Will it meet your requirements?

@evgenyshulman (Contributor):
> I still don't understand why the additional execution context is configured as a global option. [...] What do you think about my proposal? Will it meet your requirements?

I would say it would be a nice option to be able to provide an execution context per operator as well; however, I assume the idea is to define a context that affects all operators, instead of changing them one by one. So sometimes a user will use a per-operator context (a new feature to implement), and sometimes a user can just apply a global context, so that every operator gets the GCP connection context regardless of its type (or with some operator type check), as sketched below.
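
For illustration, a minimal sketch of a global contextmanager with such an operator type check, reusing the gcp_context example from earlier in this thread (the operator names in the check are placeholders):

import contextlib

@contextlib.contextmanager
def global_execute_context(task_instance, context):
    # TaskInstance.operator holds the operator class name; only wrap the
    # operator types that benefit from gcloud authorization.
    if task_instance.operator not in ("BashOperator", "PythonOperator"):
        yield
        return
    with gcp_context(task_instance, context):
        yield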

@turbaszek (Member):
@evgenyshulman I think task-level context + cluster policy can provide the same functionality as the global context manager. In that way more situations would be addressed. WDYT?

@mik-laj (Member) commented May 26, 2020

@evgenyshulman My solution allows for two use cases (task context, global context). Your proposal covers only one use case (global context). Is there any reason to keep this context as a global option? Did I miss something? I am not sure which solution is best, and I would like to understand both solutions.

Review comment on the following code:

return subprocess.call(args=cmd, stdout=dev_null, stderr=subprocess.STDOUT)

@contextmanager
def provide_gcp_context(task_instance, execution_context):

Member: Does execution_context already contain task_instance? It would be nice if the user could expect a common signature across many methods - pre_execute, on_execute, post_execute, etc.

Contributor:

> @evgenyshulman My solution allows for two use cases (task context, global context). [...] I would like to understand both solutions.

I am in favor of keeping the solution simple. The only concern is that the current policy implementation requires airflow_local_settings, which I don't see in use in most Airflow deployments. Also, in our use case we need this at the global level, which is why we have done it this way. Do you see yourself using it as a policy right after we implement it via policy?

Contributor:

pre_execute - good point, we will fix it in the next commit.

@turbaszek turbaszek force-pushed the feature/aip31_current_context branch from 52213d1 to adb7cc4 Compare July 2, 2020 14:30
@turbaszek turbaszek changed the title [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature [AIRFLOW-8058] Add configurable execution context Jul 2, 2020
@turbaszek turbaszek force-pushed the feature/aip31_current_context branch from adb7cc4 to de148b0 Compare July 6, 2020 15:13
@turbaszek (Member):

Depends on #9631

Support for getting the current context at any code location that runs
under the scope of the BaseOperator.execute function. This functionality
is part of AIP-31.
@turbaszek turbaszek force-pushed the feature/aip31_current_context branch from de148b0 to 47b91e0 Compare July 6, 2020 15:15
stale bot commented Aug 22, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Aug 22, 2020
@turbaszek (Member):

Let's keep it open for a moment

@stale stale bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Aug 24, 2020
@kaxil kaxil added this to the Airflow 2.0.0 milestone Aug 24, 2020
@kaxil kaxil added the pinned Protect from Stalebot auto closing label Aug 24, 2020
@potiuk (Member) commented Dec 7, 2020

Hello - is this something we really need to release in 2.0.0rc1?

If not, can someone set the right milestone please? :)

@kaxil kaxil modified the milestones: Airflow 2.0.0rc1, Airflow 2.1 Dec 7, 2020
@kaxil kaxil modified the milestones: Airflow 2.1, Airflow 2.2 Apr 27, 2021
@sann05 commented May 26, 2021

Does anyone still work on it?

@turbaszek (Member):

> Does anyone still work on it?

I'm afraid not, would you like to pick it up? I think the case for this change was not that strong at that time.

@ashb (Member) commented Jun 10, 2021

I'm afraid I'm going to close this as Won't Fix -- it feels very broad and fragile to use a global config setting for the example you've given (GCP project).

This is also not necessary for the original target issue.

There is already an on_execute_callback, so perhaps a much simpler approach would be to add a matching on_post_execute_callback (PR) and then use https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html?highlight=policy to set this globally in your install, as sketched below.
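
For illustration, a minimal sketch of that alternative, combining on_execute_callback with a cluster policy in airflow_local_settings.py (the callback body is a placeholder):

# airflow_local_settings.py
def on_execute(context):
    # Called just before the task runs; context is the usual template context.
    print(f"about to run {context['task_instance'].task_id}")

def task_policy(task):
    # Applied to every task at DAG parse time.
    task.on_execute_callback = on_execute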

Sorry to leave this PR mostly silent for most of a year to then close it, but we can build a simpler solution.

@ashb ashb closed this Jun 10, 2021