Skip to content

Commit

Permalink
Revert "add deferrable mode for AthenaOperator (#32110)" (#32172)
Browse files Browse the repository at this point in the history
This reverts commit 256438c.
  • Loading branch information
eladkal committed Jun 27, 2023
1 parent 256438c commit 3a85d4e
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 163 deletions.
20 changes: 1 addition & 19 deletions airflow/providers/amazon/aws/operators/athena.py
Expand Up @@ -20,10 +20,8 @@
from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence

from airflow import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.athena import AthenaHook
from airflow.providers.amazon.aws.triggers.athena import AthenaTrigger

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down Expand Up @@ -71,7 +69,6 @@ def __init__(
sleep_time: int = 30,
max_polling_attempts: int | None = None,
log_query: bool = True,
deferrable: bool = False,
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
Expand All @@ -84,10 +81,9 @@ def __init__(
self.query_execution_context = query_execution_context or {}
self.result_configuration = result_configuration or {}
self.sleep_time = sleep_time
self.max_polling_attempts = max_polling_attempts or 999999
self.max_polling_attempts = max_polling_attempts
self.query_execution_id: str | None = None
self.log_query: bool = log_query
self.deferrable = deferrable

@cached_property
def hook(self) -> AthenaHook:
Expand All @@ -105,15 +101,6 @@ def execute(self, context: Context) -> str | None:
self.client_request_token,
self.workgroup,
)

if self.deferrable:
self.defer(
trigger=AthenaTrigger(
self.query_execution_id, self.sleep_time, self.max_polling_attempts, self.aws_conn_id
),
method_name="execute_complete",
)
# implicit else:
query_status = self.hook.poll_query_status(
self.query_execution_id,
max_polling_attempts=self.max_polling_attempts,
Expand All @@ -134,11 +121,6 @@ def execute(self, context: Context) -> str | None:

return self.query_execution_id

def execute_complete(self, context, event=None):
if event["status"] != "success":
raise AirflowException(f"Error while waiting for operation on cluster to complete: {event}")
return event["value"]

def on_kill(self) -> None:
"""Cancel the submitted athena query."""
if self.query_execution_id:
Expand Down
76 changes: 0 additions & 76 deletions airflow/providers/amazon/aws/triggers/athena.py

This file was deleted.

3 changes: 0 additions & 3 deletions airflow/providers/amazon/provider.yaml
Expand Up @@ -515,9 +515,6 @@ hooks:
- airflow.providers.amazon.aws.hooks.appflow

triggers:
- integration-name: Amazon Athena
python-modules:
- airflow.providers.amazon.aws.triggers.athena
- integration-name: AWS Batch
python-modules:
- airflow.providers.amazon.aws.triggers.batch
Expand Down
12 changes: 0 additions & 12 deletions tests/providers/amazon/aws/operators/test_athena.py
Expand Up @@ -20,11 +20,9 @@

import pytest

from airflow.exceptions import TaskDeferred
from airflow.models import DAG, DagRun, TaskInstance
from airflow.providers.amazon.aws.hooks.athena import AthenaHook
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.triggers.athena import AthenaTrigger
from airflow.utils import timezone
from airflow.utils.timezone import datetime

Expand Down Expand Up @@ -160,13 +158,3 @@ def test_return_value(self, mock_conn, mock_run_query, mock_check_query_status):
ti.dag_run = dag_run

assert self.athena.execute(ti.get_template_context()) == ATHENA_QUERY_ID

@mock.patch.object(AthenaHook, "run_query", return_value=ATHENA_QUERY_ID)
def test_is_deferred(self, mock_run_query):
self.athena.deferrable = True

with pytest.raises(TaskDeferred) as deferred:
self.athena.execute(None)

assert isinstance(deferred.value.trigger, AthenaTrigger)
assert deferred.value.trigger.query_execution_id == ATHENA_QUERY_ID
53 changes: 0 additions & 53 deletions tests/providers/amazon/aws/triggers/test_athena.py

This file was deleted.

0 comments on commit 3a85d4e

Please sign in to comment.