Skip to content

Give up on trying to recreate task_id logic#22794

Merged
uranusjr merged 2 commits intoapache:mainfrom
astronomer:mapped-task-unmap-task-id
Apr 7, 2022
Merged

Give up on trying to recreate task_id logic#22794
uranusjr merged 2 commits intoapache:mainfrom
astronomer:mapped-task-unmap-task-id

Conversation

@uranusjr
Copy link
Copy Markdown
Member

@uranusjr uranusjr commented Apr 6, 2022

BaseOperator does way too much to modify task_id, let's give up on trying to recreate it and instead just forcefully overwrite.

Inspired by @ashb’s hack

@ashb
Copy link
Copy Markdown
Member

ashb commented Apr 6, 2022

A test for you

diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 38be50b12..5311cb6ed 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -23,6 +23,7 @@ import pathlib
 import signal
 import sys
 import urllib
+from contextlib import suppress
 from tempfile import NamedTemporaryFile
 from traceback import format_exception
 from typing import List, Optional, Union, cast
@@ -55,6 +56,7 @@ from airflow.models import (
     Variable,
     XCom,
 )
+from airflow.models.taskfail import TaskFail
 from airflow.models.taskinstance import TaskInstance, load_error_file, set_error_file
 from airflow.models.taskmap import TaskMap
 from airflow.models.xcom import XCOM_RETURN_KEY
@@ -1358,6 +1360,39 @@ class TestTaskInstance:
         assert 'template: test_email_alert_with_config' == title
         assert 'template: test_email_alert_with_config' == body
 
+    @patch('airflow.models.taskinstance.send_email')
+    def test_failure_mapped_taskflow(self, mock_send_email, dag_maker, session):
+        with dag_maker(dag_id='test_failure_email', session=session) as dag:
+
+            @dag.task(email='to')
+            def test_email_alert(x):
+                raise RuntimeError("Fail please")
+
+            test_email_alert.expand(x=[1, 2, 3])
+        ti = sorted(
+            dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances,
+            key=lambda ti: ti.map_index,
+        )[0]
+        assert ti.map_index == 0
+
+        with suppress(RuntimeError):
+            ti.run(session=session)
+
+        (email, title, body), _ = mock_send_email.call_args
+        assert email == 'to'
+        assert 'test_email_alert' in title
+        assert 'test_email_alert__1' not in title
+        assert 'map_index=0' in title
+        assert 'test_email_alert' in body
+        assert 'Try 1' in body
+
+        tf = (
+            session.query(TaskFail)
+            .filter_by(dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id, map_index=ti.map_index)
+            .one()
+        )
+        assert tf, "TaskFail was recorded"
+
     def test_set_duration(self):
         task = DummyOperator(task_id='op', email='test@test.test')
         ti = TI(task=task)

@uranusjr
Copy link
Copy Markdown
Member Author

uranusjr commented Apr 7, 2022

Is there another PR adding the test or should I do it here?

@uranusjr uranusjr force-pushed the mapped-task-unmap-task-id branch from 819e9aa to da3540c Compare April 7, 2022 06:55
@ashb
Copy link
Copy Markdown
Member

ashb commented Apr 7, 2022

Probably here for that test please.

uranusjr added 2 commits April 7, 2022 19:36
BaseOperator does way too much to modify task_id, let's give up on
trying to recreate it and instead just forcefully overwrite.
@uranusjr uranusjr force-pushed the mapped-task-unmap-task-id branch from da3540c to 3d5c33d Compare April 7, 2022 12:32
@uranusjr uranusjr marked this pull request as ready for review April 7, 2022 12:32
@uranusjr uranusjr requested review from XD-DENG, ashb and kaxil as code owners April 7, 2022 12:32
@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Apr 7, 2022
@github-actions
Copy link
Copy Markdown

github-actions bot commented Apr 7, 2022

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@uranusjr uranusjr merged commit 1d141a2 into apache:main Apr 7, 2022
@uranusjr uranusjr deleted the mapped-task-unmap-task-id branch April 7, 2022 13:14
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Apr 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:dynamic-task-mapping AIP-42 area:serialization changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) full tests needed We need to run full set of tests for this PR to merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants