Skip to content

Add DmsModifyTaskOperator#67524

Open
AlejandroMorgante wants to merge 15 commits into
apache:mainfrom
AlejandroMorgante:add-dms-modify-task-operator
Open

Add DmsModifyTaskOperator#67524
AlejandroMorgante wants to merge 15 commits into
apache:mainfrom
AlejandroMorgante:add-dms-modify-task-operator

Conversation

@AlejandroMorgante
Copy link
Copy Markdown

@AlejandroMorgante AlejandroMorgante commented May 26, 2026

Adds DmsModifyTaskOperator — a focused operator that modifies an existing AWS DMS replication task by calling ModifyReplicationTask.

The operator requires the task to already be stopped. Stop/restart lifecycle is intentionally left to DmsStopTaskOperator/DmsStartTaskOperator so users compose the flow explicitly in the DAG.

Changes

  • DmsHook.modify_replication_task — thin wrapper around the boto3 API
  • DmsTaskState enum in the hook (alongside the existing DmsTaskWaiterStatus)
  • DmsTaskModifyCompleteTrigger — polls until the task exits modifying; boto3 waiters treat that state as a terminal failure so a custom trigger is needed
  • DmsModifyTaskOperator — calls modify, optionally waits for the task to exit modifying (wait_for_completion=True); deferrable mode available for the wait phase

Testing

12 unit tests + validated end-to-end against real AWS (task in ready state, table mappings updated and verified in AWS).


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (claude-sonnet-4-6)

Generated-by: Claude Code (claude-sonnet-4-6) following the guidelines

@boring-cyborg
Copy link
Copy Markdown

boring-cyborg Bot commented May 26, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example Dag that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@boring-cyborg boring-cyborg Bot added kind:documentation provider:amazon AWS/Amazon - related issues labels May 26, 2026
@AlejandroMorgante AlejandroMorgante force-pushed the add-dms-modify-task-operator branch from 2e8bdad to 28477fa Compare May 26, 2026 05:01
Adds a new operator that wraps the DMS `ModifyReplicationTask` API,
allowing users to update a task's table mappings, migration type, CDC
start position, and task settings from within a DAG.

The operator handles the full pre-modify lifecycle: if the task is
already stopped it modifies it immediately; if it is still running it
can optionally stop and wait (with deferrable support via a new
`DmsTaskStoppedTrigger`); and if a previous modify is still in
progress it waits for it to finish before issuing a new one.

A common use-case is narrowing table mappings for a targeted backfill
without having to delete and recreate the task.
@AlejandroMorgante AlejandroMorgante force-pushed the add-dms-modify-task-operator branch from 2949aba to 520cc26 Compare May 26, 2026 05:06
…andling

- Add stop_task_before, restart_task_after, start_replication_task_type params
- Handle 'starting' state before stopping (DMS rejects stop on starting tasks)
- Replace boto3 waiter for 'modifying' state with poll loop (waiter treats
  modifying as terminal failure)
- Fix system test: wire modify_task into DAG chain (was orphaned)
- Expand unit tests from 10 to 14 covering all state transitions
- Add newsfragment for provider changelog
…ntry

providers/ does not use towncrier newsfragments — the release manager
regenerates the changelog from git log. Remove the newsfragment and
add the feature entry directly to changelog.rst instead.
Cover the success path (trigger fires → _do_modify runs, restart called)
and the error path (trigger fires with failure → AirflowException raised).
In the previous implementation, execute_complete() called _do_modify()
which blocked a worker slot with time.sleep() while waiting for the
'modifying' state to clear before restarting the task. No other AWS
operator blocks inside execute_complete() — the convention is that
callback methods are always lightweight.

Changes:
- DmsHook.get_task_status_async(): async polling method for use in
  the triggerer process
- DmsTaskModifyCompleteTrigger: async polling trigger that fires when
  the task exits 'modifying' (boto3 waiters treat it as terminal
  failure so a custom trigger is needed); supports verify/botocore_config
  for consistency with other DMS triggers
- DmsModifyTaskOperator._do_modify(): when deferrable=True and
  restart_task_after=True, defers to DmsTaskModifyCompleteTrigger
  instead of blocking; non-deferrable path unchanged
- DmsModifyTaskOperator.execute_restart(): lightweight callback that
  starts the task after the trigger fires
- Move TERMINAL_STATES class attribute to top of class alongside
  STOPPED_STATES; fix redundant condition in _wait_for_status();
  clarify wait_for_completion docstring (only applies to modifying state)
- 19 unit tests covering all state transitions and deferral paths
Copy link
Copy Markdown
Contributor

@SameerMesiah97 SameerMesiah97 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this operator is taking on too many responsibilities. Since DmsStartReplicationOperator and DmsStopReplicationOperator already exist, would it make more sense for this operator to focus purely on modification instead of internally orchestrating stop/restart flows as well?

Right now it is effectively acting as a small lifecycle orchestration engine (stop > wait > modify > restart > wait), which is also why the control flow has become fairly complex (execute_restart, multi-stage deferral, overlapping state groupings, etc).

It feels like users could instead compose the lifecycle explicitly in the DAG using the existing operators plus a dedicated modify operator.

I only managed to get through part of the diff but current flow looks convoluted because of the above reason. I think this concern should be addressed before a line by line review.

Comment thread providers/amazon/docs/changelog.rst Outdated
Comment thread providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py Outdated
Comment thread providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py Outdated
Comment thread providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py
Remove stop_task_before, restart_task_after, start_replication_task_type
and the multi-stage deferral chain. The operator now focuses purely on
calling ModifyReplicationTask. Stop/restart lifecycle is left to the
existing DmsStopTaskOperator and DmsStartTaskOperator so users compose
the flow explicitly in the DAG.

Also:
- Add DmsTaskState enum to DmsHook (alongside DmsTaskWaiterStatus)
- Rename STOPPED_STATES to MODIFIABLE_STATES using the new enum
- Replace AirflowException with RuntimeError/ValueError in provider code
- wait_for_completion now means: wait for task to exit modifying state
@AlejandroMorgante AlejandroMorgante changed the title Add DmsModifyTaskOperator Add DmsModifyTaskOperator May 28, 2026
@AlejandroMorgante
Copy link
Copy Markdown
Author

AlejandroMorgante commented May 28, 2026

@SameerMesiah97 you're right, and I appreciate the clear feedback. Simplified the operator significantly — removed stop_task_before, restart_task_after, start_replication_task_type, the multi-stage deferral chain, and _wait_for_status. It now just calls ModifyReplicationTask and optionally waits for the task to exit modifying. Stop/restart is the caller's responsibility via the existing operators.

Addressed your other inline comments in the same pass.

Also tested end-to-end against real AWS: spun up a dms.t3.micro replication instance, created a task (landed in ready state), ran the operator, and verified the table mappings were updated correctly in AWS. The polling loop handled the ~1 min modifying window without issues.

@AlejandroMorgante AlejandroMorgante changed the title Add DmsModifyTaskOperator Add DmsModifyTaskOperator May 28, 2026
@AlejandroMorgante AlejandroMorgante force-pushed the add-dms-modify-task-operator branch from 60102d4 to 0ee3742 Compare May 28, 2026 02:29
@potiuk
Copy link
Copy Markdown
Member

potiuk commented May 30, 2026

@SameerMesiah97 -> are you fine with the changes ? They seemed to simplify a lot

@eladkal eladkal requested review from ferruzzi and vincbeck May 30, 2026 15:40
Comment thread providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py
Comment thread providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py
Comment thread providers/amazon/src/airflow/providers/amazon/aws/triggers/dms.py Outdated
Comment thread providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py
Comment thread providers/amazon/tests/system/amazon/aws/example_dms.py
Comment thread providers/amazon/tests/unit/amazon/aws/operators/test_dms.py Outdated
Comment thread providers/amazon/tests/unit/amazon/aws/operators/test_dms.py Outdated
Comment thread providers/amazon/tests/unit/amazon/aws/operators/test_dms.py Outdated
Comment thread providers/amazon/tests/unit/amazon/aws/operators/test_dms.py Outdated
Comment thread providers/amazon/tests/unit/amazon/aws/operators/test_dms.py Outdated
Comment thread providers/amazon/tests/unit/amazon/aws/operators/test_dms.py
Comment thread providers/amazon/tests/unit/amazon/aws/operators/test_dms.py Outdated
Comment thread providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py
Comment thread providers/amazon/tests/unit/amazon/aws/operators/test_dms.py
Comment thread providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py
Comment thread providers/amazon/src/airflow/providers/amazon/aws/operators/dms.py
- Make execute_complete() return consistent dict via defer kwargs
- Add else branch to separate deferrable/sync wait paths
- Move DMS_MODIFIABLE_STATES to hook; use in both operator and trigger
- Validate post-modify state in trigger and sync path against MODIFIABLE_STATES
- Handle None (task not found) in DmsTaskModifyCompleteTrigger
- Fix error message to list all modifiable states (stopped, ready, failed)
- Reorder example DAG: stop_task -> await_task_stop -> modify_task
- Remove unnecessary test_init/test_init_defaults tests
- Move local imports to top of test file
- Add DmsTaskModifyCompleteTrigger tests; parametrize modifiable state test
Copy link
Copy Markdown
Contributor

@SameerMesiah97 SameerMesiah97 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just left more comments. Also, you have added a new trigger but I don't see any trigger tests. Can you add them as well?

Overall, I think this needs some work before merge (especially the trigger).

@AlejandroMorgante
Copy link
Copy Markdown
Author

Added TestDmsTaskModifyCompleteTrigger in tests/unit/amazon/aws/triggers/test_dms.py with tests for serialization, success, task not found (None), timeout, unexpected post-modify state, and exception. Also addressed the other inline comments in the latest push.

Comment thread providers/amazon/src/airflow/providers/amazon/aws/triggers/dms.py
@SameerMesiah97
Copy link
Copy Markdown
Contributor

SameerMesiah97 commented May 30, 2026

@SameerMesiah97 -> are you fine with the changes ? They seemed to simplify a lot

I think @AlejandroMorgante has done well to streamline the approach but I believe this needs a bit more workj before it can be merged. I just completed a line by line review just now. Also, it would not hurt to get opinions from @vincbeck and/or @o-nikolas on this as they are likely to have more DMS expertise..

- Add explicit aws_conn_id param and pass it to super().__init__()
- Raise ValueError in _wait_until_not_modifying() if task disappears
- Add tests: waiter_max_attempts exceeded, task disappears during wait
- Add wait_for_completion=False coverage via parametrized modifiable states test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants