Add deferrable mode to AzureContainerInstancesOperator#62772
Add deferrable mode to AzureContainerInstancesOperator#62772cruseakshay wants to merge 5 commits intoapache:mainfrom
Conversation
...iders/microsoft/azure/src/airflow/providers/microsoft/azure/operators/container_instances.py
Outdated
Show resolved
Hide resolved
SameerMesiah97
left a comment
There was a problem hiding this comment.
The overall approach is solid, though the style is a bit different from the deferrable mode implementation for the ADF counterpart. I have left a few comments for you to address. On a slightly tangential point, the PR is quite large so, for future reference, I would advise splitting a PR like this into the following:
- PR 1: Async Hook
- PR 2: Deferrable mode implementation (Operator + Trigger)
- PR 3: Bug fix
Combining all 3 in one PR makes it harder to review. You need not do this now but please keep this in mind the next time you submit a PR.
| :param name: the name of the container group | ||
| """ | ||
| client = await self.get_async_conn() | ||
| await client.container_groups.begin_delete(resource_group, name) |
There was a problem hiding this comment.
Should the async hook mirror the existing sync behavior (fire-and-forget), or should it await the poller to ensure deletion completes and surface errors? Did you consider something like this:
poller = await client.container_groups.begin_delete(
resource_group,
name
)
await poller.result()
The key thing to understand here is that begin_delete does not return the result but an LRO poller object which waits for the result. I don't think it's necessarily wrong to keep the replicate the mechanics of the sync hook here but I was just wondering if you had thought of this approach.
There was a problem hiding this comment.
Yes, I kept it the same to remain consistent. LMK if you think we should do await poller.result() in async and then similarly change the sync hook as well.
There was a problem hiding this comment.
Yes, I kept it the same to remain consistent. LMK if you think we should do
await poller.result()in async and then similarly change the sync hook as well.
On second thought, I think you should keep the current approach. After re-evaluating my suggestion in light of your comment, I cannot see a strong reason to break consistency with sync mode. Both should be fire-and-forget or delete-and-confirm, instead of one being the former and the other being the latter and vice versa.
| """Close the async connection.""" | ||
| if self._async_conn is not None: | ||
| await self._async_conn.close() | ||
| self._async_conn = None |
There was a problem hiding this comment.
The async credential created in get_async_conn() isn't stored or closed in close(). Some Azure async credentials support close(). Would it make sense to keep a reference and close it here too?
There was a problem hiding this comment.
Yes, good point.
| if self.remove_on_success: | ||
| self.on_kill() | ||
| elif self.remove_on_error: | ||
| self.on_kill() |
There was a problem hiding this comment.
This can be made clearer like this:
if _cleanup:
if exit_code == 0 and self.remove_on_success:
self.on_kill()
elif exit_code != 0 and self.remove_on_error:
self.on_kill()
| exit_code = 0 | ||
| detail_status = "Provisioning" | ||
|
|
||
| self.log.info("Container group %s/%s state: %s", self.resource_group, self.name, state) |
There was a problem hiding this comment.
This does not need to be logged in every iteration. I think you only log container state during transitions. With large numbers of concurrent deferrable tasks, this will result in extreme log pollution in the triggerer.
| priority: str | None = "Regular", | ||
| identity: ContainerGroupIdentity | dict | None = None, | ||
| deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), | ||
| polling_interval: float = 5.0, |
There was a problem hiding this comment.
Why 5 seconds? How long do you expect Azure Container Instances to run? It's seconds to minutes, this is fine. But if they can run for hours, I think 5 seconds is a bit too aggressive. 10-30 seconds would be more reasonable.
There was a problem hiding this comment.
I agree, 5 sec is aggressive. Setting it to 30 sec.
| @cached_property | ||
| def _ci_hook(self) -> AzureContainerInstanceHook: | ||
| return AzureContainerInstanceHook(azure_conn_id=self.ci_conn_id) | ||
|
|
There was a problem hiding this comment.
Can you explain why you turned this into a cached property?
There was a problem hiding this comment.
This is consistent with ADF/Synapse operators. The benefit is lazy initialization for execute() and execute_complete(), since both methods require the hook. With @cached_property, we define it once and avoid duplicating the instantiation logic across both methods.
| hook._async_conn = mock_conn | ||
|
|
||
| conn = await hook.get_async_conn() | ||
| assert conn is mock_conn |
There was a problem hiding this comment.
It might be worth adding a test that calls get_async_conn() twice and verifies that the same client instance is returned, to confirm the caching behavior.
| async def test_delete(self, async_conn_with_credentials): | ||
| hook = AzureContainerInstanceAsyncHook(azure_conn_id=async_conn_with_credentials.conn_id) | ||
| mock_client = MagicMock() | ||
| mock_client.container_groups.begin_delete = AsyncMock() |
There was a problem hiding this comment.
If you adjust the implementation for deleting containers, you will have to change this too.
There was a problem hiding this comment.
We can change if we decide against fire-and-forget
There was a problem hiding this comment.
We can change if we decide against
fire-and-forget
No need to change now as I think your current approach is ideal.
- async credential: store and close _async_credential in close() - Remove per-poll state logging in trigger - Change default polling_interval from 5s to 30s - Simplify finally cleanup block in execute() - Add get_async_conn() caching test
Add
deferrable=Truesupport toAzureContainerInstancesOperatorsotasks release their worker slot while waiting for long-running containers,
offloading polling to the lightweight Triggerer process.
Changes
New
AzureContainerInstanceAsyncHook— async counterpart to theexisting sync hook, using
azure.mgmt.containerinstance.aiofornon-blocking state/log/delete calls.
New
AzureContainerInstanceTrigger— polls ACI at a configurableinterval and yields a
TriggerEventwhen the container reaches aterminal state.
AzureContainerInstancesOperator— three new parameters:deferrable(default fromconf) — enables deferrable modepolling_interval(default5.0s) — trigger poll frequencyremove_on_success(defaultTrue) — controls cleanup on successfinallycleanup block would delete astill-running container group immediately after
self.defer()raisedTaskDeferred.provider.yaml+get_provider_info.pyupdated to register thenew trigger.
closes: #62433
Was generative AI tooling used to co-author this PR?