Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: Move rendering of map_index_template so it renders for failed tasks as long as it was defined before the point of failure #38902

Merged
merged 20 commits into from
Apr 15, 2024

Conversation

TJaniF
Copy link
Contributor

@TJaniF TJaniF commented Apr 10, 2024

In Airflow 2.9 map_index_template does not render when the task fails.

image

I moved the rendering into the finally of _execute_callable so it always happens.

TaskFlow:
image

Traditional operator:
image

Also attempted 2 unit tests :)

Apologies if there is already a PR addressing this. I did not see one.

cc: @RNHTTR


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@TJaniF TJaniF marked this pull request as ready for review April 10, 2024 15:20
@eladkal eladkal added this to the Airflow 2.9.1 milestone Apr 10, 2024
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Apr 10, 2024
Copy link
Contributor

@RNHTTR RNHTTR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Thanks for cleaning up my mess :)

@RNHTTR RNHTTR requested a review from uranusjr April 10, 2024 21:48
Copy link
Member

@uranusjr uranusjr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to fix tests

@uranusjr
Copy link
Member

This should fix things…

@TJaniF
Copy link
Contributor Author

TJaniF commented Apr 12, 2024

@uranusjr thank you!

@Lee-W Lee-W self-requested a review April 12, 2024 11:19
@pankajkoti pankajkoti requested a review from ashb April 15, 2024 09:14
@Lee-W Lee-W merged commit 456ec48 into apache:main Apr 15, 2024
42 checks passed
utkarsharma2 pushed a commit to astronomer/airflow that referenced this pull request Apr 22, 2024
…ed tasks as long as it was defined before the point of failure (apache#38902)

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
jedcunningham pushed a commit that referenced this pull request Apr 26, 2024
…ed tasks as long as it was defined before the point of failure (#38902)

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
(cherry picked from commit 456ec48)
RodrigoGanancia pushed a commit to RodrigoGanancia/airflow that referenced this pull request May 10, 2024
…ed tasks as long as it was defined before the point of failure (apache#38902)

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
@rightx2
Copy link

rightx2 commented May 28, 2024

I wonder what I've experienced is same type of the problem above:

  1. clear dynnamic mapped tasks in a dag run
  2. all dynamic mapped tasks go into state of running
  3. Force to set dynmaic task box(or dag run) state success (mark state as success)

In this case, all map_index values are integer, instead of given names.

Screenshot 2024-05-28 at 3 53 31 PM

@TJaniF
Copy link
Contributor Author

TJaniF commented May 28, 2024

@rightx2 That is interesting. I just tried that and for me the map index values from the first run "stick", but maybe I misunderstood what you are doing :)

This is what I tried:

2024-05-28_10-24-49 (1)

This is the DAG:

from airflow.decorators import dag, task

@dag(
    start_date=None,
    schedule=None,
    catchup=False,
)
def mapping_test():
    @task(
        map_index_template="{{ custom_map_index }}"
    )
    def add_one(num):
        import time
        
        if num > 10:
            time.sleep(10)
        else:
            time.sleep(2)
        
        from airflow.operators.python import get_current_context

        context = get_current_context()
        context["custom_map_index"] = "Input x=" + str(num)

        return num + 1
    
    add_one.expand(num=[1, 2, 3, 4, 5, 10, 11, 12, 13])


mapping_test()

I'm using Airflow 2.9.1 (Astro Runtime 11.3.0)

@rightx2
Copy link

rightx2 commented May 29, 2024

@TJaniF

I think there was a mistake in my experiment. I believe I can clarify it now.

  1. For whole newly added dynamic tasks:

    • When I start running dyanmic tasks for the first time and stop them immediately (marking them as success or failed, before they finish), all map_index values are integers.
  2. For newly added individual tasks within pre-existing dynamic tasks:

    • Similarly, if I clear the tasks this time and stop them, only the map_index of the newly added individual task is an integer.

@TJaniF
Copy link
Contributor Author

TJaniF commented May 30, 2024

@rightx2 Thanks for the added explanation. I understand what you mean now and can reproduce it, marking as success (or as failed) will lead to an integer index even if the code to define the custom map index has already run. Thanks for flagging this!

@RNHTTR I think it would be nice if it worked for marking tasks the same way as for failed tasks with attempting to render the custom map index template even if interrupted. :)
If you agree I can open an issue and/or take a stab at it, though I can't promise that will happen soonish, am working on some other things rn 😅

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet