Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for iNaturalist load completion before compiling statistics #4104

Merged
merged 1 commit into from Apr 15, 2024

Conversation

AetherUnbound
Copy link
Contributor

Fixes

Fixes #4103 by @AetherUnbound

Description

This has to be the longest time I've spent on such a small change 馃槄

This PR modifies the iNaturalist flow control so that the step which consolidates the statistics from the load data mapped task waits to run until after all tasks have completed. Previous behavior led this task to be marked as upstream_failed as soon as any one of the loading tasks within the mapped task failed. See the issue for more information on that.

The new approach 1) simplifies an XCom access (just for readability) and, more importantly, 2) ensures that the consolidate_load_statistics step waits until all load_transformed_data tasks complete before moving on. I've left the NONE_SKIPPED trigger rule on the post ingestion tasks because I think it is worth having this DAG clean up in case anything fails (rather than letting an upstream_failed cascade to them too if something goes wrong with consolidation). However, we only want it cleaning up after it's actually done running!!

The upshot of this is that the all tasks which depend on the task group will now read the task group as having passed, since the direct parent (consolidate_load_statistics) will likely always succeed. Again, I think this is okay given the post ingestion tasks are cleanup & reporting. But it's a useful point to convey to @WordPress/openverse-catalog that the last task in a task group will be what any task downstream of the group uses to determine its parent state. I played around with using an EmptyOperator alongside the final task to try and determine the state, and that helped, so it's an option to look into if it's important we both wait and fail, for instance. But this feels like an Airflow "gotcha" that I wanted to make sure folks saw!

Testing Instructions

The easiest way to play around with this actually is to add the following file to your DAGs folder and play around with different executions:

from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
from time import sleep

from airflow.decorators import task
from airflow.models.dag import DAG


def add_one_func(x: int):
    sleep(x * 2)
    if x == 2:
        raise ValueError("x is 2!")
    return x + 1


def sum_it_func(values):
    total = sum(values)
    print(f"Total was {total}")
    raise ValueError("whoops!")


with DAG(
    dag_id="dynamic_task_mapping_check",
    start_date=datetime(2022, 3, 4),
    catchup=False,
) as dag:

    with TaskGroup("big_one") as big_one:

        add_one = PythonOperator.partial(
            task_id="add_one",
            python_callable=add_one_func,
        )

        added_values = add_one.expand(op_args=[[1], [2], [3], [4], [5]])
        sum_it = PythonOperator(
            task_id="sum_it",
            python_callable=sum_it_func,
            op_kwargs={"values": added_values.output},
            trigger_rule=TriggerRule.NONE_SKIPPED,
        )

    @task
    def run_only_on_all_success():
        print(f"Ran at {datetime.now()}")

    @task
    def should_fail():
        print("This should be failed upstream")

    with TaskGroup("add_one_group") as add_one_group:

        @task(trigger_rule=TriggerRule.NONE_SKIPPED)
        def run_only_on_all_success_no_input():
            print(f"Ran at {datetime.now()}")

        run_only_on_all_success_no_input()

    big_one >> run_only_on_all_success()
    big_one >> add_one_group
    big_one >> should_fail()

This is what I used to test different assumptions about the trigger rules, how they operated within the task group, etc.

You could also run the iNaturalist DAG locally and cause an error to occur in one of the load mapped tasks, to observe the change.

Checklist

  • My pull request has a descriptive title (not a vague title likeUpdate index.md).
  • My pull request targets the default branch of the repository (main) or a parent feature branch.
  • My commit messages follow best practices.
  • My code follows the established code style of the repository.
  • I added or updated tests for the changes I made (if applicable).
  • I added or updated documentation (if applicable).
  • I tried running the project locally and verified that there are no visible errors.
  • I ran the DAG documentation generator (if applicable).

Developer Certificate of Origin

Developer Certificate of Origin
Developer Certificate of Origin
Version 1.1

Copyright (C) 2004, 2006 The Linux Foundation and its contributors.
1 Letterman Drive
Suite D4700
San Francisco, CA, 94129

Everyone is permitted to copy and distribute verbatim copies of this
license document, but changing it is not allowed.


Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.

@AetherUnbound AetherUnbound requested a review from a team as a code owner April 12, 2024 20:46
@github-actions github-actions bot added the 馃П stack: catalog Related to the catalog and Airflow DAGs label Apr 12, 2024
@openverse-bot openverse-bot added 馃煥 priority: medium Not blocking but should be addressed soon 馃洜 goal: fix Bug fix 馃捇 aspect: code Concerns the software code in the repository labels Apr 12, 2024
Copy link
Member

@krysal krysal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very usefult description of the problem's root here and in the issue. Seems easy in the end but of course iNaturalist is not a trivial DAG! The change makes sense with that. Thank you for explaining it with a smaller and simpler example 馃挴

Copy link
Contributor

@stacimc stacimc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very interesting problem (and explanation)! LGTM

@AetherUnbound AetherUnbound merged commit 4051522 into main Apr 15, 2024
56 of 60 checks passed
@AetherUnbound AetherUnbound deleted the fix/inaturalist-mapped-task-failure branch April 15, 2024 23:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
馃捇 aspect: code Concerns the software code in the repository 馃洜 goal: fix Bug fix 馃煥 priority: medium Not blocking but should be addressed soon 馃П stack: catalog Related to the catalog and Airflow DAGs
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Investigate recent iNaturalist failures
4 participants