Amazon: Add Athena Spark operator and sensor support #66576
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide
I ran the relevant provider unit tests and Python lint/format checks locally. I used --no-verify for the local commit because the blacken-docs hook environment on this machine was resolving against Python 3.9 even after reinstalling prek, which appears to be a local tooling issue rather than a code issue in this PR.
I only managed to get through half of the diff but there were quite a few issues worth addressing. I have left some comments but I would urge you to review the entire diff to ensure that the code is clean and that everything is in line with project standards.
Edit: I have just gone through the rest of the diff. The tests are better than the actual implementation, but they need some refinement too. They will naturally have to be adjusted if you implement my feedback on the initial part of the diff.
| @@ -0,0 +1 @@ | |||
| Add Athena Spark operator and sensor support to the Amazon provider. | |||
Provider changes do not require newsfragments.
| AthenaSparkSensor( | ||
| task_id="wait_for_spark_calculation", | ||
| calculation_execution_id="calc-exec-123", | ||
| ) |
So there are a few issues with this file:
- You are inlining DAG code using these new operators/sensors. Standard practice in the provider docs is usually to add tasks using the new operators/sensors to the example DAG(s) and reference them via exampleinclude blocks instead.
- Where is the Prerequisite Tasks section?
- The structure of the Operators/Sensors sections looks inconsistent with the existing Amazon provider docs. For example, if you look at athena_sql.rst, there is a top-level Operators section followed by use-case-oriented subsections such as Execute a SQL query, along with explanatory text describing when/how the operators should be used. I think the same structure should be followed here for both operators and sensors.
- The page is also missing some contextual guidance for users. For example:
  - whether an Athena Spark session must already exist
  - which AWS connection/authentication is expected
  - when to use AthenaSparkOperator vs AthenaSparkSensor
| self.log.info("Stopping Query with executionId - %s", query_execution_id) | ||
| return self.get_conn().stop_query_execution(QueryExecutionId=query_execution_id) | ||
|
|
||
| # --- Athena Spark (Calculations) API --- |
| calculation_execution_id=calculation_execution_id, use_cache=use_cache | ||
| ) | ||
| try: | ||
| return response["CalculationExecution"]["Status"].get("StateChangeReason") |
Why is it ["CalculationExecution"]["Status"]["StateChangeReason"] here but response["Status"]["State"] above?
| "COMPLETED", | ||
| "FAILED", | ||
| "CANCELED", | ||
| ) |
Better to do this to avoid drift:
SPARK_TERMINAL_STATES = SPARK_SUCCESS_STATES + SPARK_FAILURE_STATES
Also, I would move these constants above the hook constructor near the existing hook constants.
| :param description: Optional description of the calculation. | ||
| :param calculation_configuration: Contains configuration information for the calculation. | ||
| :param client_request_token: Optional idempotency token. | ||
| :return: CalculationExecutionId |
Shouldn't this be:
:return: str
| execution_info.get("OutputLocation") | ||
| or (execution_info.get("CalculationExecution") or {}).get("OutputLocation") | ||
| or (execution_info.get("ResultConfiguration") or {}).get("OutputLocation") | ||
| ) |
It looks like you are compensating for inconsistent response structures coming from the hook. If you need to normalize, why not push it down to the hook itself?
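A rough sketch of what pushing this down could look like: a hook-level helper probes the candidate locations once and hands callers a flat value. The name extract_output_location is hypothetical, and the three response shapes are simply the ones the operator snippet above checks, not a confirmed description of the Athena API.

```python
from typing import Any, Optional


def extract_output_location(execution_info: dict[str, Any]) -> Optional[str]:
    """Return the first OutputLocation found in the shapes the operator probes.

    Hypothetical helper: the candidate keys mirror the operator snippet under
    review and are assumptions, not a confirmed Athena response layout.
    """
    candidates = (
        execution_info,
        execution_info.get("CalculationExecution") or {},
        execution_info.get("ResultConfiguration") or {},
    )
    for candidate in candidates:
        location = candidate.get("OutputLocation")
        if location:
            return location
    # None of the known shapes carried a usable location.
    return None
```

With something like this living in the hook, the operator would only ever see a string or None and would not need its own fallback chain.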
| if state in AthenaHook.SPARK_TERMINAL_STATES: | ||
| return state | ||
|
|
||
| raise AirflowException( |
We need to avoid introducing AirflowException in the provider layer. I would suggest using RuntimeError. Please check the rest of the diff and implement the same feedback.
| """Poll calculation status until a terminal state or timeout.""" | ||
| for attempt in range(1, self.max_polling_attempts + 1): | ||
| if attempt > 1: | ||
| time.sleep(self.poll_interval) |
Instead of checking for the iteration count, maybe it would be cleaner to move time.sleep to the end of the loop?
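For illustration, a minimal sketch of the loop with the sleep at the end. The function poll_until_terminal and its parameters are hypothetical stand-ins for the operator method, and the sleep callable is injectable only so the sketch can be exercised without real waiting.

```python
import time
from typing import Callable, Optional, Sequence


def poll_until_terminal(
    get_state: Callable[[], Optional[str]],
    terminal_states: Sequence[str],
    max_attempts: int,
    poll_interval: float,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Poll get_state until it returns a terminal state or attempts run out."""
    for _ in range(max_attempts):
        state = get_state()
        if state in terminal_states:
            return state
        # Sleep only after an unsuccessful check, so no attempt counter is needed.
        sleep(poll_interval)
    # Attempts exhausted without reaching a terminal state.
    return None
```

The trade-off is one extra sleep after the final unsuccessful attempt before the caller reports a timeout.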
|
|
||
| def execute(self, context: Context) -> dict[str, Any]: | ||
| """Submit the Spark calculation, poll until terminal state, then return metadata.""" | ||
| del context |
Why is del context needed here?
| ) | ||
|
|
||
| if initial_state and initial_state in AthenaHook.SPARK_TERMINAL_STATES: | ||
| return self._handle_terminal_state(calculation_execution_id, initial_state) |
Why is this needed when you have lines 128-129 to poll for and then handle the terminal state? I would remove it unless there is a strong justification that I cannot see.
| f"Reason: {reason or 'No reason provided.'}" | ||
| ) | ||
|
|
||
| if state != "COMPLETED": |
Why are you using a string here for state checks when you have a constant called SPARK_SUCCESS_STATES? It would be better to do:
if state not in AthenaHook.SPARK_SUCCESS_STATES:
| self.log.warning( | ||
| "Failed to stop calculation %s: %s", | ||
| self._calculation_execution_id, | ||
| e, |
This would be better, as it preserves debugging information:
except Exception:
    self.log.warning(
        "Failed to stop calculation %s",
        self._calculation_execution_id,
        exc_info=True,
    )
| if state in hook.SPARK_FAILURE_STATES: | ||
| raise AirflowException(f"Calculation {self.calculation_execution_id} failed with state: {state}") | ||
|
|
||
| return state == "COMPLETED" |
Use the constant AthenaHook.SPARK_SUCCESS_STATES.
|
|
||
| def poke(self, context: Context) -> bool: | ||
| """Check the current status of the Spark calculation.""" | ||
| del context |
| - Mock the boto3 client via AthenaHook.get_conn() so no real AWS calls are made. | ||
| - Cover success, failure (exceptions), and bad/edge-case input for each hook method. | ||
| - Use botocore.exceptions.ClientError for API failure scenarios. | ||
| """ |
Why is this docstring here?
| calculation_execution_id=MOCK_DATA["calculation_execution_id"] | ||
| ) | ||
|
|
||
| assert state == "RUNNING" |
What about when the API returns a None or empty state? Have you covered that?
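As a sketch of the kind of coverage meant here, assuming the hook reads the state from response["Status"]["State"] as discussed earlier in this review. The function check_calculation_status below is a hypothetical stand-in for the hook method, and the client is a plain mock rather than a real boto3 client.

```python
from typing import Optional
from unittest import mock


def check_calculation_status(client, calculation_execution_id: str) -> Optional[str]:
    """Hypothetical stand-in for the hook method under test."""
    response = client.get_calculation_execution(CalculationExecutionId=calculation_execution_id)
    # Treat a missing Status block, a missing State, or an empty string as "no state".
    return (response.get("Status") or {}).get("State") or None


def test_missing_or_empty_state_returns_none():
    responses = ({}, {"Status": {}}, {"Status": {"State": None}}, {"Status": {"State": ""}})
    for response in responses:
        client = mock.Mock()
        client.get_calculation_execution.return_value = response
        assert check_calculation_status(client, "calc-exec-123") is None
        client.get_calculation_execution.assert_called_once_with(
            CalculationExecutionId="calc-exec-123"
        )
```

In the real test module this would presumably patch AthenaHook.get_conn like the neighbouring tests and could be written with pytest.mark.parametrize instead of the loop.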
| mock_conn.return_value.start_calculation_execution.assert_called_with(**expected_call_params) | ||
| assert result == MOCK_DATA["calculation_execution_id"] | ||
|
|
||
| @mock.patch.object(AthenaHook, "get_conn") |
Since there is ambiguity regarding the shape of the response object received for get_calculation_info, shouldn't there also be a test validating that? Also, since there is an option to use caching, I would suggest that you add a test or set of tests covering that behaviour.
| def test_poke_running_returns_false(self, mock_check: mock.Mock, sensor: AthenaSparkSensor): | ||
| result = sensor.poke({}) | ||
| assert result is False | ||
| mock_check.assert_called_once_with(CALC_ID) |
These tests should be fine once you move response normalization down to the hook. There is one small gap that would still remain: there does not appear to be coverage for the Unexpected terminal state branch in _handle_terminal_state().
o-nikolas left a comment
Great review, @SameerMesiah97. I did not have much to add on top. @Andisha2004, please try to clean this one up.
| :param max_polling_attempts: Maximum number of polling attempts before timing out. | ||
| To limit total task time, use execution_timeout on the task as well. |
The majority of the Amazon provider uses max_attempts, please use that throughout this PR.
Co-authored-by: Cursor <cursoragent@cursor.com>
@Andisha2004 — There are 20 unresolved review thread(s) on this PR from @SameerMesiah97, @o-nikolas, and you have engaged with each one (post-review commits and/or in-thread replies). Could you confirm whether you believe the feedback is fully addressed and the PR is ready for maintainer review confirmation? If yes, reply here (a short "yes / ready" is fine) and an Apache Airflow maintainer will pick the PR up from the review queue on the next sweep. If you are still working on a thread, please reply with what is outstanding so the threads stay unresolved on purpose. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
What this PR does
This PR adds Athena Spark calculation support to the Amazon provider.
Specifically, it:
- extends AthenaHook with Athena Spark calculation helpers
- adds AthenaSparkOperator for submitting and waiting on Athena Spark calculations
- adds AthenaSparkSensor for monitoring an existing calculation execution
Why this is needed
The Amazon provider already supports Athena query execution, but it does not currently expose first-class support for Athena Spark calculation APIs. This PR fills that gap by adding provider-native support for Athena Spark job submission and monitoring.
Implementation notes
- AthenaHook wraps Athena Spark APIs including:
  - start_calculation_execution
  - get_calculation_execution
  - stop_calculation_execution
- AthenaSparkOperator submits a calculation, polls until a terminal state, and returns execution metadata
- AthenaSparkSensor polls an existing calculation execution ID until completion or failure
Tests
I added or updated unit tests for:
- providers/amazon/tests/unit/amazon/aws/hooks/test_athena.py
- providers/amazon/tests/unit/amazon/aws/operators/test_athena_spark.py
- providers/amazon/tests/unit/amazon/aws/sensors/test_athena_spark.py
Backward compatibility
This PR adds new functionality and does not change the existing Athena query operator or sensor behavior.
Was generative AI tooling used to co-author this PR?
Generated-by: OpenAI Codex following the guidelines