Add notifications and expand DAG description
krysal committed Jul 12, 2024
1 parent 9a251cf commit e8db5c7
Showing 3 changed files with 136 additions and 53 deletions.
80 changes: 69 additions & 11 deletions catalog/dags/database/catalog_cleaner/catalog_cleaner.py
@@ -1,8 +1,18 @@
"""
Catalog Data Cleaner DAG
Use TSV files created during the clean step of the ingestion process to bring the
changes into the catalog.
Use the TSV files created during the cleaning step of the ingestion process to bring
the changes into the catalog database and make the updates permanent.
The DAG has a structure similar to the batched_update DAG, but with a few key
differences:
1. Given the structure of the TSV, it updates a single column at a time.
2. The batch updates are parallelized to speed up the process. The maximum number of
active tasks is limited to 3 (at first to try it out and) to avoid overwhelming
the database.
3. It needs slightly different SQL queries to update the data. One change for example,
is that it only works with the `image` table given that is the only one where the
cleaning steps are applied to in the ingestion server.
"""

import logging
@@ -13,7 +23,9 @@
 from airflow.models.abstractoperator import AbstractOperator
 from airflow.models.param import Param
 from airflow.operators.python import get_current_context
+from airflow.utils.trigger_rule import TriggerRule

+from common import slack
 from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID
 from common.sql import (
     RETURN_ROW_COUNT,
@@ -30,6 +42,7 @@

 @task
 def count_dirty_rows(temp_table_name: str, task: AbstractOperator = None):
+    """Get the number of rows in the temp table before the updates."""
     count = run_sql.function(
         dry_run=False,
         sql_template=f"SELECT COUNT(*) FROM {temp_table_name}",
@@ -43,6 +56,7 @@ def count_dirty_rows(temp_table_name: str, task: AbstractOperator = None):

 @task
 def get_batches(total_row_count: int, batch_size: int) -> list[tuple[int, int]]:
+    """Return a list of tuples with the start and end row_id for each batch."""
     return [(i, i + batch_size) for i in range(0, total_row_count, batch_size)]


@@ -55,6 +69,9 @@ def update_batch(
 ):
     batch_start, batch_end = batch
     logger.info(f"Going through row_id {batch_start:,} to {batch_end:,}.")
+
+    # Include the formatted batch range in the context to be used as the index
+    # template, for easier identification of the tasks in the UI.
     context = get_current_context()
     context["index_template"] = f"{batch_start}__{batch_end}"

@@ -72,6 +89,21 @@
     return count


+@task
+def sum_up_counts(counts: list[int]) -> int:
+    return sum(counts)
+
+
+@task
+def notify_slack(text):
+    slack.send_message(
+        text=text,
+        username=constants.SLACK_USERNAME,
+        icon_emoji=constants.SLACK_ICON,
+        dag_id=constants.DAG_ID,
+    )
+
+
 @dag(
     dag_id=constants.DAG_ID,
     default_args={
@@ -100,7 +132,6 @@
             enum=["url", "creator_url", "foreign_landing_url"],
             description="The column of the table to apply the updates.",
         ),
-        # "table": Param(type="str", description="The media table to update."),
         "batch_size": Param(
             default=10000,
             type="integer",
@@ -113,11 +144,12 @@ def catalog_cleaner():
     column = "{{ params.column }}"
     temp_table_name = f"temp_cleaned_image_{column}"

-    create = PGExecuteQueryOperator(
+    create_table = PGExecuteQueryOperator(
         task_id="create_temp_table",
         postgres_conn_id=POSTGRES_CONN_ID,
-        sql=constants.CREATE_SQL.format(temp_table_name=temp_table_name, column=column),
         execution_timeout=timedelta(minutes=1),
+        sql=constants.CREATE_TEMP_TABLE_SQL.format(
+            temp_table_name=temp_table_name, column=column
+        ),
     )

     load = PGExecuteQueryOperator(
@@ -130,16 +162,28 @@ def catalog_cleaner():
             s3_path_to_file="{{ params.s3_path }}",
             aws_region=aws_region,
         ),
+        execution_timeout=timedelta(hours=1),
     )

+    create_index = PGExecuteQueryOperator(
+        task_id="create_temp_table_index",
+        postgres_conn_id=POSTGRES_CONN_ID,
+        sql=constants.CREATE_INDEX_SQL.format(temp_table_name=temp_table_name),
+    )
+
     count = count_dirty_rows(temp_table_name)

     batches = get_batches(total_row_count=count, batch_size="{{ params.batch_size }}")

-    updates = update_batch.partial(
-        temp_table_name=temp_table_name, column=column
-    ).expand(batch=batches)
+    updates = (
+        update_batch.override(
+            max_active_tis_per_dag=3,
+            retries=0,
+        )
+        .partial(temp_table_name=temp_table_name, column=column)
+        .expand(batch=batches)
+    )
+
+    total = sum_up_counts(updates)

     drop = PGExecuteQueryOperator(
         task_id="drop_temp_tables",
@@ -148,7 +192,21 @@ def catalog_cleaner():
         execution_timeout=timedelta(minutes=1),
     )

-    create >> load >> count >> updates >> drop
+    notify_success = notify_slack.override(task_id="notify_success")(
+        f"Upstream cleaning was completed successfully, updating column `{column}`"
+        f" for {total} rows.",
+    )
+
+    notify_failure = notify_slack.override(
+        task_id="notify_failure", trigger_rule=TriggerRule.ONE_FAILED
+    )("Upstream cleaning failed. Check the logs for more information.")
+
+    create_table >> load >> create_index >> count
+
+    # Make the dependency on total (the sum_up_counts task) explicit so that it
+    # shows up in the graph.
+    updates >> [drop, total] >> notify_success
+
+    drop >> notify_failure


 catalog_cleaner()
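
As an aside on how the batching above behaves: each mapped update_batch task
receives one (start, end) tuple from get_batches, and at most three of them run
concurrently (max_active_tis_per_dag=3). A minimal standalone sketch of that
batching math, with illustrative numbers rather than values from a real run:

def get_batches(total_row_count: int, batch_size: int) -> list[tuple[int, int]]:
    # Mirrors the DAG's task: half-open row_id ranges covering all dirty rows.
    return [(i, i + batch_size) for i in range(0, total_row_count, batch_size)]

# 25,000 dirty rows with the default batch_size of 10,000 yield three batches.
# The last range extends past the final row_id, which is harmless because the
# update only matches row_id values that actually exist in the temp table.
print(get_batches(total_row_count=25_000, batch_size=10_000))
# [(0, 10000), (10000, 20000), (20000, 30000)]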
8 changes: 6 additions & 2 deletions catalog/dags/database/catalog_cleaner/constants.py
@@ -1,11 +1,13 @@
 DAG_ID = "catalog_cleaner"
+SLACK_USERNAME = "Catalog Cleaner DAG"
+SLACK_ICON = ":disk-cleanup:"

-CREATE_SQL = """
+CREATE_TEMP_TABLE_SQL = """
 DROP TABLE IF EXISTS {temp_table_name};
 CREATE UNLOGGED TABLE {temp_table_name} (
     row_id SERIAL,
     identifier uuid NOT NULL,
-    {column} TEXT
+    {column} TEXT NOT NULL
 );
 """

@@ -17,6 +19,8 @@
 );
 """

+CREATE_INDEX_SQL = "CREATE INDEX ON {temp_table_name}(row_id);"
+
 UPDATE_SQL = """
 UPDATE image SET {column} = tmp.{column}, updated_on = NOW()
 FROM {temp_table_name} AS tmp
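
The UPDATE_SQL template is truncated by the diff at this point. A small Python
sketch of how these templates render for a run over the `url` column; the WHERE
clause below is an assumption inferred from update_batch's (batch_start,
batch_end] row_id bounds, not the committed statement:

CREATE_TEMP_TABLE_SQL = """
DROP TABLE IF EXISTS {temp_table_name};
CREATE UNLOGGED TABLE {temp_table_name} (
    row_id SERIAL,
    identifier uuid NOT NULL,
    {column} TEXT NOT NULL
);
"""

CREATE_INDEX_SQL = "CREATE INDEX ON {temp_table_name}(row_id);"

# Hypothetical completion of the truncated UPDATE_SQL (an assumption, not the
# actual constant): join the temp table on identifier, bound the batch by row_id.
UPDATE_SQL = """
UPDATE image SET {column} = tmp.{column}, updated_on = NOW()
FROM {temp_table_name} AS tmp
WHERE image.identifier = tmp.identifier
    AND tmp.row_id > {batch_start} AND tmp.row_id <= {batch_end};
"""

kwargs = dict(temp_table_name="temp_cleaned_image_url", column="url")
print(CREATE_TEMP_TABLE_SQL.format(**kwargs))
print(CREATE_INDEX_SQL.format(**kwargs))
print(UPDATE_SQL.format(**kwargs, batch_start=0, batch_end=10_000))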