Skip to content

Commit

Permalink
Fix constraints for sqlite and move migration to 2.9.2
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed May 2, 2024
1 parent b1ee7c0 commit aa441ce
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,35 @@
# specific language governing permissions and limitations
# under the License.

"""Update missing constraints
"""Update missing constraints.
Revision ID: 686269002441
Revises: 677fdbb7fc54
Revises: bff083ad727d
Create Date: 2024-04-15 14:19:49.913797
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy import literal

# revision identifiers, used by Alembic.
revision = '686269002441'
down_revision = '677fdbb7fc54'
revision = "686269002441"
down_revision = "bff083ad727d"
branch_labels = None
depends_on = None
airflow_version = '2.10.0'
airflow_version = "2.9.2"


def upgrade():
"""Apply Update missing constraints"""
"""Apply Update missing constraints."""
conn = op.get_bind()
if conn.dialect.name == "sqlite":
# SQLite does not support DROP CONSTRAINT
return
if conn.dialect.name == "mysql":
# TODO: Rewrite these queries to use alembic when lowest MYSQL version supports IF EXISTS
conn.execute(sa.text("""
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'connection' AND
Expand All @@ -55,9 +55,11 @@ def upgrade():
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
"""))
""")
)
# Dropping the below and recreating cause there's no IF NOT EXISTS in mysql
conn.execute(sa.text("""
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'connection' AND
Expand All @@ -68,26 +70,56 @@ def upgrade():
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
"""))
""")
)
elif conn.dialect.name == "sqlite":
# SQLite does not support DROP CONSTRAINT
# We have to recreate the table without the constraint
conn.execute(sa.text("PRAGMA foreign_keys=off"))
conn.execute(sa.text("ALTER TABLE connection RENAME TO connection_old"))
conn.execute(
sa.text("""
CREATE TABLE "connection" (
id INTEGER NOT NULL,
conn_id VARCHAR(250) NOT NULL,
conn_type VARCHAR(500) NOT NULL,
host VARCHAR(500),
schema VARCHAR(500),
login TEXT,
password TEXT,
port INTEGER,
extra TEXT,
is_encrypted BOOLEAN,
is_extra_encrypted BOOLEAN,
description VARCHAR(5000),
CONSTRAINT connection_pkey PRIMARY KEY (id),
CONSTRAINT connection_conn_id_uq UNIQUE (conn_id)
)
""")
)
conn.execute(sa.text("INSERT INTO connection SELECT * FROM connection_old"))
conn.execute(sa.text("DROP TABLE connection_old"))
conn.execute(sa.text("PRAGMA foreign_keys=on"))
else:
op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS unique_conn_id")
# Dropping and recreating cause there's no IF NOT EXISTS
op.execute("ALTER TABLE connection DROP CONSTRAINT IF EXISTS connection_conn_id_uq")
with op.batch_alter_table('connection') as batch_op:
batch_op.create_unique_constraint(batch_op.f('connection_conn_id_uq'), ['conn_id'])
with op.batch_alter_table("connection") as batch_op:
batch_op.create_unique_constraint(batch_op.f("connection_conn_id_uq"), ["conn_id"])

max_cons = sa.table('dag', sa.column('max_consecutive_failed_dag_runs'))
max_cons = sa.table("dag", sa.column("max_consecutive_failed_dag_runs"))
op.execute(max_cons.update().values(max_consecutive_failed_dag_runs=literal("0")))
with op.batch_alter_table('dag') as batch_op:
batch_op.alter_column('max_consecutive_failed_dag_runs', existing_type=sa.Integer(), nullable=False)
with op.batch_alter_table("dag") as batch_op:
batch_op.alter_column("max_consecutive_failed_dag_runs", existing_type=sa.Integer(), nullable=False)

with op.batch_alter_table("task_instance") as batch_op:
batch_op.drop_constraint("task_instance_dag_run_fkey", type_="foreignkey")
with op.batch_alter_table("task_reschedule") as batch_op:
batch_op.drop_constraint("task_reschedule_dr_fkey", type_="foreignkey")

if conn.dialect.name == "mysql":
conn.execute(sa.text("""
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'dag_run' AND
Expand All @@ -98,8 +130,10 @@ def upgrade():
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
"""))
conn.execute(sa.text("""
""")
)
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'dag_run' AND
Expand All @@ -110,9 +144,11 @@ def upgrade():
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
"""))
""")
)
# below we drop and recreate the constraints because there's no IF NOT EXISTS
conn.execute(sa.text("""
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'dag_run' AND
Expand All @@ -123,8 +159,10 @@ def upgrade():
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
"""))
conn.execute(sa.text("""
""")
)
conn.execute(
sa.text("""
set @var=if((SELECT true FROM information_schema.TABLE_CONSTRAINTS WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = 'dag_run' AND
Expand All @@ -135,19 +173,79 @@ def upgrade():
prepare stmt from @var;
execute stmt;
deallocate prepare stmt;
"""))
""")
)
elif conn.dialect.name == "sqlite":
# SQLite does not support DROP CONSTRAINT
# We have to recreate the table without the constraint
conn.execute(sa.text("PRAGMA foreign_keys=off"))
conn.execute(sa.text("ALTER TABLE dag_run RENAME TO dag_run_old"))
conn.execute(
sa.text("""
CREATE TABLE dag_run (
id INTEGER NOT NULL,
dag_id VARCHAR(250) NOT NULL,
queued_at TIMESTAMP,
execution_date TIMESTAMP NOT NULL,
start_date TIMESTAMP,
end_date TIMESTAMP,
state VARCHAR(50),
run_id VARCHAR(250) NOT NULL,
creating_job_id INTEGER,
external_trigger BOOLEAN,
run_type VARCHAR(50) NOT NULL,
conf BLOB,
data_interval_start TIMESTAMP,
data_interval_end TIMESTAMP,
last_scheduling_decision TIMESTAMP,
dag_hash VARCHAR(32),
log_template_id INTEGER,
updated_at TIMESTAMP,
clear_number INTEGER DEFAULT '0' NOT NULL,
CONSTRAINT dag_run_pkey PRIMARY KEY (id),
CONSTRAINT dag_run_dag_id_execution_date_key UNIQUE (dag_id, execution_date),
CONSTRAINT dag_run_dag_id_run_id_key UNIQUE (dag_id, run_id),
CONSTRAINT task_instance_log_template_id_fkey FOREIGN KEY(log_template_id) REFERENCES log_template (id) ON DELETE NO ACTION
)
""")
)

conn.execute(sa.text("INSERT INTO dag_run SELECT * FROM dag_run_old"))
conn.execute(sa.text("DROP TABLE dag_run_old"))
conn.execute(sa.text("PRAGMA foreign_keys=on"))
with op.batch_alter_table("dag_run") as batch_op:
batch_op.create_index("dag_id_state", ["dag_id", "state"], if_not_exists=True)
batch_op.create_index("idx_dag_run_dag_id", ["dag_id"], if_not_exists=True)
batch_op.create_index(
"idx_dag_run_running_dags",
["state", "dag_id"],
sqlite_where=sa.text("state='running'"),
if_not_exists=True,
)
batch_op.create_index(
"idx_dag_run_queued_dags",
["state", "dag_id"],
sqlite_where=sa.text("state='queued'"),
if_not_exists=True,
)

else:
op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_execution_date_uq")
op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_run_id_uq")
# below we drop and recreate the constraints because there's no IF NOT EXISTS
op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_execution_date_key")
op.execute("ALTER TABLE dag_run DROP CONSTRAINT IF EXISTS dag_run_dag_id_run_id_key")
with op.batch_alter_table('dag_run') as batch_op:
batch_op.create_unique_constraint('dag_run_dag_id_execution_date_key', ['dag_id', 'execution_date'])
batch_op.create_unique_constraint('dag_run_dag_id_run_id_key', ['dag_id', 'run_id'])
with op.batch_alter_table("dag_run") as batch_op:
batch_op.create_unique_constraint("dag_run_dag_id_execution_date_key", ["dag_id", "execution_date"])
batch_op.create_unique_constraint("dag_run_dag_id_run_id_key", ["dag_id", "run_id"])
with op.batch_alter_table("task_instance") as batch_op:
batch_op.create_foreign_key("task_instance_dag_run_fkey", "dag_run", ["dag_id", "run_id"], ["dag_id", "run_id"],
ondelete="CASCADE")
batch_op.create_foreign_key(
"task_instance_dag_run_fkey",
"dag_run",
["dag_id", "run_id"],
["dag_id", "run_id"],
ondelete="CASCADE",
)
with op.batch_alter_table("task_reschedule") as batch_op:
batch_op.create_foreign_key(
"task_reschedule_dr_fkey",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""add new executor field to db.
Revision ID: 677fdbb7fc54
Revises: 1949afb29106
Revises: 686269002441
Create Date: 2024-04-01 15:26:59.186579
"""
Expand All @@ -31,7 +31,7 @@

# revision identifiers, used by Alembic.
revision = "677fdbb7fc54"
down_revision = "bff083ad727d"
down_revision = "686269002441"
branch_labels = None
depends_on = None
airflow_version = "2.10.0"
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
"2.8.0": "10b52ebd31f7",
"2.8.1": "88344c1d9134",
"2.9.0": "1949afb29106",
"2.9.2": "bff083ad727d",
"2.9.2": "686269002441",
"2.10.0": "677fdbb7fc54",
}

Expand Down
3 changes: 1 addition & 2 deletions docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
320570337a9f2b7f0a118ca22e86633df6d6a1ac5712add285cf2807c0382ef6

8e508a6b71c59805a60b6aebc9cb2aff1d76a16ed0d7419dc3a9a32721ea2ef9
8 changes: 4 additions & 4 deletions docs/apache-airflow/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit aa441ce

Please sign in to comment.