Create the new data refresh DAG factory and move initial steps into Airflow #4146

Closed
Tracked by #3925
stacimc opened this issue Apr 17, 2024 · 0 comments · Fixed by #4259
stacimc commented Apr 17, 2024

Problem

This issue tracks creating a new data refresh DAG factory to generate the new data refresh DAGs, which will not rely on the ingestion server, and moving the initial steps (described below) into the DAGs. At the end of this step the DAGs will not yet be functional or able to run a full refresh.

Description

We’ll create a new data refresh DAG factory to generate data refresh DAGs for each existing media_type and environment. Initially, these four will be generated (a minimal factory sketch follows the list):

  • staging_audio_data_refresh
  • staging_image_data_refresh
  • production_audio_data_refresh
  • production_image_data_refresh
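
As a rough illustration of the factory pattern (not the actual catalog module layout; the schedule, start date, and task contents are placeholders):

```python
from datetime import datetime

from airflow.decorators import dag

MEDIA_TYPES = ("audio", "image")
ENVIRONMENTS = ("staging", "production")


def create_data_refresh_dag(media_type: str, environment: str):
    @dag(
        dag_id=f"{environment}_{media_type}_data_refresh",
        schedule=None,  # placeholder; the real schedule lives in the DAG config
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=["data_refresh"],
    )
    def data_refresh():
        # The tasks described below (record count, concurrency checks,
        # copy data, create index) would be wired up here.
        ...

    return data_refresh()


# Register one DAG per (environment, media_type) pair in the module's
# global namespace so the Airflow scheduler discovers all four DAGs.
for environment in ENVIRONMENTS:
    for media_type in MEDIA_TYPES:
        dag_id = f"{environment}_{media_type}_data_refresh"
        globals()[dag_id] = create_data_refresh_dag(media_type, environment)
```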

Because the environment is added as a prefix, there will be no collision with the existing DAG ids. In this initial step, we will add only a small portion of the logic in order to make the PR easier to review. The first steps are already implemented in the current data refresh and can simply be copied (a rough sketch of two of them appears after the list):

  • Get the current record count from the target API table; this must be modified to take the environment as an argument
  • Perform concurrency checks on the other data refreshes and conflicting DAGs; this must be modified to include the now larger list of data refresh DAG ids
  • Get the name of the Elasticsearch index currently mapped to the target_alias
  • Generate the new index suffix
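
For example, the first and last of these might look roughly like the following; the connection ID scheme, table naming, and suffix format are assumptions for illustration:

```python
import uuid

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def get_current_record_count(media_type: str, environment: str) -> int:
    # The environment argument selects which API database to query;
    # the connection ID naming scheme here is hypothetical.
    postgres = PostgresHook(postgres_conn_id=f"{environment}_api_postgres")
    # Table name interpolation is illustrative only.
    return postgres.get_first(f"SELECT count(*) FROM {media_type};")[0]


@task
def generate_index_suffix() -> str:
    # One possible scheme: a short random hex suffix for the new index.
    return uuid.uuid4().hex[:8]
```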

We will include new tasks to perform the initial few steps of the ingestion server’s work:

  • Copy Data: this should be a TaskGroup with multiple tasks for creating the FDW from the upstream DB to the downstream DB, running the copy_data query, and so on. It should fully replace the implementation of refresh_api_table in the ingestion server. All steps in this section are SQL queries that can be implemented using the existing PostgresHook and PGExecuteQueryOperator (see the sketch after this list).
  • Create Index: we can use our existing Elasticsearch tasks to create the new elasticsearch index with the index suffix generated in the previous task.
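
A hedged sketch of the Copy Data group, using the stock SQLExecuteQueryOperator in place of the catalog's PGExecuteQueryOperator; the connection ID, table names, and SQL bodies are illustrative stand-ins for the real refresh_api_table queries:

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.task_group import TaskGroup


def copy_data_task_group(media_type: str, environment: str) -> TaskGroup:
    # Hypothetical connection ID for the downstream (API) database.
    conn_id = f"{environment}_api_postgres"

    with TaskGroup(group_id="copy_data") as copy_data:
        # Set up the foreign data wrapper pointing at the upstream DB.
        create_fdw = SQLExecuteQueryOperator(
            task_id="create_fdw",
            conn_id=conn_id,
            sql="CREATE EXTENSION IF NOT EXISTS postgres_fdw;",
        )
        # Run the copy; the real SQL mirrors refresh_api_table and copies
        # from the upstream foreign table into the API table. Both table
        # names below are placeholders.
        run_copy_data = SQLExecuteQueryOperator(
            task_id="run_copy_data",
            conn_id=conn_id,
            sql=f"INSERT INTO api_{media_type} SELECT * FROM upstream_{media_type};",
        )
        create_fdw >> run_copy_data

    return copy_data
```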

Additional context

See this section of the IP

@stacimc stacimc added 🟨 priority: medium Not blocking but should be addressed soon ✨ goal: improvement Improvement to an existing user-facing feature 💻 aspect: code Concerns the software code in the repository 🧱 stack: catalog Related to the catalog and Airflow DAGs labels Apr 17, 2024
@stacimc stacimc self-assigned this Apr 23, 2024