diff --git a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py index adb3354512dc0..23b00e1a1eb64 100644 --- a/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py +++ b/airflow-core/src/airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py @@ -27,8 +27,13 @@ from __future__ import annotations +import json +from textwrap import dedent + import sqlalchemy as sa -from alembic import op +from alembic import context, op + +from airflow.configuration import conf # revision identifiers, used by Alembic. revision = "808787349f22" @@ -38,17 +43,199 @@ airflow_version = "3.1.0" +_ASYNC_CALLBACK_CLASSNAME = "airflow.sdk.definitions.deadline.AsyncCallback" +# Maximum length of the callback VARCHAR column in the pre-0080 schema. +_CALLBACK_MAX_LEN = 500 + + def upgrade(): """Replace deadline table's string callback and JSON callback_kwargs with JSON callback.""" + if context.is_offline_mode(): + print( + dedent(""" + ------------ + -- WARNING: Unable to migrate the data in the deadline table + -- while in offline mode! All rows in the deadline table will + -- be deleted. + ------------ + """) + ) + op.execute("DELETE FROM deadline") + with op.batch_alter_table("deadline", schema=None) as batch_op: + batch_op.drop_column("callback") + batch_op.drop_column("callback_kwargs") + batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=False)) + return + + conn = op.get_bind() + batch_size = conf.getint("database", "migration_batch_size", fallback=1000) + + # Add the destination column alongside the existing ones so we can migrate + # in batches without loading the whole table into memory at once. + with op.batch_alter_table("deadline", schema=None) as batch_op: + batch_op.add_column(sa.Column("callback_new", sa.JSON(), nullable=True)) + + deadline_read = sa.table( + "deadline", + sa.column("id"), + sa.column("callback"), + sa.column("callback_kwargs", sa.JSON()), + sa.column("callback_new", sa.JSON()), + ) + deadline_write = sa.table( + "deadline", + sa.column("id"), + sa.column("callback_new", sa.JSON()), + ) + + while True: + rows = conn.execute( + sa.select( + deadline_read.c.id, + deadline_read.c.callback, + deadline_read.c.callback_kwargs, + ) + .where(deadline_read.c.callback_new.is_(None)) + .limit(batch_size) + ).fetchall() + + if not rows: + break + + batch = [] + for row in rows: + path = row[1] or "" + kwargs = row[2] + if isinstance(kwargs, str): + kwargs = json.loads(kwargs) if kwargs else {} + if not isinstance(kwargs, dict): + kwargs = {} + batch.append( + { + "row_id": row[0], + "new_callback": { + "__data__": {"path": path, "kwargs": kwargs}, + "__classname__": _ASYNC_CALLBACK_CLASSNAME, + "__version__": 0, + }, + } + ) + + conn.execute( + sa.update(deadline_write) + .where(deadline_write.c.id == sa.bindparam("row_id")) + .values(callback_new=sa.bindparam("new_callback")), + batch, + ) + + if len(rows) < batch_size: + break + with op.batch_alter_table("deadline", schema=None) as batch_op: batch_op.drop_column("callback") batch_op.drop_column("callback_kwargs") - batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=False)) + batch_op.alter_column( + "callback_new", + new_column_name="callback", + existing_type=sa.JSON(), + nullable=False, + ) def downgrade(): """Replace deadline table's JSON callback with string callback and JSON callback_kwargs.""" + if context.is_offline_mode(): + print( + dedent(""" + ------------ + -- WARNING: Unable to migrate the data in the deadline table + -- while in offline mode! All rows in the deadline table will + -- be deleted. + ------------ + """) + ) + op.execute("DELETE FROM deadline") + with op.batch_alter_table("deadline", schema=None) as batch_op: + batch_op.drop_column("callback") + batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), nullable=True)) + batch_op.add_column(sa.Column("callback", sa.String(length=500), nullable=False)) + return + + conn = op.get_bind() + batch_size = conf.getint("database", "migration_batch_size", fallback=1000) + + # Add the restored columns alongside the existing JSON callback so we can + # back-fill in batches before dropping the JSON column. with op.batch_alter_table("deadline", schema=None) as batch_op: - batch_op.drop_column("callback") + batch_op.add_column(sa.Column("callback_old", sa.String(length=500), nullable=True)) batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), nullable=True)) - batch_op.add_column(sa.Column("callback", sa.String(length=500), nullable=False)) + + deadline_read = sa.table( + "deadline", + sa.column("id"), + sa.column("callback", sa.JSON()), + sa.column("callback_old", sa.String(500)), + ) + deadline_write = sa.table( + "deadline", + sa.column("id"), + sa.column("callback_old", sa.String(500)), + sa.column("callback_kwargs", sa.JSON()), + ) + + while True: + rows = conn.execute( + sa.select(deadline_read.c.id, deadline_read.c.callback) + .where(deadline_read.c.callback_old.is_(None)) + .limit(batch_size) + ).fetchall() + + if not rows: + break + + batch = [] + for row in rows: + cb = row[1] + if cb is None: + path, kwargs = "", {} + else: + if isinstance(cb, str): + cb = json.loads(cb) + cb_inner = cb.get("__data__", cb) if isinstance(cb, dict) else {} + path = cb_inner.get("path", "") + if len(path) > _CALLBACK_MAX_LEN: + print( + f"WARNING: callback path for deadline {row[0]} exceeds " + f"{_CALLBACK_MAX_LEN} chars and will be truncated." + ) + path = path[:_CALLBACK_MAX_LEN] + kwargs = cb_inner.get("kwargs", {}) + if not isinstance(kwargs, dict): + print( + f"WARNING: kwargs for deadline {row[0]} is not a dict " + f"(type={type(kwargs).__name__}); resetting to empty dict." + ) + kwargs = {} + batch.append({"row_id": row[0], "old_callback": path, "old_kwargs": kwargs}) + + conn.execute( + sa.update(deadline_write) + .where(deadline_write.c.id == sa.bindparam("row_id")) + .values( + callback_old=sa.bindparam("old_callback"), + callback_kwargs=sa.bindparam("old_kwargs"), + ), + batch, + ) + + if len(rows) < batch_size: + break + + with op.batch_alter_table("deadline", schema=None) as batch_op: + batch_op.drop_column("callback") + batch_op.alter_column( + "callback_old", + new_column_name="callback", + existing_type=sa.String(500), + nullable=False, + ) diff --git a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py index 06d0f7b43ab8b..0cab3d57c252f 100644 --- a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py +++ b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py @@ -69,8 +69,8 @@ def _upgrade_postgresql(conn, batch_size): d.id AS deadline_id, gen_random_uuid() AS callback_id, COALESCE(dr.dag_id, '') AS dag_id, - d.callback::jsonb->'__data__'->>'path' AS cb_path, - d.callback::jsonb->'__data__'->'kwargs' AS cb_kwargs, + COALESCE(d.callback::jsonb->'__data__'->>'path', '') AS cb_path, + COALESCE(NULLIF(d.callback::jsonb->'__data__'->'kwargs', 'null'::jsonb), '{}'::jsonb) AS cb_kwargs, CASE WHEN d.callback_state IN (:state_success, :state_failed) THEN d.callback_state ELSE :state_pending @@ -177,6 +177,7 @@ def _upgrade_mysql_sqlite(conn, batch_size): ) batch_num = 0 + null_callback_count = 0 while True: batch_num += 1 batch = conn.execute( @@ -199,11 +200,23 @@ def _upgrade_mysql_sqlite(conn, batch_size): for row in batch: callback_id = uuid6.uuid7() - cb = row.callback if isinstance(row.callback, dict) else json.loads(row.callback) - cb_inner = cb.get("__data__", cb) + raw_cb = row.callback + if raw_cb is None: + null_callback_count += 1 + cb = {} + elif isinstance(raw_cb, dict): + cb = raw_cb + else: + cb = json.loads(raw_cb) + cb_inner = cb.get("__data__", cb) if isinstance(cb, dict) else {} + if not isinstance(cb_inner, dict): + cb_inner = {} + kwargs = cb_inner.get("kwargs", {}) + if not isinstance(kwargs, dict): + kwargs = {} cb_data = { - "path": cb_inner.get("path", ""), - "kwargs": cb_inner.get("kwargs", {}), + "path": cb_inner.get("path", "") or "", + "kwargs": kwargs, "prefix": _CALLBACK_METRICS_PREFIX, "dag_id": row.dag_id or "", } @@ -237,6 +250,12 @@ def _upgrade_mysql_sqlite(conn, batch_size): ) print(f"Migrated {len(batch)} deadline records in batch {batch_num}") + if null_callback_count: + print( + f"WARNING: {null_callback_count} deadline rows had NULL callback " + "(legacy 0080 data); migrated with empty envelope." + ) + def upgrade(): """Replace Deadline table's inline callback fields with callback_id foreign key.""" diff --git a/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py new file mode 100644 index 0000000000000..c777d8418b1c9 --- /dev/null +++ b/airflow-core/tests/unit/migrations/test_0080_deadline_callback_migration.py @@ -0,0 +1,268 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Regression tests for migration 0080 (808787349f22): +upgrade() and downgrade() must correctly migrate existing deadline rows +without raising NotNullViolation. +""" + +from __future__ import annotations + +import importlib.util +import json +import uuid +from pathlib import Path +from unittest import mock + +import sqlalchemy as sa +from alembic.migration import MigrationContext +from alembic.operations import Operations + +from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH + +# Migration filenames start with a digit so they cannot be imported via the +# normal import system; load the module by file path instead. +_MIGRATION_PATH = ( + Path(AIRFLOW_CORE_SOURCES_PATH) + / "airflow/migrations/versions/0080_3_1_0_modify_deadline_callback_schema.py" +) +_spec = importlib.util.spec_from_file_location("migration_0080", _MIGRATION_PATH) +_migration = importlib.util.module_from_spec(_spec) # type: ignore[arg-type] +_spec.loader.exec_module(_migration) # type: ignore[union-attr] + +upgrade = _migration.upgrade +downgrade = _migration.downgrade +_ASYNC_CALLBACK_CLASSNAME = _migration._ASYNC_CALLBACK_CLASSNAME + +_PRE_0080_DDL = """ +CREATE TABLE deadline ( + id TEXT PRIMARY KEY, + dagrun_id INTEGER NOT NULL, + deadline_time TEXT NOT NULL, + callback TEXT NOT NULL, + callback_kwargs TEXT, + created_at TEXT, + last_updated_at TEXT +) +""" + +_POST_0080_DDL = """ +CREATE TABLE deadline ( + id TEXT PRIMARY KEY, + dagrun_id INTEGER NOT NULL, + deadline_time TEXT NOT NULL, + callback TEXT NOT NULL, + created_at TEXT, + last_updated_at TEXT +) +""" + + +def _make_engine_pre_0080(): + """Return an in-memory SQLite engine with the pre-0080 deadline schema.""" + engine = sa.create_engine("sqlite:///:memory:") + with engine.connect() as conn: + conn.execute(sa.text(_PRE_0080_DDL)) + conn.commit() + return engine + + +def _make_engine_post_0080(): + """Return an in-memory SQLite engine with the post-0080 deadline schema.""" + engine = sa.create_engine("sqlite:///:memory:") + with engine.connect() as conn: + conn.execute(sa.text(_POST_0080_DDL)) + conn.commit() + return engine + + +def _run_upgrade(engine): + # alembic.context is a proxy that is only populated when running through + # Alembic's full migration runner (alembic upgrade). When calling the + # migration function directly in a test we must mock it so that the + # is_offline_mode() guard does not raise AttributeError. + with engine.begin() as conn: + with Operations.context(MigrationContext.configure(conn)): + with mock.patch.object(_migration, "context") as mock_ctx: + mock_ctx.is_offline_mode.return_value = False + upgrade() + + +def _run_downgrade(engine): + with engine.begin() as conn: + with Operations.context(MigrationContext.configure(conn)): + with mock.patch.object(_migration, "context") as mock_ctx: + mock_ctx.is_offline_mode.return_value = False + downgrade() + + +def _read_deadline(engine): + with engine.connect() as conn: + return conn.execute(sa.text("SELECT * FROM deadline")).mappings().all() + + +class TestMigration0080Upgrade: + def test_upgrade_empty_table(self): + """Upgrade on an empty table must not raise.""" + engine = _make_engine_pre_0080() + _run_upgrade(engine) + rows = _read_deadline(engine) + assert rows == [] + + def test_upgrade_migrates_existing_row(self): + """Upgrade converts VARCHAR callback + JSON kwargs to the expected JSON envelope.""" + engine = _make_engine_pre_0080() + row_id = str(uuid.uuid4()) + with engine.begin() as conn: + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)" + " VALUES (:id, 1, '2025-01-01', :cb, :kw)" + ), + {"id": row_id, "cb": "mymodule.my_callback", "kw": json.dumps({"key": "val"})}, + ) + + _run_upgrade(engine) + + rows = _read_deadline(engine) + assert len(rows) == 1 + cb = rows[0]["callback"] + if isinstance(cb, str): + cb = json.loads(cb) + assert cb["__classname__"] == _ASYNC_CALLBACK_CLASSNAME + assert cb["__version__"] == 0 + assert cb["__data__"]["path"] == "mymodule.my_callback" + assert cb["__data__"]["kwargs"] == {"key": "val"} + assert "callback_kwargs" not in rows[0] + + def test_upgrade_null_kwargs_defaults_to_empty_dict(self): + """Upgrade with NULL callback_kwargs must produce an empty dict in the envelope.""" + engine = _make_engine_pre_0080() + row_id = str(uuid.uuid4()) + with engine.begin() as conn: + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)" + " VALUES (:id, 1, '2025-01-01', :cb, NULL)" + ), + {"id": row_id, "cb": "mymodule.my_callback"}, + ) + + _run_upgrade(engine) + + rows = _read_deadline(engine) + cb = rows[0]["callback"] + if isinstance(cb, str): + cb = json.loads(cb) + assert cb["__data__"]["kwargs"] == {} + + def test_upgrade_exact_batch_boundary(self, monkeypatch): + """Rows == batch_size must force a second iteration that returns 0 rows and exits cleanly.""" + # Force a small batch_size so 2 inserted rows == batch_size exactly. + monkeypatch.setattr(_migration.conf, "getint", lambda *a, **kw: 2) + engine = _make_engine_pre_0080() + with engine.begin() as conn: + for i in range(2): + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)" + " VALUES (:id, 1, '2025-01-01', :cb, :kw)" + ), + {"id": str(uuid.uuid4()), "cb": f"mod.cb_{i}", "kw": json.dumps({"i": i})}, + ) + + _run_upgrade(engine) + + rows = _read_deadline(engine) + assert len(rows) == 2 + paths = sorted( + (json.loads(r["callback"]) if isinstance(r["callback"], str) else r["callback"])["__data__"][ + "path" + ] + for r in rows + ) + assert paths == ["mod.cb_0", "mod.cb_1"] + + +class TestMigration0080Downgrade: + def test_downgrade_empty_table(self): + """Downgrade on an empty table must not raise.""" + engine = _make_engine_post_0080() + _run_downgrade(engine) + rows = _read_deadline(engine) + assert rows == [] + + def test_downgrade_restores_existing_row(self): + """Downgrade extracts path and kwargs back from the JSON envelope.""" + engine = _make_engine_post_0080() + row_id = str(uuid.uuid4()) + callback_json = json.dumps( + { + "__data__": {"path": "mymodule.my_callback", "kwargs": {"key": "val"}}, + "__classname__": _ASYNC_CALLBACK_CLASSNAME, + "__version__": 0, + } + ) + with engine.begin() as conn: + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback)" + " VALUES (:id, 1, '2025-01-01', :cb)" + ), + {"id": row_id, "cb": callback_json}, + ) + + _run_downgrade(engine) + + rows = _read_deadline(engine) + assert len(rows) == 1 + assert rows[0]["callback"] == "mymodule.my_callback" + kw = rows[0]["callback_kwargs"] + if isinstance(kw, str): + kw = json.loads(kw) + assert kw == {"key": "val"} + + +class TestMigration0080RoundTrip: + def test_round_trip_preserves_data(self): + """Upgrade followed by downgrade preserves the original callback path.""" + engine = _make_engine_pre_0080() + row_id = str(uuid.uuid4()) + original_path = "mymodule.my_callback" + original_kwargs = {"x": 1} + + with engine.begin() as conn: + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_kwargs)" + " VALUES (:id, 1, '2025-01-01', :cb, :kw)" + ), + {"id": row_id, "cb": original_path, "kw": json.dumps(original_kwargs)}, + ) + + _run_upgrade(engine) + _run_downgrade(engine) + + rows = _read_deadline(engine) + assert len(rows) == 1 + assert rows[0]["callback"] == original_path + kw = rows[0]["callback_kwargs"] + if isinstance(kw, str): + kw = json.loads(kw) + assert kw == original_kwargs diff --git a/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py new file mode 100644 index 0000000000000..3d848bf843a7c --- /dev/null +++ b/airflow-core/tests/unit/migrations/test_0094_deadline_callback_migration.py @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Regression tests for migration 0094 (e812941398f4). + +These tests focus on the defensive NULL-callback path: legacy MySQL +deployments that ran the original (buggy) 0080 left ``deadline.callback`` +rows as NULL. 0094 must heal those rows instead of crashing on +``json.loads(None)``. +""" + +from __future__ import annotations + +import importlib.util +import json +import uuid +from pathlib import Path + +import sqlalchemy as sa + +from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH + +_MIGRATION_PATH = ( + Path(AIRFLOW_CORE_SOURCES_PATH) + / "airflow/migrations/versions/0094_3_2_0_replace_deadline_inline_callback_with_fkey.py" +) +_spec = importlib.util.spec_from_file_location("migration_0094", _MIGRATION_PATH) +_migration = importlib.util.module_from_spec(_spec) # type: ignore[arg-type] +_spec.loader.exec_module(_migration) # type: ignore[union-attr] + + +# Minimal post-0080 / pre-0094 schema. 0094 adds ``missed`` and ``callback_id`` +# itself before invoking ``_upgrade_mysql_sqlite``; we mimic that here so we +# can call the inner helper directly without driving the full alembic chain. +_POST_0080_DDL = [ + """ + CREATE TABLE dag_run ( + id INTEGER PRIMARY KEY, + dag_id TEXT NOT NULL + ) + """, + """ + CREATE TABLE deadline ( + id TEXT PRIMARY KEY, + dagrun_id INTEGER NOT NULL, + deadline_time TEXT NOT NULL, + callback TEXT, + callback_state TEXT, + trigger_id INTEGER, + callback_id TEXT, + missed BOOLEAN + ) + """, + """ + CREATE TABLE callback ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, + fetch_method TEXT NOT NULL, + data TEXT NOT NULL, + state TEXT NOT NULL, + priority_weight INTEGER NOT NULL, + created_at TEXT NOT NULL + ) + """, +] + + +def _make_engine(): + engine = sa.create_engine("sqlite:///:memory:") + with engine.connect() as conn: + for ddl in _POST_0080_DDL: + conn.execute(sa.text(ddl)) + conn.commit() + return engine + + +def _insert_dagrun(conn, dagrun_id: int = 1, dag_id: str = "test_dag"): + conn.execute( + sa.text("INSERT INTO dag_run (id, dag_id) VALUES (:id, :dag_id)"), + {"id": dagrun_id, "dag_id": dag_id}, + ) + + +def _insert_deadline(conn, deadline_id: str, callback, callback_state: str | None = None): + conn.execute( + sa.text( + "INSERT INTO deadline (id, dagrun_id, deadline_time, callback, callback_state)" + " VALUES (:id, 1, '2025-01-01', :cb, :state)" + ), + {"id": deadline_id, "cb": callback, "state": callback_state}, + ) + + +class TestMigration0094NullCallbackRepair: + """A NULL callback row from a buggy 0080 must not crash 0094's upgrade.""" + + def test_null_callback_does_not_crash(self): + engine = _make_engine() + # `_upgrade_mysql_sqlite` declares ``id`` as ``sa.Uuid()``; on SQLite the + # write path emits the hex (no-dash) form. Insert IDs in that same form so + # the UPDATE in the migration loop matches the row we created. + deadline_id = uuid.uuid4().hex + with engine.begin() as conn: + _insert_dagrun(conn) + _insert_deadline(conn, deadline_id, callback=None) + + # _upgrade_mysql_sqlite reads from `deadline` and writes to `callback`; + # it does not depend on alembic's batch_alter_table prelude. + with engine.begin() as conn: + _migration._upgrade_mysql_sqlite(conn, batch_size=10) + + with engine.connect() as conn: + deadline_rows = conn.execute(sa.text("SELECT * FROM deadline")).mappings().all() + callback_rows = conn.execute(sa.text("SELECT * FROM callback")).mappings().all() + + assert len(deadline_rows) == 1 + assert len(callback_rows) == 1 + assert deadline_rows[0]["callback_id"] == callback_rows[0]["id"] + assert deadline_rows[0]["missed"] == 0 # SQLite: False -> 0 + + cb_data = json.loads(callback_rows[0]["data"]) + assert cb_data["path"] == "" + assert cb_data["kwargs"] == {} + assert cb_data["dag_id"] == "test_dag" + + def test_mixed_null_and_valid_callbacks(self): + """A batch with both NULL and well-formed rows must migrate both.""" + engine = _make_engine() + null_id = uuid.uuid4().hex + valid_id = uuid.uuid4().hex + valid_callback = json.dumps( + { + "__data__": {"path": "mymodule.cb", "kwargs": {"k": "v"}}, + "__classname__": "airflow.sdk.definitions.deadline.AsyncCallback", + "__version__": 0, + } + ) + with engine.begin() as conn: + _insert_dagrun(conn) + _insert_deadline(conn, null_id, callback=None) + _insert_deadline(conn, valid_id, callback=valid_callback) + + with engine.begin() as conn: + _migration._upgrade_mysql_sqlite(conn, batch_size=10) + + with engine.connect() as conn: + rows = ( + conn.execute( + sa.text( + "SELECT d.id AS deadline_id, c.data" + " FROM deadline d JOIN callback c ON d.callback_id = c.id" + ) + ) + .mappings() + .all() + ) + + by_id = {r["deadline_id"]: json.loads(r["data"]) for r in rows} + assert by_id[null_id]["path"] == "" + assert by_id[null_id]["kwargs"] == {} + assert by_id[valid_id]["path"] == "mymodule.cb" + assert by_id[valid_id]["kwargs"] == {"k": "v"}