Change approach to finding bad rows to LEFT OUTER JOIN. #23528

Merged
merged 1 commit into from
May 6, 2022


ashb
Member

@ashb ashb commented May 6, 2022

Use a LEFT OUTER JOIN to find the bad rows, rather than sub-selects (two for the count, or one for the CREATE TABLE).

For a large database (27m TaskInstances, 2m DagRuns) this takes the
time from 10 minutes per table (we have 3) down to around 3
minutes per table. (All times on Postgres.)

Tested:

  • postgres
  • mysql
  • mssql
  • sqlite

Before:

CREATE TABLE _airflow_moved__2_3__dangling__rendered_task_instance_fields AS
SELECT
  rendered_task_instance_fields.dag_id AS dag_id,
  rendered_task_instance_fields.task_id AS task_id,
  rendered_task_instance_fields.execution_date AS execution_date,
  rendered_task_instance_fields.rendered_fields AS rendered_fields,
  rendered_task_instance_fields.k8s_pod_yaml AS k8s_pod_yaml
FROM
  rendered_task_instance_fields
WHERE
  NOT (
    EXISTS (
      SELECT
        1
      FROM
        task_instance
        JOIN dag_run ON dag_run.dag_id = task_instance.dag_id
        AND dag_run.run_id = task_instance.run_id
      WHERE
        rendered_task_instance_fields.dag_id = task_instance.dag_id
        AND rendered_task_instance_fields.task_id = task_instance.task_id
        AND rendered_task_instance_fields.execution_date = dag_run.execution_date
    )
  )

After:

CREATE TABLE _airflow_moved__2_3__dangling__rendered_task_instance_fields AS
SELECT
  rendered_task_instance_fields.dag_id AS dag_id,
  rendered_task_instance_fields.task_id AS task_id,
  rendered_task_instance_fields.execution_date AS execution_date,
  rendered_task_instance_fields.rendered_fields AS rendered_fields,
  rendered_task_instance_fields.k8s_pod_yaml AS k8s_pod_yaml
FROM
  rendered_task_instance_fields
  LEFT OUTER JOIN dag_run ON rendered_task_instance_fields.dag_id = dag_run.dag_id
  AND rendered_task_instance_fields.execution_date = dag_run.execution_date
  LEFT OUTER JOIN task_instance ON dag_run.dag_id = task_instance.dag_id
  AND dag_run.run_id = task_instance.run_id
  AND rendered_task_instance_fields.task_id = task_instance.task_id
WHERE
  task_instance.dag_id IS NULL
  OR dag_run.dag_id IS NULL
;
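
The two forms can be checked against each other on a small dataset. This is a minimal sketch using Python's standard-library `sqlite3` module, not the Airflow migration code itself. The reduced table schemas, the table aliases, the `find_dangling` helper and the sample rows are all assumptions for illustration, and the comparison relies on `(dag_id, execution_date)` matching at most one `dag_run` row.

```python
import sqlite3

# Hypothetical, reduced schemas: only the columns the two queries touch.
SCHEMA = """
CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, execution_date TEXT);
CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT);
CREATE TABLE rendered_task_instance_fields (
    dag_id TEXT, task_id TEXT, execution_date TEXT
);
"""

# "Before": correlated NOT EXISTS sub-select.
NOT_EXISTS_SQL = """
SELECT r.dag_id, r.task_id, r.execution_date
FROM rendered_task_instance_fields AS r
WHERE NOT EXISTS (
    SELECT 1
    FROM task_instance AS ti
    JOIN dag_run AS dr ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
    WHERE r.dag_id = ti.dag_id
      AND r.task_id = ti.task_id
      AND r.execution_date = dr.execution_date
)
ORDER BY 1, 2, 3
"""

# "After": LEFT OUTER JOIN, keeping rows where either join found nothing.
LEFT_JOIN_SQL = """
SELECT r.dag_id, r.task_id, r.execution_date
FROM rendered_task_instance_fields AS r
LEFT OUTER JOIN dag_run AS dr
    ON r.dag_id = dr.dag_id AND r.execution_date = dr.execution_date
LEFT OUTER JOIN task_instance AS ti
    ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id AND r.task_id = ti.task_id
WHERE ti.dag_id IS NULL OR dr.dag_id IS NULL
ORDER BY 1, 2, 3
"""


def find_dangling(sql):
    """Run one of the queries against a fresh in-memory sample database."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.execute("INSERT INTO dag_run VALUES ('d1', 'run1', '2022-01-01')")
    conn.execute("INSERT INTO task_instance VALUES ('d1', 't1', 'run1')")
    conn.executemany(
        "INSERT INTO rendered_task_instance_fields VALUES (?, ?, ?)",
        [
            ("d1", "t1", "2022-01-01"),  # has dag_run and task_instance: not dangling
            ("d1", "t2", "2022-01-01"),  # dag_run exists, but no task_instance
            ("d1", "t1", "2022-02-02"),  # no dag_run for this execution_date
        ],
    )
    rows = conn.execute(sql).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    print(find_dangling(NOT_EXISTS_SQL))
    print(find_dangling(LEFT_JOIN_SQL))
```

Both queries select the same two dangling rows here. If `dag_run` could hold several rows for one `(dag_id, execution_date)` pair, the join form could return extra rows, so the uniqueness assumption matters.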

Use a LEFT OUTER JOIN rather than two sub-selects.

For a _large_ database (27m TaskInstances, 2m DagRuns) this takes the
time from 10 minutes per table (we have 3) down to around 3
minutes per table.

Before:

```sql
SELECT
    count(*) AS count_1
FROM (
    SELECT
        rendered_task_instance_fields.dag_id AS dag_id,
        rendered_task_instance_fields.task_id AS task_id,
        rendered_task_instance_fields.execution_date AS execution_date,
        rendered_task_instance_fields.rendered_fields AS rendered_fields,
        rendered_task_instance_fields.k8s_pod_yaml AS k8s_pod_yaml
    FROM rendered_task_instance_fields
    WHERE
        NOT (exists(SELECT
                        1
                    FROM task_instance
                    JOIN dag_run
                        ON dag_run.run_id = task_instance.run_id
                                AND dag_run.dag_id = task_instance.dag_id
                    WHERE
                        rendered_task_instance_fields.dag_id = task_instance.dag_id
                        AND rendered_task_instance_fields.task_id = task_instance.task_id
                        AND rendered_task_instance_fields.execution_date = dag_run.execution_date
            ))
     ) AS anon_1
;
```

After:

```sql
CREATE TABLE _airflow_moved__2_3__dangling__rendered_task_instance_fields AS
SELECT
  rendered_task_instance_fields.dag_id AS dag_id,
  rendered_task_instance_fields.task_id AS task_id,
  rendered_task_instance_fields.execution_date AS execution_date,
  rendered_task_instance_fields.rendered_fields AS rendered_fields,
  rendered_task_instance_fields.k8s_pod_yaml AS k8s_pod_yaml
FROM
  rendered_task_instance_fields
  LEFT OUTER JOIN dag_run ON rendered_task_instance_fields.dag_id = dag_run.dag_id
  AND rendered_task_instance_fields.execution_date = dag_run.execution_date
  LEFT OUTER JOIN task_instance ON dag_run.dag_id = task_instance.dag_id
  AND dag_run.run_id = task_instance.run_id
  AND rendered_task_instance_fields.task_id = task_instance.task_id
WHERE
  task_instance.dag_id IS NULL
  OR dag_run.dag_id IS NULL
;
```
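
The count can use the same join directly, with no wrapping sub-select. The sketch below runs such a query through Python's standard-library `sqlite3` module. The joined `count(*)` statement is an assumption about the shape of the rewritten count, not text copied from the commit, and the reduced schemas, the helper names and the sample rows are hypothetical.

```python
import sqlite3

# Assumed shape of the count after the change: count straight off the join.
COUNT_SQL = """
SELECT count(*)
FROM rendered_task_instance_fields AS r
LEFT OUTER JOIN dag_run AS dr
    ON r.dag_id = dr.dag_id AND r.execution_date = dr.execution_date
LEFT OUTER JOIN task_instance AS ti
    ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id AND r.task_id = ti.task_id
WHERE ti.dag_id IS NULL OR dr.dag_id IS NULL
"""


def make_sample_db():
    """Build an in-memory database with one valid and one dangling row."""
    conn = sqlite3.connect(":memory:")
    # Hypothetical, reduced schemas: only the join columns.
    conn.executescript(
        """
        CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, execution_date TEXT);
        CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT);
        CREATE TABLE rendered_task_instance_fields (
            dag_id TEXT, task_id TEXT, execution_date TEXT
        );
        INSERT INTO dag_run VALUES ('d1', 'run1', '2022-01-01');
        INSERT INTO task_instance VALUES ('d1', 't1', 'run1');
        INSERT INTO rendered_task_instance_fields VALUES ('d1', 't1', '2022-01-01');
        INSERT INTO rendered_task_instance_fields VALUES ('d1', 't1', '2022-03-03');
        """
    )
    return conn


def count_dangling(conn):
    """Return the number of rendered_task_instance_fields rows with no parent."""
    return conn.execute(COUNT_SQL).fetchone()[0]


if __name__ == "__main__":
    print(count_dangling(make_sample_db()))
```

Only the row dated `2022-03-03` has no matching `dag_run`, so it is the single row counted as dangling.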
@ashb ashb requested review from dstandish and kaxil May 6, 2022 13:35
@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label May 6, 2022
@github-actions

github-actions bot commented May 6, 2022

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@jedcunningham
Member

I've tested this on mssql, both 2.1.4 and 2.2.5.

@jedcunningham
Member

I've also tested this on mysql, both 2.1.4 and 2.2.5.

@ashb ashb merged commit 22a9293 into apache:main May 6, 2022
@ashb ashb deleted the left-outer-join-not-subselect branch May 6, 2022 16:02
jedcunningham pushed a commit to astronomer/airflow that referenced this pull request May 6, 2022
(cherry picked from commit 22a9293)
@ephraimbuddy ephraimbuddy added the type:bug-fix Changelog: Bug Fixes label May 8, 2022
@ephraimbuddy ephraimbuddy added this to the Airflow 2.3.1 milestone May 8, 2022
ephraimbuddy pushed a commit that referenced this pull request May 8, 2022
(cherry picked from commit 22a9293)
ephraimbuddy pushed a commit that referenced this pull request May 21, 2022
(cherry picked from commit 22a9293)