Fix alembic autogeneration and rename mismatching constraints (#39032)
* Fix alembic autogeneration and rename mismatching constraints

The alembic autogeneration is not working as expected, and the tests were not detecting it because we use DBs created from the ORM to run tests. When a change is made in the ORM and the ORM is then used to initialize the test database, the schema will always appear to match what is in the migration files. To be sure that both actually match, we have to compare a database generated from the migration files against the database that the ORM would create.
To fix this, I added a `use_migration_files` arg to the `resetdb` function and updated the db reset in conftest to use the migration files during test DB resets.
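For illustration, the drift check described above can be sketched with alembic's autogenerate API. This is a minimal sketch, not the actual conftest code; the database URL is a placeholder for a DB that was built by replaying the migration files:

```python
# Sketch of the drift check: diff a migration-built database against the ORM metadata.
from alembic.autogenerate import compare_metadata
from alembic.migration import MigrationContext
from sqlalchemy import create_engine

from airflow.models.base import Base  # ORM metadata, the source of truth

engine = create_engine("sqlite:///migration_built.db")  # placeholder: DB built via migrations
with engine.connect() as conn:
    ctx = MigrationContext.configure(conn)
    diff = compare_metadata(ctx, Base.metadata)

# An empty diff means the migration files and the ORM agree.
assert not diff, f"ORM and migration files have diverged: {diff}"
```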

As part of this fix, I also renamed mismatching constraints. The renames were done in the migration file rather than the ORM, since I take the ORM as the source of truth: new Airflow users already get the correct names when their DB is created from the ORM, because we have a naming convention. Existing Airflow users will pick up the ORM's names on upgrade, rather than the reverse.
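The naming convention mentioned here is SQLAlchemy's mechanism for giving constraints deterministic names when tables are created from ORM metadata. A generic illustration follows; the templates below are common SQLAlchemy examples, not necessarily Airflow's exact convention:

```python
# Generic illustration of a SQLAlchemy naming convention (Airflow defines its own
# on its metadata; these templates are examples only).
from sqlalchemy import Column, Integer, MetaData, String, Table

naming_convention = {
    "pk": "%(table_name)s_pkey",
    "uq": "%(table_name)s_%(column_0_name)s_uq",
    "fk": "%(table_name)s_%(column_0_name)s_fkey",
    "ix": "idx_%(column_0_label)s",
}
metadata = MetaData(naming_convention=naming_convention)

connection = Table(
    "connection",
    metadata,
    Column("id", Integer, primary_key=True),      # named connection_pkey
    Column("conn_id", String(250), unique=True),  # named connection_conn_id_uq
)
```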

I also removed the `sqlite_sequence` table, which is specific to SQLite and not needed for anything. An alternative would have been to add `sqlite_autoincrement` to the table args in the ORM and migrations, but this table is not that useful.
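For reference, the rejected alternative would have looked roughly like this on the ORM side (`Example` is a hypothetical model, not one from the codebase):

```python
# Sketch of the alternative mentioned above (not adopted): opting a model into
# SQLite's AUTOINCREMENT keyword, which is what populates sqlite_sequence.
from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Example(Base):
    __tablename__ = "example"
    __table_args__ = {"sqlite_autoincrement": True}

    id = Column(Integer, primary_key=True, autoincrement=True)
```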

* fixup! Fix alembic autogeneration and rename mismatching constraints

* fixup! fixup! Fix alembic autogeneration and rename mismatching constraints

* fixup! fixup! fixup! Fix alembic autogeneration and rename mismatching constraints

* Fix mysql, sqlite and issue with cascading deletes

* fixup! Fix mysql, sqlite and issue with cascading deletes

* Fix migration for mysql

* Fix clear_number ORM server_default

* Fix type change for mysql

* Fix constraints for sqlite and move migration to 2.9.2

* Fix sqlite constraints update and ignore session_session_id_uq index

* Fix processor_subdir in the migration file for mysql and make use-migration-files an option in db commands

* fixup! Fix processor_subdir in the migration file for mysql and make use-migration-files an option in db commands

* Apply suggestions from code review

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

---------

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
(cherry picked from commit 00f0969)
ephraimbuddy authored and utkarsharma2 committed Jun 4, 2024
1 parent 66322be commit 1f2bb57
Showing 13 changed files with 383 additions and 28 deletions.
10 changes: 9 additions & 1 deletion airflow/cli/cli_config.py
@@ -691,6 +691,12 @@ def string_lower_type(val):
action="store_true",
default=False,
)
ARG_DB_USE_MIGRATION_FILES = Arg(
("-m", "--use-migration-files"),
help="Use migration files to perform migration",
action="store_true",
default=False,
)

# webserver
ARG_PORT = Arg(
@@ -1525,7 +1531,7 @@ class GroupCommand(NamedTuple):
name="reset",
help="Burn down and rebuild the metadata database",
func=lazy_load_command("airflow.cli.commands.db_command.resetdb"),
args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE),
args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_DB_USE_MIGRATION_FILES, ARG_VERBOSE),
),
ActionCommand(
name="upgrade",
@@ -1545,6 +1551,7 @@ class GroupCommand(NamedTuple):
ARG_DB_FROM_REVISION,
ARG_DB_FROM_VERSION,
ARG_DB_RESERIALIZE_DAGS,
ARG_DB_USE_MIGRATION_FILES,
ARG_VERBOSE,
),
hide=True,
@@ -1568,6 +1575,7 @@ class GroupCommand(NamedTuple):
ARG_DB_FROM_REVISION,
ARG_DB_FROM_VERSION,
ARG_DB_RESERIALIZE_DAGS,
ARG_DB_USE_MIGRATION_FILES,
ARG_VERBOSE,
),
),
3 changes: 2 additions & 1 deletion airflow/cli/commands/db_command.py
@@ -61,7 +61,7 @@ def resetdb(args):
print(f"DB: {settings.engine.url!r}")
if not (args.yes or input("This will drop existing tables if they exist. Proceed? (y/n)").upper() == "Y"):
raise SystemExit("Cancelled")
db.resetdb(skip_init=args.skip_init)
db.resetdb(skip_init=args.skip_init, use_migration_files=args.use_migration_files)


def upgradedb(args):
@@ -132,6 +132,7 @@ def migratedb(args):
from_revision=from_revision,
show_sql_only=args.show_sql_only,
reserialize_dags=args.reserialize_dags,
use_migration_files=args.use_migration_files,
)
if not args.show_sql_only:
print("Database migrating done!")
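For context, here is a rough sketch of how the new flag plausibly propagates inside `airflow.utils.db.resetdb`. Only the call-site signature above is shown by this diff, so the helper names below are assumptions, not the real implementation:

```python
# Hypothetical sketch only -- the diff shows the call site, not this body.
def resetdb(skip_init: bool = False, use_migration_files: bool = False) -> None:
    """Drop all Airflow tables, then rebuild the schema."""
    drop_airflow_models()  # assumed helper that drops the existing tables
    if not skip_init:
        if use_migration_files:
            upgradedb(use_migration_files=True)  # replay the alembic migration files
        else:
            initdb()  # create the tables directly from the ORM metadata
```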
3 changes: 3 additions & 0 deletions airflow/migrations/env.py
@@ -29,6 +29,9 @@

def include_object(_, name, type_, *args):
"""Filter objects for autogenerating revisions."""
# Ignore the sqlite_sequence table, which is an internal SQLite construct
if name == "sqlite_sequence":
return False
# Ignore _anything_ to do with Celery, or FlaskSession's tables
if type_ == "table" and (name.startswith("celery_") or name == "session"):
return False
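For readers unfamiliar with the hook: `include_object` is a standard alembic autogenerate filter that env.py hands to `context.configure`, roughly as below. This wiring is illustrative, not Airflow's exact env.py:

```python
# Illustrative wiring: alembic calls include_object once per table/index/constraint
# during autogenerate, and a False return excludes that object from the revision.
from alembic import context
from sqlalchemy import create_engine

def run_migrations_online() -> None:
    connectable = create_engine("sqlite:///airflow.db")  # placeholder URL
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,  # assumed: the ORM metadata object
            include_object=include_object,    # the filter defined in this diff
            render_as_batch=True,             # assumption: batch mode for SQLite ALTERs
        )
        with context.begin_transaction():
            context.run_migrations()
```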
@@ -0,0 +1,297 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Fix inconsistency between ORM and migration files.
Revision ID: 686269002441
Revises: bff083ad727d
Create Date: 2024-04-15 14:19:49.913797
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy import literal

# revision identifiers, used by Alembic.
revision = "686269002441"
down_revision = "bff083ad727d"
branch_labels = None
depends_on = None
airflow_version = "2.9.2"


def upgrade():
"""Apply Update missing constraints."""
conn = op.get_bind()
if conn.dialect.name == "mysql":
# TODO: Rewrite these queries to use alembic when the lowest supported MySQL version supports IF EXISTS
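# The pattern below emulates DROP CONSTRAINT IF EXISTS: the information_schema
# lookup picks either the real ALTER TABLE or a no-op 'select 1' into @var,
# and PREPARE/EXECUTE then runs whichever statement was chosen.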
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'connection' AND
CONSTRAINT_NAME = 'unique_conn_id' AND
CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE connection
drop constraint unique_conn_id','select 1');
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
""")
)
# Drop and recreate the constraint below because there's no IF NOT EXISTS in MySQL
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'connection' AND
CONSTRAINT_NAME = 'connection_conn_id_uq' AND
CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE connection
drop constraint connection_conn_id_uq','select 1');
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
""")
)
elif conn.dialect.name == "sqlite":
# SQLite does not support DROP CONSTRAINT
# We have to recreate the table without the constraint
conn.execute(sa.text("PRAGMA foreign_keys=off"))
conn.execute(
sa.text("""
CREATE TABLE connection_new (
id INTEGER NOT NULL,
conn_id VARCHAR(250) NOT NULL,
conn_type VARCHAR(500) NOT NULL,
host VARCHAR(500),
schema VARCHAR(500),
login TEXT,
password TEXT,
port INTEGER,
extra TEXT,
is_encrypted BOOLEAN,
is_extra_encrypted BOOLEAN,
description VARCHAR(5000),
CONSTRAINT connection_pkey PRIMARY KEY (id),
CONSTRAINT connection_conn_id_uq UNIQUE (conn_id)
)
""")
)
conn.execute(sa.text("INSERT INTO connection_new SELECT * FROM connection"))
conn.execute(sa.text("DROP TABLE connection"))
conn.execute(sa.text("ALTER TABLE connection_new RENAME TO connection"))
conn.execute(sa.text("PRAGMA foreign_keys=on"))
else:
op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS unique_conn_id")
# Drop and recreate the constraint because there's no IF NOT EXISTS
op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS connection_conn_id_uq")

with op.batch_alter_table("connection") as batch_op:
batch_op.create_unique_constraint(batch_op.f("connection_conn_id_uq"), ["conn_id"])

max_cons = sa.table("dag", sa.column("max_consecutive_failed_dag_runs"))
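# All rows are reset to 0 (not just NULLs) so the column can be made NOT NULL below.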
op.execute(max_cons.update().values(max_consecutive_failed_dag_runs=literal("0")))
with op.batch_alter_table("dag") as batch_op:
batch_op.alter_column("max_consecutive_failed_dag_runs", existing_type=sa.Integer(), nullable=False)

with op.batch_alter_table("task_instance") as batch_op:
batch_op.drop_constraint("task_instance_dag_run_fkey", type_="foreignkey")

with op.batch_alter_table("task_reschedule") as batch_op:
batch_op.drop_constraint("task_reschedule_dr_fkey", type_="foreignkey")

if conn.dialect.name == "mysql":
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'dag_run' AND
CONSTRAINT_NAME = 'dag_run_dag_id_execution_date_uq' AND
CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE dag_run
drop constraint dag_run_dag_id_execution_date_uq','select 1');
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
""")
)
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'dag_run' AND
CONSTRAINT_NAME = 'dag_run_dag_id_run_id_uq' AND
CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE dag_run
drop constraint dag_run_dag_id_run_id_uq','select 1');
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
""")
)
# below we drop and recreate the constraints because there's no IF NOT EXISTS
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'dag_run' AND
CONSTRAINT_NAME = 'dag_run_dag_id_execution_date_key' AND
CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE dag_run
drop constraint dag_run_dag_id_execution_date_key','select 1');
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
""")
)
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'dag_run' AND
CONSTRAINT_NAME = 'dag_run_dag_id_run_id_key' AND
CONSTRAINT_TYPE = 'UNIQUE') = true,'ALTER TABLE dag_run
drop constraint dag_run_dag_id_run_id_key','select 1');
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
""")
)
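# The ORM (taken as the source of truth above) declares processor_subdir as
# String(2000); earlier migrations created it as Text on MySQL, so the four
# tables below are realigned to VARCHAR(2000).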
with op.batch_alter_table("callback_request", schema=None) as batch_op:
batch_op.alter_column(
"processor_subdir",
existing_type=sa.Text(length=2000),
type_=sa.String(length=2000),
existing_nullable=True,
)

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.alter_column(
"processor_subdir",
existing_type=sa.Text(length=2000),
type_=sa.String(length=2000),
existing_nullable=True,
)

with op.batch_alter_table("import_error", schema=None) as batch_op:
batch_op.alter_column(
"processor_subdir",
existing_type=sa.Text(length=2000),
type_=sa.String(length=2000),
existing_nullable=True,
)

with op.batch_alter_table("serialized_dag", schema=None) as batch_op:
batch_op.alter_column(
"processor_subdir",
existing_type=sa.Text(length=2000),
type_=sa.String(length=2000),
existing_nullable=True,
)

elif conn.dialect.name == "sqlite":
# SQLite does not support DROP CONSTRAINT
# We have to recreate the table without the constraint
conn.execute(sa.text("PRAGMA foreign_keys=off"))
conn.execute(
sa.text("""
CREATE TABLE dag_run_new (
id INTEGER NOT NULL,
dag_id VARCHAR(250) NOT NULL,
queued_at TIMESTAMP,
execution_date TIMESTAMP NOT NULL,
start_date TIMESTAMP,
end_date TIMESTAMP,
state VARCHAR(50),
run_id VARCHAR(250) NOT NULL,
creating_job_id INTEGER,
external_trigger BOOLEAN,
run_type VARCHAR(50) NOT NULL,
conf BLOB,
data_interval_start TIMESTAMP,
data_interval_end TIMESTAMP,
last_scheduling_decision TIMESTAMP,
dag_hash VARCHAR(32),
log_template_id INTEGER,
updated_at TIMESTAMP,
clear_number INTEGER DEFAULT '0' NOT NULL,
CONSTRAINT dag_run_pkey PRIMARY KEY (id),
CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE (dag_id, execution_date),
CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id),
CONSTRAINT task_instance_log_template_id_fkey FOREIGN KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION
)
""")
)

conn.execute(sa.text("INSERT INTO dag_run_new SELECT * FROM dag_run"))
conn.execute(sa.text("DROP TABLE dag_run"))
conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run"))
conn.execute(sa.text("PRAGMA foreign_keys=on"))
with op.batch_alter_table("dag_run") as batch_op:
batch_op.create_index("dag_id_state", ["dag_id", "state"], if_not_exists=True)
batch_op.create_index("idx_dag_run_dag_id", ["dag_id"], if_not_exists=True)
batch_op.create_index(
"idx_dag_run_running_dags",
["state", "dag_id"],
sqlite_where=sa.text("state='running'"),
if_not_exists=True,
)
batch_op.create_index(
"idx_dag_run_queued_dags",
["state", "dag_id"],
sqlite_where=sa.text("state='queued'"),
if_not_exists=True,
)

else:
op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_execution_date_uq")
op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_run_id_uq")
# below we drop and recreate the constraints because there's no IF NOT EXISTS
op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_execution_date_key")
op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_run_id_key")

with op.batch_alter_table("dag_run") as batch_op:
batch_op.create_unique_constraint("dag_run_dag_id_execution_date_key", ["dag_id", "execution_date"])
batch_op.create_unique_constraint("dag_run_dag_id_run_id_key", ["dag_id", "run_id"])

with op.batch_alter_table("task_instance") as batch_op:
batch_op.create_foreign_key(
"task_instance_dag_run_fkey",
"dag_run",
["dag_id", "run_id"],
["dag_id", "run_id"],
ondelete="CASCADE",
)

with op.batch_alter_table("task_reschedule") as batch_op:
batch_op.create_foreign_key(
"task_reschedule_dr_fkey",
"dag_run",
["dag_id", "run_id"],
["dag_id", "run_id"],
ondelete="CASCADE",
)


def downgrade():
"""NO downgrade because this is to make ORM consistent with the database."""
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
@@ -147,7 +147,7 @@ class DagRun(Base, LoggingMixin):
# Keeps track of the number of times the dagrun has been cleared.
# This number is incremented only when the DagRun is re-queued,
# i.e. when the DagRun is cleared.
clear_number = Column(Integer, default=0, nullable=False)
clear_number = Column(Integer, default=0, nullable=False, server_default="0")

# Remove this `if` after upgrading Sphinx-AutoAPI
if not TYPE_CHECKING and "BUILDING_AIRFLOW_DOCS" in os.environ:
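The distinction matters here: `default=0` is applied by SQLAlchemy only when the ORM inserts a row, while `server_default="0"` puts a DEFAULT clause into the DDL itself, matching the `clear_number INTEGER DEFAULT '0' NOT NULL` column the migration creates. A minimal sketch of the two:

```python
# default= is a Python-side value used by the ORM at INSERT time;
# server_default= becomes part of the generated CREATE TABLE DDL.
from sqlalchemy import Column, Integer

clear_number = Column(
    Integer,
    default=0,           # applied by SQLAlchemy when the ORM inserts a row
    server_default="0",  # emitted as DEFAULT '0' in the table DDL
    nullable=False,
)
```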