
Improve support for "concurrency pools" in Elasticsearch DAGs #3891

Closed
stacimc opened this issue Mar 9, 2024 · 5 comments · Fixed by #3921
Labels
💻 aspect: code Concerns the software code in the repository ✨ goal: improvement Improvement to an existing user-facing feature 🟨 priority: medium Not blocking but should be addressed soon 🧱 stack: catalog Related to the catalog and Airflow DAGs

Comments

@stacimc
Contributor

stacimc commented Mar 9, 2024

Problem

We have a large number of DAGs that operate on Elasticsearch or the underlying databases. These DAGs must not run concurrently, so we have two reusable tasks that each accept a list of dag_ids: prevent_concurrency_with_dags immediately fails if any of those DAGs are running, and wait_for_external_dags waits until none of those DAGs are running.

The problem is that every time a new Elasticsearch DAG is added, all the other DAGs that affect that environment must also be updated to fail fast or wait on it. It is easy to miss dependencies, and indeed a few have been missed already:

  • create_new_staging_es_index DAG needs to prevent concurrency with create_proportional_by_source_staging_index
  • staging_database_restore needs to wait on create_proportional_by_source_staging_index
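The behavior of the two reusable tasks described above can be sketched in plain Python. This is a hedged sketch: the real tasks run inside Airflow and check DagRun state in the metadata DB, while `is_dag_running` and `poke` here are hypothetical stand-ins for that machinery.

```python
# Sketch of the two guard behaviors. `is_dag_running` and `poke` are
# hypothetical stand-ins for Airflow's DagRun checks and sensor pokes.
from typing import Callable, Iterable


def prevent_concurrency_with_dags(
    external_dag_ids: Iterable[str],
    is_dag_running: Callable[[str], bool],
) -> None:
    """Fail immediately if any of the external DAGs is running."""
    running = [d for d in external_dag_ids if is_dag_running(d)]
    if running:
        raise RuntimeError(f"Concurrency conflict with running DAGs: {running}")


def wait_for_external_dags(
    external_dag_ids: Iterable[str],
    is_dag_running: Callable[[str], bool],
    poke: Callable[[], None],
) -> None:
    """Block until none of the external DAGs is running."""
    while any(is_dag_running(d) for d in external_dag_ids):
        poke()  # in Airflow this would be a sensor's poke/reschedule interval
```

Either way, each calling DAG has to pass an explicit list of external_dag_ids, which is the maintenance burden described above.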

Description

⚠️ Messy description, jotting down some in-progress thoughts. ⚠️

I've prototyped some of this and think this would work:

  • Create two new dag tags used to define "concurrency pools" for staging and production respectively. Something like "staging_elasticsearch_concurrency" and "production_elasticsearch_concurrency". Apply them to all the appropriate DAGs:
    • staging
      • staging_database_restore
      • recreate_full_staging_index
      • point_staging_alias
      • create_new_staging_es_index
      • create_proportional_by_source_staging_index
    • production
      • both data refreshes
      • both filtered index creation dags
      • create_new_production_es_index
      • point_production_index
  • Update the prevent_concurrency_with_dags and wait_for_external_dags task groups to be a single task (rather than dynamically mapping a task for each external DAG id). Instead of taking a list of external_dag_ids, the task takes the tag to check for.
    • It then queries for DAGs with that tag (session.query(DagModel).filter(DagModel.tags.any(DagTag.name == tag)).all()) in order to get the list of external DAGs, excluding the DAG from which the task was called.
    • It should also raise an error if the DAG from which the task was called is not itself in the pool. This prevents you from forgetting to add it.

(Note: actual implementation ended up being very similar to this idea but not quite)
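The tag lookup plus the two safety rules (exclude self, error if self is untagged) can be sketched with a plain dict standing in for the DagModel/DagTag query. This is an illustrative sketch only; `dag_tags` and `get_pool_dags` are hypothetical names, and in Airflow the pool membership would come from the session query shown above.

```python
# Sketch of the proposed tag-based "concurrency pool" lookup.
# `dag_tags` maps dag_id -> set of tags, standing in for the
# session.query(DagModel)... query against Airflow's metadata DB.

def get_pool_dags(
    dag_tags: dict[str, set[str]], tag: str, calling_dag_id: str
) -> list[str]:
    pool = [dag_id for dag_id, tags in dag_tags.items() if tag in tags]
    if calling_dag_id not in pool:
        # Guard against forgetting to tag the calling DAG itself.
        raise ValueError(
            f"{calling_dag_id} is not in the '{tag}' pool; add the tag to it."
        )
    # Exclude the calling DAG so it does not wait on (or fail because of) itself.
    return [dag_id for dag_id in pool if dag_id != calling_dag_id]
```

With this, adding a new staging Elasticsearch DAG only requires tagging it; every existing DAG in the pool picks it up automatically.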

Alternatives

I prototyped this using tags, but as I write it out it seems obvious that we should literally use Airflow pools instead. I think that would work with the exact same approach.

@stacimc stacimc added 🟨 priority: medium Not blocking but should be addressed soon ✨ goal: improvement Improvement to an existing user-facing feature 💻 aspect: code Concerns the software code in the repository 🧱 stack: catalog Related to the catalog and Airflow DAGs labels Mar 9, 2024
@stacimc stacimc mentioned this issue Mar 11, 2024
@stacimc
Contributor Author

stacimc commented Mar 11, 2024

I prototyped this using tags, but as I write it out it seems obvious that we should literally use Airflow pools instead. I think that would work with the exact same approach.

Looking back into this, I don't think using pools is that simple. Technically, tasks are assigned to pools, not entire DAGs -- and there is no way (that I have found) to simply filter a list of DAGs whose tasks are assigned to a particular pool. We could hard-code the list and rely on the same test_dag_parsing test to check that any DAG that uses the list includes itself in it. An advantage of using tags is that you can also filter these DAGs in the Airflow UI (and you avoid a huge mess of circular imports with the DAG ids).


It might also be worth re-stating why we can't just use pools in the normal way, by having a 'staging_elasticsearch' and 'production_elasticsearch' pool with one slot each. Again, tasks run in pools, not entire DAGs. Once a task finishes the slot is freed and the next task that runs may not be from the same DAG.

You can manipulate this with priority weights, but it still does not guarantee what we want. For example, imagine we set staging_database_restore as higher priority than create_new_staging_es_index, but staging_database_restore began while create_new_staging_es_index was already underway. As soon as the index DAG's current task finished, the restore would take over the slot instead of waiting for the whole DAG as we'd prefer.
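This interleaving can be shown with a toy model of a one-slot pool. This is not Airflow's actual scheduler, just a sketch of its per-task behavior: whenever the slot frees, the highest-priority queued task wins it, regardless of which DAG it belongs to.

```python
# Toy model of a one-slot Airflow pool: scheduling is per *task*, so when
# a task finishes, the highest-priority queued task takes the slot, even
# if it belongs to a different DAG than the one already mid-run.
import heapq


def run_pool(queued_tasks):
    """queued_tasks: list of (priority_weight, dag_id, task_id), all queued
    at once. Returns the execution order; higher priority_weight runs first."""
    heap = [(-p, dag_id, task_id) for p, dag_id, task_id in queued_tasks]
    heapq.heapify(heap)
    order = []
    while heap:
        _, dag_id, task_id = heapq.heappop(heap)
        order.append((dag_id, task_id))
    return order


# create_new_staging_es_index is mid-run: its second task is queued when
# the higher-priority staging_database_restore task shows up. The restore
# runs between the index DAG's tasks instead of waiting for the whole DAG.
order = run_pool([
    (1, "create_new_staging_es_index", "step_2"),
    (10, "staging_database_restore", "restore"),
])
```

So a one-slot pool serializes tasks, not DAGs, which is exactly the gap the tag-based approach closes.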

@krysal
Member

krysal commented Mar 13, 2024

Why does staging_database_restore need to wait on create_proportional_by_source_staging_index?

@stacimc
Contributor Author

stacimc commented Mar 13, 2024

Why does staging_database_restore need to wait on create_proportional_by_source_staging_index?

I answered initially thinking of recreate_full_staging_index -- frustrating how hard these are to keep straight 😅 I think it would be safe to include create_proportional_by_source_staging_index as an excluded_dag for staging_database_restore. I feel less strongly about the reciprocal relationship.

@krysal
Member

krysal commented Mar 13, 2024

I'm missing the relation between both DAGs 😅 My reservation is that we could be doing extra work and creating an unnecessary dependency (with risk of more waiting times and/or circular dependencies).

@stacimc
Contributor Author

stacimc commented Mar 13, 2024

I'm missing the relation between both DAGs

I think it would be fair to mark those as an exception, which is easy to do with the implementation.

My reservation is that we could be doing extra work

FWIW this was actually really easy to implement once I had the idea for how to do it, and has in fact already been done 😅 I'm just waiting on the point_es_alias DAGs to get approved & merged so I can rebase and do a final pass.

The idea is that rather than maintaining a bunch of almost-identical lists of dependencies for each DAG, we only ever need to specify exceptions to the rule. It makes the DAGs themselves cleaner, makes implementing new ones easier, and reduces risk. It is worse to accidentally exclude a necessary dependency (which could manifest as an error or performance issue in prod, potentially long after the DAGs were introduced) than to unintentionally introduce a dependency that wasn't strictly necessary (which just means some time gets wasted waiting).
