@easontm (Contributor) commented Dec 8, 2021

I discovered that task_instance_mutation_hook is only invoked in dagrun.verify_integrity(), which itself is not invoked often due to its cost. The goal of this change is to ensure tasks are mutated on every execution, regardless of the source of the run.

I am unsure how to formally test this change, though I did verify it behaves as expected locally by clicking through the UI and checking the webserver logs.

@boring-cyborg boring-cyborg bot added the area:webserver Webserver related Issues label Dec 8, 2021
@ashb (Member) commented Dec 8, 2021

I'm wary of running this in the webserver code -- does it need to be there, or could it be run on the worker just before the task actually starts?

/cc @potiuk Another consideration for multi-tenancy.

@potiuk potiuk added the multi-team - aip-67 Issues related to multi-team (AIP-67) label Dec 8, 2021
@potiuk (Member) commented Dec 8, 2021

Yep, agree with @ashb. It should not be possible to run the mutation hook in here.

First of all, what would be the purpose of it?

The reason is not cost. If you look closely, the task_instance_mutation_hook should definitely not be called here:

https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html

> Takes a TaskInstance parameter called task_instance. Called right before task execution.

@potiuk potiuk closed this Dec 8, 2021
@potiuk potiuk added invalid and removed multi-team - aip-67 Issues related to multi-team (AIP-67) labels Dec 8, 2021
@easontm (Contributor, author) commented Dec 8, 2021

Hi @potiuk, thanks for the quick review. Sorry, I misspoke: "cost" should have been "speed", and was in reference to verify_integrity.

Anyway, here is what I'm trying to address. You asked:

> First of all, what would be the purpose of it?

I'm trying to make this statement true:

> Called right before task execution.

because it currently appears not to be.


For example, the demo function from the cluster policy docs shows a queue mutation:

from airflow.models import TaskInstance

def task_instance_mutation_hook(task_instance: TaskInstance):
    if task_instance.try_number >= 1:
        task_instance.queue = 'retry_queue'

An easy way for me to validate that queue mutation works is to use my CeleryKubernetesExecutor deployment and make the TI mutation send the task to the Kubernetes queue.
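The routing idea can be sketched like this (a minimal, self-contained sketch: a stand-in dataclass replaces the real airflow.models.TaskInstance so the snippet runs on its own, and "kubernetes" is the default value of the `[celery_kubernetes_executor] kubernetes_queue` config option):

```python
from dataclasses import dataclass


# Stand-in for airflow.models.TaskInstance, with just enough fields to
# illustrate the mutation; a real hook would live in airflow_local_settings.py.
@dataclass
class TaskInstance:
    task_id: str
    queue: str = "default"


def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    # CeleryKubernetesExecutor routes a task instance to the Kubernetes
    # executor when its queue matches the configured kubernetes_queue
    # (default: "kubernetes"); everything else goes to Celery.
    task_instance.queue = "kubernetes"


ti = TaskInstance(task_id="bar")
task_instance_mutation_hook(ti)
print(ti.queue)  # kubernetes
```

If the hook really ran before every execution, a task instance mutated this way should never show up on a Celery worker.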

What happens: if I trigger a new DAG run, I get print statements from my pod mutation hook (so it did successfully mutate the task and send it to Kubernetes), and the task does not appear in the Celery worker logs. However, if I then clear the task from the UI and let the scheduler automatically add it again, it goes to the Celery worker.

Task log 1:

*** Reading local file: /usr/local/airflow/logs/foo/bar/2021-12-07T01:00:00+00:00/1.log
[2021-12-08, 14:13:28 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: foo.bar manual__2021-12-07T01:00:00+00:00 [queued]>
[2021-12-08, 14:13:28 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: foo.bar manual__2021-12-07T01:00:00+00:00 [queued]>
[2021-12-08, 14:13:28 UTC] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2021-12-08, 14:13:28 UTC] {taskinstance.py:1242} INFO - Starting attempt 1 of 1
[2021-12-08, 14:13:28 UTC] {taskinstance.py:1243} INFO - 
--------------------------------------------------------------------------------
[2021-12-08, 14:13:28 UTC] {taskinstance.py:1262} INFO - Executing <Task(PythonOperator): bar> on 2021-12-07 01:00:00+00:00
[2021-12-08, 14:13:28 UTC] {standard_task_runner.py:52} INFO - Started process 41 to run task
[2021-12-08, 14:13:28 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'foo', 'bar', 'manual__2021-12-07T01:00:00+00:00', '--job-id', '2572809', '--raw', '--subdir', 'DAGS_FOLDER/foo/foo.py', '--cfg-path', '/tmp/tmpbk0cglnj', '--error-file', '/tmp/tmpcua95x7x']
[2021-12-08, 14:13:28 UTC] {standard_task_runner.py:77} INFO - Job 2572809: Subtask bar
[2021-12-08, 14:13:28 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: foo.bar manual__2021-12-07T01:00:00+00:00 [running]> on host foobar.7314175dfa62
[2021-12-08, 14:13:28 UTC] {logging_mixin.py:109} INFO - CUSTOM_HOOK - Mutating pod for task foo.bar.2021-12-07T01_00_00_plus_00_00
[2021-12-08, 14:13:28 UTC] {taskinstance.py:1429} INFO - Exporting the following env vars:

Task log 2:

*** Reading local file: /usr/local/airflow/logs/foo/bar/2021-12-07T01:00:00+00:00/2.log
[2021-12-08, 14:16:59 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: foo.bar manual__2021-12-07T01:00:00+00:00 [queued]>
[2021-12-08, 14:16:59 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: foo.bar manual__2021-12-07T01:00:00+00:00 [queued]>
[2021-12-08, 14:16:59 UTC] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2021-12-08, 14:16:59 UTC] {taskinstance.py:1242} INFO - Starting attempt 2 of 2
[2021-12-08, 14:16:59 UTC] {taskinstance.py:1243} INFO - 
--------------------------------------------------------------------------------
[2021-12-08, 14:16:59 UTC] {taskinstance.py:1262} INFO - Executing <Task(PythonOperator): bar> on 2021-12-07 01:00:00+00:00
[2021-12-08, 14:16:59 UTC] {standard_task_runner.py:52} INFO - Started process 81 to run task
[2021-12-08, 14:16:59 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'foo', 'bar', 'manual__2021-12-07T01:00:00+00:00', '--job-id', '2572817', '--raw', '--subdir', 'DAGS_FOLDER/foo/foo.py', '--cfg-path', '/tmp/tmphi0jnedl', '--error-file', '/tmp/tmp26kgm3k9']
[2021-12-08, 14:16:59 UTC] {standard_task_runner.py:77} INFO - Job 2572817: Subtask bar
[2021-12-08, 14:16:59 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: foo.bar manual__2021-12-07T01:00:00+00:00 [running]> on host airflow-celery-worker-5654798876-4pnfx
[2021-12-08, 14:16:59 UTC] {taskinstance.py:1429} INFO - Exporting the following env vars:

And then the task appears in my Celery worker's logs:

[2021-12-08 14:16:58,713: INFO/ForkPoolWorker-16] Executing command in Celery: ['airflow', 'tasks', 'run', 'foo', 'bar', 'manual__2021-12-07T01:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/foo/foo.py']

@potiuk (Member) commented Dec 8, 2021

But that's a totally different issue, and changing "www" to execute task mutation has precisely zero chance of fixing the problem, simply because the webserver has nothing to do with task execution.

I suggest you open an issue (not a PR) describing your observations.
