Fix BatchOperator deferrable XCom links #64745
Conversation
When BatchOperator runs with deferrable=True, operator links (job definition, job queue, CloudWatch logs) were not being persisted as XCom values, causing ERROR-level logs when the UI tried to render them. This change:

- Extracts link persistence logic into a `_persist_links()` method
- Calls `_persist_links()` from `execute_complete()` for deferrable tasks
- Refactors `monitor_job()` to use the same method (DRY principle)

Fixes "XCom not found" errors for:

- batch_job_definition
- batch_job_queue
- cloudwatch_events
Added _persist_links(context) call in execute() method before deferring. This ensures operator links (batch_job_definition, batch_job_queue, cloudwatch_events) are persisted as XCom values immediately after job submission, making them available in the UI while the task is deferred.
- Persist placeholder None value for cloudwatch_events XCom when logs aren't available at submission time
- Change log level from WARNING to INFO for missing CloudWatch logs
- Prevents "XCom not found" warning in UI while task is deferred
- CloudWatch link will be populated when job completes and logs exist
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Pull request overview
This PR aims to ensure the AWS Batch operator’s extra UI links (job definition/queue/logs) are available for deferrable runs by persisting the related XComs before/around deferral and completion.
Changes:

- Persist operator extra-link XComs before deferring in `execute()` and again in `execute_complete()`.
- Refactor link persistence logic into a new `_persist_links()` helper.
- Adjust CloudWatch log-link persistence and logging behavior.
```python
# Persist operator links before deferring so they're available in the UI
self._persist_links(context)

job = self.hook.get_job_description(self.job_id)
job_status = job.get("status")
if job_status == self.hook.SUCCESS_STATE:
```
In the deferrable path this adds multiple describe_jobs calls in quick succession: _persist_links() calls get_job_description() and then get_job_all_awslogs_info() (which calls get_job_description() again), and execute() then calls get_job_description() again for job_status. This increases AWS API usage and can hit Batch API throttling. Consider refactoring to reuse a single job description (e.g., have _persist_links accept/return job_desc) and/or avoid fetching CloudWatch log info before deferring (it is often unavailable immediately).
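One possible shape for that refactor, sketched here with a stub hook rather than the real `BatchClientHook` (the `job_desc` parameter is hypothetical, not current operator API):

```python
class _StubHook:
    """Stands in for BatchClientHook; counts describe_jobs-style calls."""

    def __init__(self):
        self.describe_calls = 0

    def get_job_description(self, job_id):
        self.describe_calls += 1
        return {"jobId": job_id, "status": "SUCCEEDED", "jobQueue": "q", "jobDefinition": "d"}


def persist_links(hook, job_id, job_desc=None):
    # Reuse a caller-supplied description instead of re-fetching it.
    if job_desc is None:
        job_desc = hook.get_job_description(job_id)
    return {
        "batch_job_definition": job_desc["jobDefinition"],
        "batch_job_queue": job_desc["jobQueue"],
    }


hook = _StubHook()
job_desc = hook.get_job_description("job-1")            # single fetch in execute()
links = persist_links(hook, "job-1", job_desc=job_desc)
job_status = job_desc.get("status")                     # reused for the status check too
assert hook.describe_calls == 1                         # one API call instead of three
```

The same pattern lets `monitor_job()` pass the description it already has, so link persistence never triggers an extra `describe_jobs` round-trip.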
```python
# Persist operator links for UI
self._persist_links(context)
```
execute_complete() now unconditionally calls _persist_links(context), which performs AWS API calls and assumes the context contains a task instance. In this repo's unit tests execute_complete is invoked with an empty dict (context={}), and _persist_links can raise (e.g., when it hits the CloudWatch placeholder context["task_instance"].xcom_push). Either guard _persist_links when required context keys are missing, or update the tests/mocks to pass a real TI context and stub the hook calls.
```diff
-        # Persist operator links for UI
-        self._persist_links(context)
+        # Persist operator links for UI when task instance context is available.
+        if context and context.get("task_instance") is not None:
+            self._persist_links(context)
```
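If the tests are updated instead, the minimal change is to hand `execute_complete` a context carrying a mocked task instance rather than an empty dict. A sketch using only `unittest.mock`, assuming the hook calls are stubbed separately:

```python
from unittest import mock

# A context the operator code can index safely; both key spellings provided,
# since BaseAwsLink.persist() uses "ti" while the placeholder used "task_instance".
ti = mock.MagicMock(name="task_instance")
context = {"task_instance": ti, "ti": ti}

# Code under test can now do this without raising:
context["task_instance"].xcom_push(key="cloudwatch_events", value=None)

ti.xcom_push.assert_called_once_with(key="cloudwatch_events", value=None)
```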
```python
else:
    # Persist placeholder to prevent "XCom not found" warnings
    # CloudWatch logs will be updated when job completes
    context["task_instance"].xcom_push(
        key="cloudwatch_events",
        value=None,
    )
```
This manually pushes an XCom placeholder even when do_xcom_push=False, uses a hard-coded key string, and directly indexes context["task_instance"] (KeyError if not present). It also isn’t needed for BaseAwsLink.get_link() since missing/falsey XCom values already render as no link. Prefer removing the placeholder entirely, or at least respect operator.do_xcom_push and use CloudWatchEventsLink.key plus the same context["ti"] convention used by BaseAwsLink.persist().
```diff
-        else:
-            # Persist placeholder to prevent "XCom not found" warnings
-            # CloudWatch logs will be updated when job completes
-            context["task_instance"].xcom_push(
-                key="cloudwatch_events",
-                value=None,
-            )
```
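If the placeholder is kept at all, a guarded variant along the lines the review suggests might look like this (a sketch only: the link class is stubbed, and `maybe_push_placeholder` is a hypothetical helper, not provider API; the real `CloudWatchEventsLink.key` lives in the AWS provider's links module):

```python
class CloudWatchEventsLink:
    """Stub mirroring the provider link's XCom key attribute."""

    key = "cloudwatch_events"


class FakeTI:
    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value


def maybe_push_placeholder(do_xcom_push, context):
    # Respect the operator's do_xcom_push flag and tolerate a missing TI,
    # checking the "ti" key first to match BaseAwsLink.persist().
    if not do_xcom_push:
        return False
    ti = context.get("ti") or context.get("task_instance")
    if ti is None:
        return False
    ti.xcom_push(key=CloudWatchEventsLink.key, value=None)
    return True


ti = FakeTI()
assert maybe_push_placeholder(True, {"ti": ti}) is True
assert ti.pushed == {"cloudwatch_events": None}
assert maybe_push_placeholder(False, {"ti": FakeTI()}) is False  # do_xcom_push off
assert maybe_push_placeholder(True, {}) is False                 # no TI in context
```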
```python
# Persist operator links
self._persist_links(context)

if self.awslogs_enabled:
    if self.waiters:
        self.waiters.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
    else:
        self.hook.wait_for_job(self.job_id, get_batch_log_fetcher=self._get_batch_log_fetcher)
else:
    if self.waiters:
        self.waiters.wait_for_job(self.job_id)
    else:
        self.hook.wait_for_job(self.job_id)

# Log all CloudWatch log stream links for user reference
try:
    awslogs = self.hook.get_job_all_awslogs_info(self.job_id)
    if awslogs:
        self.log.info(
            "AWS Batch job (%s) CloudWatch Events details found. Links to logs:", self.job_id
        )
        link_builder = CloudWatchEventsLink()
        for log in awslogs:
            self.log.info(link_builder.format_link(**log))
except AirflowException as ae:
    self.log.warning("Cannot determine where to find the AWS logs for this Batch job: %s", ae)
```
monitor_job() calls _persist_links() (which already calls get_job_all_awslogs_info()), then after waiting it calls get_job_all_awslogs_info() again just to log stream links. This adds extra describe_jobs calls and will break existing expectations in unit tests that get_job_description is called exactly twice during execute() (see test_execute_without_failures). Consider fetching/logging/persisting CloudWatch log info only once (e.g., do it after waiting, like the previous implementation, or have _persist_links optionally skip CloudWatch work when called pre-wait).
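A sketch of the "skip CloudWatch work pre-wait" option (the `persist_awslogs` flag is hypothetical, and the hook is stubbed to count API calls):

```python
class _CountingHook:
    """Stub hook that counts the underlying describe_jobs-style calls."""

    def __init__(self):
        self.describe_calls = 0
        self.awslogs_calls = 0

    def get_job_description(self, job_id):
        self.describe_calls += 1
        return {"jobQueue": "q", "jobDefinition": "d"}

    def get_job_all_awslogs_info(self, job_id):
        self.awslogs_calls += 1
        return [{"awslogs_group": "/aws/batch/job", "awslogs_stream_name": "s"}]


def persist_links(hook, job_id, persist_awslogs=True):
    # Queue/definition links always; CloudWatch lookup only when requested.
    job = hook.get_job_description(job_id)
    links = {"batch_job_queue": job["jobQueue"], "batch_job_definition": job["jobDefinition"]}
    if persist_awslogs:
        links["cloudwatch_events"] = hook.get_job_all_awslogs_info(job_id)
    return links


hook = _CountingHook()
persist_links(hook, "job-1", persist_awslogs=False)   # pre-wait: logs rarely exist yet
assert (hook.describe_calls, hook.awslogs_calls) == (1, 0)
persist_links(hook, "job-1")                          # post-wait: include log links once
assert (hook.describe_calls, hook.awslogs_calls) == (2, 1)
```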
```python
# CloudWatch logs may not be available immediately after job submission
self.log.info("CloudWatch logs not yet available for Batch job: %s", ae)
```
The except AirflowException branch assumes the failure to fetch log info is transient (“not yet available”) and logs at INFO. get_job_all_awslogs_info() can raise AirflowException for non-transient cases too (e.g. unsupported job type), so this message/level is misleading and can hide real configuration issues. Consider logging a more neutral message (or warning) and differentiating transient “stream not created yet” (which returns []) from actual exceptions.
```diff
-            # CloudWatch logs may not be available immediately after job submission
-            self.log.info("CloudWatch logs not yet available for Batch job: %s", ae)
+            self.log.warning(
+                "Unable to retrieve CloudWatch log information for AWS Batch job (%s): %s",
+                self.job_id,
+                ae,
+            )
```
Converting this to draft since there is unaddressed Copilot feedback and failing tests. Change this to ready for review once all feedback is addressed and the tests are green 🙂
Was generative AI tooling used to co-author this PR?
Please also add a newsfragment named `{pr_number}.significant.rst` in `airflow-core/newsfragments`. You can add this file in a follow-up commit after the PR is created, so you know the PR number.