From 36eb93a9af548cf1019c40619260263cc75e3862 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 28 Apr 2026 18:43:32 +0530 Subject: [PATCH 1/6] fix: migrate existing deadline rows in migration 0080 upgrade and downgrade Both the upgrade and downgrade paths of migration 0080 (808787349f22 - Modify deadline callback schema) added NOT NULL columns to the deadline table without first populating them from the existing data, causing: * upgrade: NotNullViolation when adding callback JSON NOT NULL to a non-empty table (existing rows have no value for the new column). * downgrade: NotNullViolation on PostgreSQL / silent NULL on MySQL when adding callback VARCHAR(500) NOT NULL after dropping the JSON column, crashing the subsequent 0094 upgrade with json.loads(None). Fix both paths with the same pattern used by migration 0094: 1. Read the existing data before any schema change. 2. Add the new column(s) as nullable so the DDL succeeds on all supported databases. 3. Back-fill the column using a typed SA table clause (so each dialect handles JSON serialisation correctly). 4. Enforce NOT NULL only after every row has a valid value. Upgrade serialises the old (path, kwargs) pair into the {"__data__": {"path": ..., "kwargs": ...}, "__classname__": ..., "__version__": 0} format expected by migration 0094. Downgrade extracts path/kwargs back from that same JSON envelope before restoring the VARCHAR callback column. Made-with: Cursor Co-authored-by: Cursor --- ...0_3_1_0_modify_deadline_callback_schema.py | 191 +++++++++++++- .../test_0080_deadline_callback_migration.py | 241 ++++++++++++++++++ 2 files changed, 428 insertions(+), 4 deletions(-) create mode 100644 airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py diff --git a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py index adb3354512dc0..5eb85e0abb7f2 100644 --- a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py +++ b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py @@ -27,8 +27,13 @@ from __future__ import annotations +import json +from textwrap import dedent + import sqlalchemy as sa -from alembic import op +from alembic import context, op + +from airflow.configuration import conf # revision identifiers, used by Alembic. revision = "808787349f22" @@ -38,17 +43,195 @@ airflow_version = "3.1.0" +_ASYNC_CALLBACK_CLASSNAME = "airflow.sdk.definitions.deadline.AsyncCallback" +# Maximum length of the callback VARCHAR column in the pre-0080 schema. +_CALLBACK_MAX_LEN = 500 + + def upgrade(): """Replace deadline table's string callback and JSON callback_kwargs with JSON callback.""" + if context.is_offline_mode(): + print( + dedent(""" + ------------ + -- WARNING: Unable to migrate the data in the deadline table + -- while in offline mode! All rows in the deadline table will + -- be deleted. + ------------ + """) + ) + op.execute("DELETE FROM deadline") + with op.batch_alter_table("deadline", schema=None) as batch_op: + batch_op.drop_column("callback") + batch_op.drop_column("callback_kwargs") + batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=False)) + return + + conn = op.get_bind() + batch_size = conf.getint("database", "migration_batch_size", fallback=1000) + + # Add the destination column alongside the existing ones so we can migrate + # in batches without loading the whole table into memory at once. + with op.batch_alter_table("deadline", schema=None) as batch_op: + batch_op.add_column(sa.Column("callback_new", sa.JSON(), nullable=True)) + + deadline_read = sa.table( + "deadline", + sa.column("id"), + sa.column("callback"), + sa.column("callback_kwargs", sa.JSON()), + sa.column("callback_new", sa.JSON()), + ) + deadline_write = sa.table( + "deadline", + sa.column("id"), + sa.column("callback_new", sa.JSON()), + ) + + while True: + rows = conn.execute( + sa.select( + deadline_read.c.id, + deadline_read.c.callback, + deadline_read.c.callback_kwargs, + ) + .where(deadline_read.c.callback_new.is_(None)) + .limit(batch_size) + ).fetchall() + + if not rows: + break + + batch = [] + for row in rows: + path = row[1] or "" + kwargs = row[2] + if isinstance(kwargs, str): + kwargs = json.loads(kwargs) if kwargs else {} + if not isinstance(kwargs, dict): + kwargs = {} + batch.append( + { + "row_id": row[0], + "new_callback": { + "__data__": {"path": path, "kwargs": kwargs}, + "__classname__": _ASYNC_CALLBACK_CLASSNAME, + "__version__": 0, + }, + } + ) + + conn.execute( + sa.update(deadline_write) + .where(deadline_write.c.id == sa.bindparam("row_id")) + .values(callback_new=sa.bindparam("new_callback")), + batch, + ) + + if len(rows) < batch_size: + break + with op.batch_alter_table("deadline", schema=None) as batch_op: batch_op.drop_column("callback") batch_op.drop_column("callback_kwargs") - batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=False)) + batch_op.alter_column( + "callback_new", + new_column_name="callback", + existing_type=sa.JSON(), + nullable=False, + ) def downgrade(): """Replace deadline table's JSON callback with string callback and JSON callback_kwargs.""" + if context.is_offline_mode(): + print( + dedent(""" + ------------ + -- WARNING: Unable to migrate the data in the deadline table + -- while in offline mode! All rows in the deadline table will + -- be deleted. + ------------ + """) + ) + op.execute("DELETE FROM deadline") + with op.batch_alter_table("deadline", schema=None) as batch_op: + batch_op.drop_column("callback") + batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), nullable=True)) + batch_op.add_column(sa.Column("callback", sa.String(length=500), nullable=False)) + return + + conn = op.get_bind() + batch_size = conf.getint("database", "migration_batch_size", fallback=1000) + + # Add the restored columns alongside the existing JSON callback so we can + # back-fill in batches before dropping the JSON column. with op.batch_alter_table("deadline", schema=None) as batch_op: - batch_op.drop_column("callback") + batch_op.add_column(sa.Column("callback_old", sa.String(length=500), nullable=True)) batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), nullable=True)) - batch_op.add_column(sa.Column("callback", sa.String(length=500), nullable=False)) + + deadline_read = sa.table( + "deadline", + sa.column("id"), + sa.column("callback", sa.JSON()), + sa.column("callback_old", sa.String(500)), + ) + deadline_write = sa.table( + "deadline", + sa.column("id"), + sa.column("callback_old", sa.String(500)), + sa.column("callback_kwargs", sa.JSON()), + ) + + while True: + rows = conn.execute( + sa.select(deadline_read.c.id, deadline_read.c.callback) + .where(deadline_read.c.callback_old.is_(None)) + .limit(batch_size) + ).fetchall() + + if not rows: + break + + batch = [] + for row in rows: + cb = row[1] + if cb is None: + path, kwargs = "", {} + else: + if isinstance(cb, str): + cb = json.loads(cb) + cb_inner = cb.get("__data__", cb) if isinstance(cb, dict) else {} + path = cb_inner.get("path", "") + if len(path) > _CALLBACK_MAX_LEN: + print( + f"WARNING: callback path for deadline {row[0]} exceeds " + f"{_CALLBACK_MAX_LEN} chars and will be truncated." + ) + path = path[:_CALLBACK_MAX_LEN] + kwargs = cb_inner.get("kwargs", {}) + if not isinstance(kwargs, dict): + kwargs = {} + batch.append({"row_id": row[0], "old_callback": path, "old_kwargs": kwargs}) + + conn.execute( + sa.update(deadline_write) + .where(deadline_write.c.id == sa.bindparam("row_id")) + .values( + callback_old=sa.bindparam("old_callback"), + callback_kwargs=sa.bindparam("old_kwargs"), + ), + batch, + ) + + if len(rows) < batch_size: + break + + with op.batch_alter_table("deadline", schema=None) as batch_op: + batch_op.drop_column("callback") + batch_op.alter_column( + "callback_old", + new_column_name="callback", + existing_type=sa.String(500), + nullable=False, + ) diff --git a/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py new file mode 100644 index 0000000000000..60118ca074022 --- /dev/null +++ b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py @@ -0,0 +1,241 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Regression tests for migration 0080 (808787349f22): +upgrade() and downgrade() must correctly migrate existing deadline rows +without raising NotNullViolation. +""" + +from __future__ import annotations + +import importlib.util +import json +import uuid +from pathlib import Path +from unittest import mock + +import sqlalchemy as sa +from alembic.migration import MigrationContext +from alembic.operations import Operations + +from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH + +# Migration filenames start with a digit so they cannot be imported via the +# normal import system; load the module by file path instead. +_MIGRATION_PATH = ( + Path(AIRFLOW_CORE_SOURCES_PATH) + / "airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py" +) +_spec = importlib.util.spec_from_file_location("migration_0080", _MIGRATION_PATH) +_migration = importlib.util.module_from_spec(_spec) # type: ignore[arg-type] +_spec.loader.exec_module(_migration) # type: ignore[union-attr] + +upgrade = _migration.upgrade +downgrade = _migration.downgrade +_ASYNC_CALLBACK_CLASSNAME = _migration._ASYNC_CALLBACK_CLASSNAME + +_PRE_0080_DDL = """ +CREATE TABLE deadline ( + id TEXT PRIMARY KEY, + dagrun_id INTEGER NOT NULL, + deadline_time TEXT NOT NULL, + callback TEXT NOT NULL, + callback_kwargs TEXT, + created_at TEXT, + last_updated_at TEXT +) +""" + +_POST_0080_DDL = """ +CREATE TABLE deadline ( + id TEXT PRIMARY KEY, + dagrun_id INTEGER NOT NULL, + deadline_time TEXT NOT NULL, + callback TEXT NOT NULL, + created_at TEXT, + last_updated_at TEXT +) +""" + + +def _make_engine_pre_0080(): + """Return an in-memory SQLite engine with the pre-0080 deadline schema.""" + engine = sa.create_engine("sqlite:///:memory:") + with engine.connect() as conn: + conn.execute(sa.text(_PRE_0080_DDL)) + conn.commit() + return engine + + +def _make_engine_post_0080(): + """Return an in-memory SQLite engine with the post-0080 deadline schema.""" + engine = sa.create_engine("sqlite:///:memory:") + with engine.connect() as conn: + conn.execute(sa.text(_POST_0080_DDL)) + conn.commit() + return engine + + +def _run_upgrade(engine): + # alembic.context is a proxy that is only populated when running through + # Alembic's full migration runner (alembic upgrade). When calling the + # migration function directly in a test we must mock it so that the + # is_offline_mode() guard does not raise AttributeError. + with engine.begin() as conn: + with Operations.context(MigrationContext.configure(conn)): + with mock.patch.object(_migration, "context") as mock_ctx: + mock_ctx.is_offline_mode.return_value = False + upgrade() + + +def _run_downgrade(engine): + with engine.begin() as conn: + with Operations.context(MigrationContext.configure(conn)): + with mock.patch.object(_migration, "context") as mock_ctx: + mock_ctx.is_offline_mode.return_value = False + downgrade() + + +def _read_deadline(engine): + with engine.connect() as conn: + return conn.execute(sa.text("SELECT * FROM deadline")).mappings().all() + + +class TestMigration0080Upgrade: + def test_upgrade_empty_table(self): + """Upgrade on an empty table must not raise.""" + engine = _make_engine_pre_0080() + _run_upgrade(engine) + rows = _read_deadline(engine) + assert rows == [] + + def test_upgrade_migrates_existing_row(self): + """Upgrade converts VARCHAR callback + JSON kwargs to the expected JSON envelope.""" + engine = _make_engine_pre_0080() + row_id = str(uuid.uuid4()) + with engine.begin() as conn: + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)" + " VALUES (:id, 1, '2025-01-01', :cb, :kw)" + ), + {"id": row_id, "cb": "mymodule.my_callback", "kw": json.dumps({"key": "val"})}, + ) + + _run_upgrade(engine) + + rows = _read_deadline(engine) + assert len(rows) == 1 + cb = rows[0]["callback"] + if isinstance(cb, str): + cb = json.loads(cb) + assert cb["__classname__"] == _ASYNC_CALLBACK_CLASSNAME + assert cb["__version__"] == 0 + assert cb["__data__"]["path"] == "mymodule.my_callback" + assert cb["__data__"]["kwargs"] == {"key": "val"} + assert "callback_kwargs" not in rows[0] + + def test_upgrade_null_kwargs_defaults_to_empty_dict(self): + """Upgrade with NULL callback_kwargs must produce an empty dict in the envelope.""" + engine = _make_engine_pre_0080() + row_id = str(uuid.uuid4()) + with engine.begin() as conn: + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)" + " VALUES (:id, 1, '2025-01-01', :cb, NULL)" + ), + {"id": row_id, "cb": "mymodule.my_callback"}, + ) + + _run_upgrade(engine) + + rows = _read_deadline(engine) + cb = rows[0]["callback"] + if isinstance(cb, str): + cb = json.loads(cb) + assert cb["__data__"]["kwargs"] == {} + + +class TestMigration0080Downgrade: + def test_downgrade_empty_table(self): + """Downgrade on an empty table must not raise.""" + engine = _make_engine_post_0080() + _run_downgrade(engine) + rows = _read_deadline(engine) + assert rows == [] + + def test_downgrade_restores_existing_row(self): + """Downgrade extracts path and kwargs back from the JSON envelope.""" + engine = _make_engine_post_0080() + row_id = str(uuid.uuid4()) + callback_json = json.dumps( + { + "__data__": {"path": "mymodule.my_callback", "kwargs": {"key": "val"}}, + "__classname__": _ASYNC_CALLBACK_CLASSNAME, + "__version__": 0, + } + ) + with engine.begin() as conn: + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback)" + " VALUES (:id, 1, '2025-01-01', :cb)" + ), + {"id": row_id, "cb": callback_json}, + ) + + _run_downgrade(engine) + + rows = _read_deadline(engine) + assert len(rows) == 1 + assert rows[0]["callback"] == "mymodule.my_callback" + kw = rows[0]["callback_kwargs"] + if isinstance(kw, str): + kw = json.loads(kw) + assert kw == {"key": "val"} + + +class TestMigration0080RoundTrip: + def test_round_trip_preserves_data(self): + """Upgrade followed by downgrade preserves the original callback path.""" + engine = _make_engine_pre_0080() + row_id = str(uuid.uuid4()) + original_path = "mymodule.my_callback" + original_kwargs = {"x": 1} + + with engine.begin() as conn: + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)" + " VALUES (:id, 1, '2025-01-01', :cb, :kw)" + ), + {"id": row_id, "cb": original_path, "kw": json.dumps(original_kwargs)}, + ) + + _run_upgrade(engine) + _run_downgrade(engine) + + rows = _read_deadline(engine) + assert len(rows) == 1 + assert rows[0]["callback"] == original_path + kw = rows[0]["callback_kwargs"] + if isinstance(kw, str): + kw = json.loads(kw) + assert kw == original_kwargs From b03a23e787d5a55d5d11ea896d39d1a481b41db5 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 18 May 2026 15:33:18 +0530 Subject: [PATCH 2/6] fix(deadline): repair NULL callback rows in 0094 upgrade Legacy MySQL deployments that ran the original (pre-#66016) 0080 migration silently wrote NULL into deadline.callback. Alembic is already stamped past 808787349f22 on those deployments, so the fixed 0080 won't re-run -- 0094 then crashes on json.loads(None). Defensive handling in 0094: - _upgrade_mysql_sqlite: detect raw_cb is None, log warning, default to empty envelope. - _upgrade_postgresql: COALESCE on cb_path / cb_kwargs so NULL jsonb doesn't propagate into callback.data. Regression test starts from a post-0080 schema with callback=NULL and verifies 0094 produces an empty envelope without crashing, plus that a mixed NULL+valid batch migrates both rows correctly. Addresses ephraimbuddy's review comment on #66016. --- ...lace_deadline_inline_callback_with_fkey.py | 29 ++- .../test_0094_deadline_callback_migration.py | 175 ++++++++++++++++++ 2 files changed, 198 insertions(+), 6 deletions(-) create mode 100644 airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py diff --git a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py index 06d0f7b43ab8b..4324a501020b0 100644 --- a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py +++ b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py @@ -69,8 +69,10 @@ def _upgrade_postgresql(conn, batch_size): d.id AS deadline_id, gen_random_uuid() AS callback_id, COALESCE(dr.dag_id, '') AS dag_id, - d.callback::jsonb->'__data__'->>'path' AS cb_path, - d.callback::jsonb->'__data__'->'kwargs' AS cb_kwargs, + -- COALESCE on NULL callback (legacy 0080 bug, see PR #66016) + -- so we don't insert NULL into callback.data downstream. + COALESCE(d.callback::jsonb->'__data__'->>'path', '') AS cb_path, + COALESCE(d.callback::jsonb->'__data__'->'kwargs', '{}'::jsonb) AS cb_kwargs, CASE WHEN d.callback_state IN (:state_success, :state_failed) THEN d.callback_state ELSE :state_pending @@ -199,11 +201,26 @@ def _upgrade_mysql_sqlite(conn, batch_size): for row in batch: callback_id = uuid6.uuid7() - cb = row.callback if isinstance(row.callback, dict) else json.loads(row.callback) - cb_inner = cb.get("__data__", cb) + raw_cb = row.callback + if raw_cb is None: + # Defensive: legacy MySQL deployments that ran the original (buggy) + # 0080 may have NULL callback rows. Treat as empty envelope so 0094 + # doesn't crash on json.loads(None); see PR #66016. + print(f"WARNING: deadline {row.id} has NULL callback; defaulting to empty envelope.") + cb = {} + elif isinstance(raw_cb, dict): + cb = raw_cb + else: + cb = json.loads(raw_cb) + cb_inner = cb.get("__data__", cb) if isinstance(cb, dict) else {} + if not isinstance(cb_inner, dict): + cb_inner = {} + kwargs = cb_inner.get("kwargs", {}) + if not isinstance(kwargs, dict): + kwargs = {} cb_data = { - "path": cb_inner.get("path", ""), - "kwargs": cb_inner.get("kwargs", {}), + "path": cb_inner.get("path", "") or "", + "kwargs": kwargs, "prefix": _CALLBACK_METRICS_PREFIX, "dag_id": row.dag_id or "", } diff --git a/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py new file mode 100644 index 0000000000000..c831411f923d0 --- /dev/null +++ b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py @@ -0,0 +1,175 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Regression tests for migration 0094 (e812941398f4). + +These tests focus on the defensive NULL-callback path: legacy MySQL +deployments that ran the original (buggy) 0080 left ``deadline.callback`` +rows as NULL. 0094 must heal those rows instead of crashing on +``json.loads(None)``. See PR #66016. +""" + +from __future__ import annotations + +import importlib.util +import json +import uuid +from pathlib import Path + +import sqlalchemy as sa + +from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH + +_MIGRATION_PATH = ( + Path(AIRFLOW_CORE_SOURCES_PATH) + / "airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py" +) +_spec = importlib.util.spec_from_file_location("migration_0094", _MIGRATION_PATH) +_migration = importlib.util.module_from_spec(_spec) # type: ignore[arg-type] +_spec.loader.exec_module(_migration) # type: ignore[union-attr] + + +# Minimal post-0080 / pre-0094 schema. 0094 adds ``missed`` and ``callback_id`` +# itself before invoking ``_upgrade_mysql_sqlite``; we mimic that here so we +# can call the inner helper directly without driving the full alembic chain. +_POST_0080_DDL = [ + """ + CREATE TABLE dag_run ( + id INTEGER PRIMARY KEY, + dag_id TEXT NOT NULL + ) + """, + """ + CREATE TABLE deadline ( + id TEXT PRIMARY KEY, + dagrun_id INTEGER NOT NULL, + deadline_time TEXT NOT NULL, + callback TEXT, + callback_state TEXT, + trigger_id INTEGER, + callback_id TEXT, + missed BOOLEAN + ) + """, + """ + CREATE TABLE callback ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, + fetch_method TEXT NOT NULL, + data TEXT NOT NULL, + state TEXT NOT NULL, + priority_weight INTEGER NOT NULL, + created_at TEXT NOT NULL + ) + """, +] + + +def _make_engine(): + engine = sa.create_engine("sqlite:///:memory:") + with engine.connect() as conn: + for ddl in _POST_0080_DDL: + conn.execute(sa.text(ddl)) + conn.commit() + return engine + + +def _insert_dagrun(conn, dagrun_id: int = 1, dag_id: str = "test_dag"): + conn.execute( + sa.text("INSERT INTO dag_run (id, dag_id) VALUES (:id, :dag_id)"), + {"id": dagrun_id, "dag_id": dag_id}, + ) + + +def _insert_deadline(conn, deadline_id: str, callback, callback_state: str | None = None): + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_state)" + " VALUES (:id, 1, '2025-01-01', :cb, :state)" + ), + {"id": deadline_id, "cb": callback, "state": callback_state}, + ) + + +class TestMigration0094NullCallbackRepair: + """A NULL callback row from a buggy 0080 must not crash 0094's upgrade.""" + + def test_null_callback_does_not_crash(self): + engine = _make_engine() + deadline_id = str(uuid.uuid4()) + with engine.begin() as conn: + _insert_dagrun(conn) + _insert_deadline(conn, deadline_id, callback=None) + + # _upgrade_mysql_sqlite reads from `deadline` and writes to `callback`; + # it does not depend on alembic's batch_alter_table prelude. + with engine.begin() as conn: + _migration._upgrade_mysql_sqlite(conn, batch_size=10) + + with engine.connect() as conn: + deadline_rows = conn.execute(sa.text("SELECT * FROM deadline")).mappings().all() + callback_rows = conn.execute(sa.text("SELECT * FROM callback")).mappings().all() + + assert len(deadline_rows) == 1 + assert len(callback_rows) == 1 + assert deadline_rows[0]["callback_id"] == callback_rows[0]["id"] + assert deadline_rows[0]["missed"] == 0 # SQLite: False -> 0 + + cb_data = json.loads(callback_rows[0]["data"]) + assert cb_data["path"] == "" + assert cb_data["kwargs"] == {} + assert cb_data["dag_id"] == "test_dag" + + def test_mixed_null_and_valid_callbacks(self): + """A batch with both NULL and well-formed rows must migrate both.""" + engine = _make_engine() + null_id = str(uuid.uuid4()) + valid_id = str(uuid.uuid4()) + valid_callback = json.dumps( + { + "__data__": {"path": "mymodule.cb", "kwargs": {"k": "v"}}, + "__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", + "__version__": 0, + } + ) + with engine.begin() as conn: + _insert_dagrun(conn) + _insert_deadline(conn, null_id, callback=None) + _insert_deadline(conn, valid_id, callback=valid_callback) + + with engine.begin() as conn: + _migration._upgrade_mysql_sqlite(conn, batch_size=10) + + with engine.connect() as conn: + rows = ( + conn.execute( + sa.text( + "SELECT d.id AS deadline_id, c.data" + " FROM deadline d JOIN callback c ON d.callback_id = c.id" + ) + ) + .mappings() + .all() + ) + + by_id = {r["deadline_id"]: json.loads(r["data"]) for r in rows} + assert by_id[null_id]["path"] == "" + assert by_id[null_id]["kwargs"] == {} + assert by_id[valid_id]["path"] == "mymodule.cb" + assert by_id[valid_id]["kwargs"] == {"k": "v"} From 058d7e97d7efa44c564faacc8defee49e2aeed1c Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 18 May 2026 15:42:17 +0530 Subject: [PATCH 3/6] fix(deadline): address review nits - 0094 PG path: wrap kwargs COALESCE with NULLIF so JSON-literal null (e.g. from hand-edited DBs) is also normalized to {} -- matches the MySQL/SQLite defensive normalization. - 0094 MySQL/SQLite: aggregate per-row NULL-callback WARNING into a single summary line after the loop to avoid log spam on deployments with many legacy NULL rows. - 0080 downgrade: log a WARNING when an existing row's kwargs is not a dict (instead of silently resetting), mirroring the VARCHAR truncate warning pattern. - 0080 tests: add test_upgrade_exact_batch_boundary covering the case where rows == batch_size to exercise the loop-continuation path. --- ...0_3_1_0_modify_deadline_callback_schema.py | 4 +++ ...lace_deadline_inline_callback_with_fkey.py | 16 ++++++----- .../test_0080_deadline_callback_migration.py | 27 +++++++++++++++++++ .../test_0094_deadline_callback_migration.py | 2 +- 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py index 5eb85e0abb7f2..23b00e1a1eb64 100644 --- a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py +++ b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py @@ -211,6 +211,10 @@ def downgrade(): path = path[:_CALLBACK_MAX_LEN] kwargs = cb_inner.get("kwargs", {}) if not isinstance(kwargs, dict): + print( + f"WARNING: kwargs for deadline {row[0]} is not a dict " + f"(type={type(kwargs).__name__}); resetting to empty dict." + ) kwargs = {} batch.append({"row_id": row[0], "old_callback": path, "old_kwargs": kwargs}) diff --git a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py index 4324a501020b0..0cab3d57c252f 100644 --- a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py +++ b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py @@ -69,10 +69,8 @@ def _upgrade_postgresql(conn, batch_size): d.id AS deadline_id, gen_random_uuid() AS callback_id, COALESCE(dr.dag_id, '') AS dag_id, - -- COALESCE on NULL callback (legacy 0080 bug, see PR #66016) - -- so we don't insert NULL into callback.data downstream. COALESCE(d.callback::jsonb->'__data__'->>'path', '') AS cb_path, - COALESCE(d.callback::jsonb->'__data__'->'kwargs', '{}'::jsonb) AS cb_kwargs, + COALESCE(NULLIF(d.callback::jsonb->'__data__'->'kwargs', 'null'::jsonb), '{}'::jsonb) AS cb_kwargs, CASE WHEN d.callback_state IN (:state_success, :state_failed) THEN d.callback_state ELSE :state_pending @@ -179,6 +177,7 @@ def _upgrade_mysql_sqlite(conn, batch_size): ) batch_num = 0 + null_callback_count = 0 while True: batch_num += 1 batch = conn.execute( @@ -203,10 +202,7 @@ def _upgrade_mysql_sqlite(conn, batch_size): callback_id = uuid6.uuid7() raw_cb = row.callback if raw_cb is None: - # Defensive: legacy MySQL deployments that ran the original (buggy) - # 0080 may have NULL callback rows. Treat as empty envelope so 0094 - # doesn't crash on json.loads(None); see PR #66016. - print(f"WARNING: deadline {row.id} has NULL callback; defaulting to empty envelope.") + null_callback_count += 1 cb = {} elif isinstance(raw_cb, dict): cb = raw_cb @@ -254,6 +250,12 @@ def _upgrade_mysql_sqlite(conn, batch_size): ) print(f"Migrated {len(batch)} deadline records in batch {batch_num}") + if null_callback_count: + print( + f"WARNING: {null_callback_count} deadline rows had NULL callback " + "(legacy 0080 data); migrated with empty envelope." + ) + def upgrade(): """Replace Deadline table's inline callback fields with callback_id foreign key.""" diff --git a/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py index 60118ca074022..c777d8418b1c9 100644 --- a/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py +++ b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py @@ -172,6 +172,33 @@ def test_upgrade_null_kwargs_defaults_to_empty_dict(self): cb = json.loads(cb) assert cb["__data__"]["kwargs"] == {} + def test_upgrade_exact_batch_boundary(self, monkeypatch): + """Rows == batch_size must force a second iteration that returns 0 rows and exits cleanly.""" + # Force a small batch_size so 2 inserted rows == batch_size exactly. + monkeypatch.setattr(_migration.conf, "getint", lambda *a, **kw: 2) + engine = _make_engine_pre_0080() + with engine.begin() as conn: + for i in range(2): + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)" + " VALUES (:id, 1, '2025-01-01', :cb, :kw)" + ), + {"id": str(uuid.uuid4()), "cb": f"mod.cb_{i}", "kw": json.dumps({"i": i})}, + ) + + _run_upgrade(engine) + + rows = _read_deadline(engine) + assert len(rows) == 2 + paths = sorted( + (json.loads(r["callback"]) if isinstance(r["callback"], str) else r["callback"])["__data__"][ + "path" + ] + for r in rows + ) + assert paths == ["mod.cb_0", "mod.cb_1"] + class TestMigration0080Downgrade: def test_downgrade_empty_table(self): diff --git a/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py index c831411f923d0..0ec9b6150a576 100644 --- a/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py +++ b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py @@ -22,7 +22,7 @@ These tests focus on the defensive NULL-callback path: legacy MySQL deployments that ran the original (buggy) 0080 left ``deadline.callback`` rows as NULL. 0094 must heal those rows instead of crashing on -``json.loads(None)``. See PR #66016. +``json.loads(None)``. """ from __future__ import annotations From 844e04a76a6974bb6ff0b732c44347514ab92151 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 18 May 2026 17:00:53 +0530 Subject: [PATCH 4/6] test(deadline): insert deadline ids in hex form to avoid SA Uuid type mismatch `_upgrade_mysql_sqlite` declares the deadline `id` column as `sa.Uuid()`. On SQLite the SQLAlchemy write path serializes UUID objects as a hex string without dashes. The previous test inserted ids via `str(uuid.uuid4())` (dashed form), so the SELECT correctly parsed them back to UUID objects but the subsequent UPDATE's WHERE clause produced the hex form -- no rows matched, the WHERE callback_id IS NULL filter never narrowed, and the loop spun until the 60s execution timeout. Insert ids via `uuid.uuid4().hex` so read and write round-trip cleanly. --- .../migrations/test_0094_deadline_callback_migration.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py index 0ec9b6150a576..3d848bf843a7c 100644 --- a/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py +++ b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py @@ -112,7 +112,10 @@ class TestMigration0094NullCallbackRepair: def test_null_callback_does_not_crash(self): engine = _make_engine() - deadline_id = str(uuid.uuid4()) + # `_upgrade_mysql_sqlite` declares ``id`` as ``sa.Uuid()``; on SQLite the + # write path emits the hex (no-dash) form. Insert IDs in that same form so + # the UPDATE in the migration loop matches the row we created. + deadline_id = uuid.uuid4().hex with engine.begin() as conn: _insert_dagrun(conn) _insert_deadline(conn, deadline_id, callback=None) @@ -139,8 +142,8 @@ def test_null_callback_does_not_crash(self): def test_mixed_null_and_valid_callbacks(self): """A batch with both NULL and well-formed rows must migrate both.""" engine = _make_engine() - null_id = str(uuid.uuid4()) - valid_id = str(uuid.uuid4()) + null_id = uuid.uuid4().hex + valid_id = uuid.uuid4().hex valid_callback = json.dumps( { "__data__": {"path": "mymodule.cb", "kwargs": {"k": "v"}}, From e50f106fcfbefeab922fa3dec4c9d5cb592286c2 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 18 May 2026 19:58:46 +0530 Subject: [PATCH 5/6] fix(deadline): drop redundant empty-rows break in 0080 batch loops --- .../versions/0080_3_1_0_modify_deadline_callback_schema.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py index 23b00e1a1eb64..9ac39bfc27972 100644 --- a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py +++ b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py @@ -99,9 +99,6 @@ def upgrade(): .limit(batch_size) ).fetchall() - if not rows: - break - batch = [] for row in rows: path = row[1] or "" @@ -190,9 +187,6 @@ def downgrade(): .limit(batch_size) ).fetchall() - if not rows: - break - batch = [] for row in rows: cb = row[1] From c62c125b3aff978a9f9298d8b1827b7273f09722 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 18 May 2026 22:08:23 +0530 Subject: [PATCH 6/6] fix(deadline): restore empty-rows break in 0080 batch loops Without this guard, an empty deadline table causes executemany with an empty parameter list, which SQLAlchemy rejects with 'A value is required for bind parameter'. Reverts the removal from e50f106f. --- .../versions/0080_3_1_0_modify_deadline_callback_schema.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py index 9ac39bfc27972..23b00e1a1eb64 100644 --- a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py +++ b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py @@ -99,6 +99,9 @@ def upgrade(): .limit(batch_size) ).fetchall() + if not rows: + break + batch = [] for row in rows: path = row[1] or "" @@ -187,6 +190,9 @@ def downgrade(): .limit(batch_size) ).fetchall() + if not rows: + break + batch = [] for row in rows: cb = row[1]