Skip to content

Commit

Permalink
Use base aws classes in Amazon DynamoDB Sensors (#36770)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis committed Jan 14, 2024
1 parent 9787440 commit b241577
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 15 deletions.
30 changes: 15 additions & 15 deletions airflow/providers/amazon/aws/sensors/dynamodb.py
Expand Up @@ -16,17 +16,17 @@
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Any, Iterable, Sequence

from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields

if TYPE_CHECKING:
from airflow.utils.context import Context


class DynamoDBValueSensor(BaseSensorOperator):
class DynamoDBValueSensor(AwsBaseSensor[DynamoDBHook]):
"""
Waits for an attribute value to be present for an item in a DynamoDB table.
Expand All @@ -41,11 +41,20 @@ class DynamoDBValueSensor(BaseSensorOperator):
:param attribute_value: DynamoDB attribute value
:param sort_key_name: (optional) DynamoDB sort key name
:param sort_key_value: (optional) DynamoDB sort key value
:param aws_conn_id: aws connection to use
:param region_name: aws region to use
:param aws_conn_id: The Airflow connection used for AWS credentials.
If this is ``None`` or empty then the default boto3 behaviour is used. If
running Airflow in a distributed manner and aws_conn_id is None or
empty, then default boto3 configuration would be used (and must be
maintained on each worker node).
:param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
:param verify: Whether or not to verify SSL certificates. See:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
:param botocore_config: Configuration dictionary (key-values) for botocore client. See:
https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
"""

template_fields: Sequence[str] = (
aws_hook_class = DynamoDBHook
template_fields: Sequence[str] = aws_template_fields(
"table_name",
"partition_key_name",
"partition_key_value",
Expand All @@ -64,8 +73,6 @@ def __init__(
attribute_value: str | Iterable[str],
sort_key_name: str | None = None,
sort_key_value: str | None = None,
aws_conn_id: str | None = DynamoDBHook.default_conn_name,
region_name: str | None = None,
**kwargs: Any,
):
super().__init__(**kwargs)
Expand All @@ -76,8 +83,6 @@ def __init__(
self.attribute_value = attribute_value
self.sort_key_name = sort_key_name
self.sort_key_value = sort_key_value
self.aws_conn_id = aws_conn_id
self.region_name = region_name

def poke(self, context: Context) -> bool:
"""Test DynamoDB item for matching attribute value."""
Expand Down Expand Up @@ -108,8 +113,3 @@ def poke(self, context: Context) -> bool:
)
except KeyError:
return False

@cached_property
def hook(self) -> DynamoDBHook:
"""Create and return a DynamoDBHook."""
return DynamoDBHook(self.aws_conn_id, region_name=self.region_name)
4 changes: 4 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/dynamodb.rst
Expand Up @@ -30,6 +30,10 @@ Prerequisite Tasks

.. include:: ../_partials/prerequisite_tasks.rst

Generic Parameters
------------------

.. include:: ../_partials/generic_parameters.rst

Sensors
-------
Expand Down
39 changes: 39 additions & 0 deletions tests/providers/amazon/aws/sensors/test_dynamodb.py
Expand Up @@ -117,6 +117,45 @@ def setup_method(self):
sort_key_value=self.sk_value,
)

def test_init(self):
sensor = DynamoDBValueSensor(
task_id="dynamodb_value_sensor_init",
table_name=self.table_name,
partition_key_name=self.pk_name,
partition_key_value=self.pk_value,
attribute_name=self.attribute_name,
attribute_value=self.attribute_value,
sort_key_name=self.sk_name,
sort_key_value=self.sk_value,
# Generic hooks parameters
aws_conn_id="fake-conn-id",
region_name="cn-north-1",
verify=False,
botocore_config={"read_timeout": 42},
)
assert sensor.hook.client_type is None
assert sensor.hook.resource_type == "dynamodb"
assert sensor.hook.aws_conn_id == "fake-conn-id"
assert sensor.hook._region_name == "cn-north-1"
assert sensor.hook._verify is False
assert sensor.hook._config is not None
assert sensor.hook._config.read_timeout == 42

sensor = DynamoDBValueSensor(
task_id="dynamodb_value_sensor_init",
table_name=self.table_name,
partition_key_name=self.pk_name,
partition_key_value=self.pk_value,
attribute_name=self.attribute_name,
attribute_value=self.attribute_value,
sort_key_name=self.sk_name,
sort_key_value=self.sk_value,
)
assert sensor.hook.aws_conn_id == "aws_default"
assert sensor.hook._region_name is None
assert sensor.hook._verify is None
assert sensor.hook._config is None

@mock_dynamodb
def test_sensor_with_pk(self):
hook = DynamoDBHook(table_name=self.table_name, table_keys=[self.pk_name])
Expand Down

0 comments on commit b241577

Please sign in to comment.