diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py index 8691cb57af13..f717b71efc82 100644 --- a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py +++ b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py @@ -20,7 +20,7 @@ from airflow.decorators import task from airflow.models.dag import DAG from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator +from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345') diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py b/airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py index 225bfcb45279..9ded09ba70e3 100644 --- a/airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py +++ b/airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py @@ -18,8 +18,9 @@ from datetime import datetime from airflow.models.dag import DAG -from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator -from airflow.providers.amazon.aws.operators.s3_bucket_tagging import ( +from airflow.providers.amazon.aws.operators.s3 import ( + S3CreateBucketOperator, + S3DeleteBucketOperator, S3DeleteBucketTaggingOperator, S3GetBucketTaggingOperator, S3PutBucketTaggingOperator, diff --git a/airflow/providers/amazon/aws/example_dags/example_salesforce_to_s3.py b/airflow/providers/amazon/aws/example_dags/example_salesforce_to_s3.py index f8fd0dba917a..63863dfc6994 100644 --- a/airflow/providers/amazon/aws/example_dags/example_salesforce_to_s3.py +++ b/airflow/providers/amazon/aws/example_dags/example_salesforce_to_s3.py @@ -22,8 +22,7 @@ from datetime import datetime from airflow import DAG -from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator -from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator +from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator, S3DeleteObjectsOperator from airflow.providers.amazon.aws.transfers.salesforce_to_s3 import SalesforceToS3Operator BASE_PATH = "salesforce/customers" diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py new file mode 100644 index 000000000000..a809dac8a1da --- /dev/null +++ b/airflow/providers/amazon/aws/operators/s3.py @@ -0,0 +1,687 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +"""This module contains AWS S3 operators.""" +import subprocess +import sys +from tempfile import NamedTemporaryFile +from typing import Dict, Iterable, List, Optional, Sequence, Union + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook + +BUCKET_DOES_NOT_EXIST_MSG = "Bucket with name: %s doesn't exist" + + +class S3CreateBucketOperator(BaseOperator): + """ + This operator creates an S3 bucket + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:S3CreateBucketOperator` + + :param bucket_name: This is bucket name you want to create + :type bucket_name: str + :param aws_conn_id: The Airflow connection used for AWS credentials. + If this is None or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then default boto3 configuration would be used (and must be + maintained on each worker node). + :type aws_conn_id: Optional[str] + :param region_name: AWS region_name. If not specified fetched from connection. + :type region_name: Optional[str] + """ + + template_fields = ("bucket_name",) + + def __init__( + self, + *, + bucket_name: str, + aws_conn_id: Optional[str] = "aws_default", + region_name: Optional[str] = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.bucket_name = bucket_name + self.region_name = region_name + self.aws_conn_id = aws_conn_id + self.region_name = region_name + + def execute(self, context): + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) + if not s3_hook.check_for_bucket(self.bucket_name): + s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name) + self.log.info("Created bucket with name: %s", self.bucket_name) + else: + self.log.info("Bucket with name: %s already exists", self.bucket_name) + + +class S3DeleteBucketOperator(BaseOperator): + """ + This operator deletes an S3 bucket + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:S3DeleteBucketOperator` + + :param bucket_name: This is bucket name you want to delete + :type bucket_name: str + :param force_delete: Forcibly delete all objects in the bucket before deleting the bucket + :type force_delete: bool + :param aws_conn_id: The Airflow connection used for AWS credentials. + If this is None or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then default boto3 configuration would be used (and must be + maintained on each worker node). + :type aws_conn_id: Optional[str] + """ + + template_fields = ("bucket_name",) + + def __init__( + self, + bucket_name: str, + force_delete: bool = False, + aws_conn_id: Optional[str] = "aws_default", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.bucket_name = bucket_name + self.force_delete = force_delete + self.aws_conn_id = aws_conn_id + + def execute(self, context): + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) + if s3_hook.check_for_bucket(self.bucket_name): + s3_hook.delete_bucket(bucket_name=self.bucket_name, force_delete=self.force_delete) + self.log.info("Deleted bucket with name: %s", self.bucket_name) + else: + self.log.info("Bucket with name: %s doesn't exist", self.bucket_name) + + +class S3GetBucketTaggingOperator(BaseOperator): + """ + This operator gets tagging from an S3 bucket + + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:S3GetBucketTaggingOperator` + + :param bucket_name: This is bucket name you want to reference + :type bucket_name: str + :param aws_conn_id: The Airflow connection used for AWS credentials. + If this is None or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then default boto3 configuration would be used (and must be + maintained on each worker node). + :type aws_conn_id: Optional[str] + """ + + template_fields = ("bucket_name",) + + def __init__(self, bucket_name: str, aws_conn_id: Optional[str] = "aws_default", **kwargs) -> None: + super().__init__(**kwargs) + self.bucket_name = bucket_name + self.aws_conn_id = aws_conn_id + + def execute(self, context): + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) + + if s3_hook.check_for_bucket(self.bucket_name): + self.log.info("Getting tags for bucket %s", self.bucket_name) + return s3_hook.get_bucket_tagging(self.bucket_name) + else: + self.log.warning(BUCKET_DOES_NOT_EXIST_MSG, self.bucket_name) + return None + + +class S3PutBucketTaggingOperator(BaseOperator): + """ + This operator puts tagging for an S3 bucket. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:S3PutBucketTaggingOperator` + + :param bucket_name: The name of the bucket to add tags to. + :type bucket_name: str + :param key: The key portion of the key/value pair for a tag to be added. + If a key is provided, a value must be provided as well. + :type key: str + :param value: The value portion of the key/value pair for a tag to be added. + If a value is provided, a key must be provided as well. + :param tag_set: A List of key/value pairs. + :type tag_set: List[Dict[str, str]] + :param aws_conn_id: The Airflow connection used for AWS credentials. + If this is None or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then the default boto3 configuration would be used (and must be + maintained on each worker node). + :type aws_conn_id: Optional[str] + """ + + template_fields = ("bucket_name",) + template_fields_renderers = {"tag_set": "json"} + + def __init__( + self, + bucket_name: str, + key: Optional[str] = None, + value: Optional[str] = None, + tag_set: Optional[List[Dict[str, str]]] = None, + aws_conn_id: Optional[str] = "aws_default", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.key = key + self.value = value + self.tag_set = tag_set + self.bucket_name = bucket_name + self.aws_conn_id = aws_conn_id + + def execute(self, context): + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) + + if s3_hook.check_for_bucket(self.bucket_name): + self.log.info("Putting tags for bucket %s", self.bucket_name) + return s3_hook.put_bucket_tagging( + key=self.key, value=self.value, tag_set=self.tag_set, bucket_name=self.bucket_name + ) + else: + self.log.warning(BUCKET_DOES_NOT_EXIST_MSG, self.bucket_name) + return None + + +class S3DeleteBucketTaggingOperator(BaseOperator): + """ + This operator deletes tagging from an S3 bucket. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:S3DeleteBucketTaggingOperator` + + :param bucket_name: This is the name of the bucket to delete tags from. + :type bucket_name: str + :param aws_conn_id: The Airflow connection used for AWS credentials. 
+ If this is None or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then default boto3 configuration would be used (and must be + maintained on each worker node). + :type aws_conn_id: Optional[str] + """ + + template_fields = ("bucket_name",) + + def __init__(self, bucket_name: str, aws_conn_id: Optional[str] = "aws_default", **kwargs) -> None: + super().__init__(**kwargs) + self.bucket_name = bucket_name + self.aws_conn_id = aws_conn_id + + def execute(self, context): + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) + + if s3_hook.check_for_bucket(self.bucket_name): + self.log.info("Deleting tags for bucket %s", self.bucket_name) + return s3_hook.delete_bucket_tagging(self.bucket_name) + else: + self.log.warning(BUCKET_DOES_NOT_EXIST_MSG, self.bucket_name) + return None + + +class S3CopyObjectOperator(BaseOperator): + """ + Creates a copy of an object that is already stored in S3. + + Note: the S3 connection used here needs to have access to both + source and destination bucket/key. + + :param source_bucket_key: The key of the source object. (templated) + + It can be either full s3:// style url or relative path from root level. + + When it's specified as a full s3:// url, please omit source_bucket_name. + :type source_bucket_key: str + :param dest_bucket_key: The key of the object to copy to. (templated) + + The convention to specify `dest_bucket_key` is the same as `source_bucket_key`. + :type dest_bucket_key: str + :param source_bucket_name: Name of the S3 bucket where the source object is in. (templated) + + It should be omitted when `source_bucket_key` is provided as a full s3:// url. + :type source_bucket_name: str + :param dest_bucket_name: Name of the S3 bucket to where the object is copied. (templated) + + It should be omitted when `dest_bucket_key` is provided as a full s3:// url. + :type dest_bucket_name: str + :param source_version_id: Version ID of the source object (OPTIONAL) + :type source_version_id: str + :param aws_conn_id: Connection id of the S3 connection to use + :type aws_conn_id: str + :param verify: Whether or not to verify SSL certificates for S3 connection. + By default SSL certificates are verified. + + You can provide the following values: + + - False: do not validate SSL certificates. SSL will still be used, + but SSL certificates will not be + verified. + - path/to/cert/bundle.pem: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. + :type verify: bool or str + :param acl_policy: String specifying the canned ACL policy for the file being + uploaded to the S3 bucket. 
+ :type acl_policy: str + """ + + template_fields = ('source_bucket_key', 'dest_bucket_key', 'source_bucket_name', 'dest_bucket_name') + + def __init__( + self, + *, + source_bucket_key: str, + dest_bucket_key: str, + source_bucket_name: Optional[str] = None, + dest_bucket_name: Optional[str] = None, + source_version_id: Optional[str] = None, + aws_conn_id: str = 'aws_default', + verify: Optional[Union[str, bool]] = None, + acl_policy: Optional[str] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.source_bucket_key = source_bucket_key + self.dest_bucket_key = dest_bucket_key + self.source_bucket_name = source_bucket_name + self.dest_bucket_name = dest_bucket_name + self.source_version_id = source_version_id + self.aws_conn_id = aws_conn_id + self.verify = verify + self.acl_policy = acl_policy + + def execute(self, context): + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + s3_hook.copy_object( + self.source_bucket_key, + self.dest_bucket_key, + self.source_bucket_name, + self.dest_bucket_name, + self.source_version_id, + self.acl_policy, + ) + + +class S3DeleteObjectsOperator(BaseOperator): + """ + To enable users to delete single object or multiple objects from + a bucket using a single HTTP request. + + Users may specify up to 1000 keys to delete. + + :param bucket: Name of the bucket in which you are going to delete object(s). (templated) + :type bucket: str + :param keys: The key(s) to delete from S3 bucket. (templated) + + When ``keys`` is a string, it's supposed to be the key name of + the single object to delete. + + When ``keys`` is a list, it's supposed to be the list of the + keys to delete. + + You may specify up to 1000 keys. + :type keys: str or list + :param prefix: Prefix of objects to delete. (templated) + All objects matching this prefix in the bucket will be deleted. + :type prefix: str + :param aws_conn_id: Connection id of the S3 connection to use + :type aws_conn_id: str + :param verify: Whether or not to verify SSL certificates for S3 connection. + By default SSL certificates are verified. + + You can provide the following values: + + - ``False``: do not validate SSL certificates. SSL will still be used, + but SSL certificates will not be + verified. + - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. + :type verify: bool or str + """ + + template_fields = ('keys', 'bucket', 'prefix') + + def __init__( + self, + *, + bucket: str, + keys: Optional[Union[str, list]] = None, + prefix: Optional[str] = None, + aws_conn_id: str = 'aws_default', + verify: Optional[Union[str, bool]] = None, + **kwargs, + ): + + if not bool(keys) ^ bool(prefix): + raise ValueError("Either keys or prefix should be set.") + + super().__init__(**kwargs) + self.bucket = bucket + self.keys = keys + self.prefix = prefix + self.aws_conn_id = aws_conn_id + self.verify = verify + + def execute(self, context): + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + + keys = self.keys or s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix) + if keys: + s3_hook.delete_objects(bucket=self.bucket, keys=keys) + + +class S3FileTransformOperator(BaseOperator): + """ + Copies data from a source S3 location to a temporary location on the + local filesystem. Runs a transformation on this file as specified by + the transformation script and uploads the output to a destination S3 + location. 
+ + The locations of the source and the destination files in the local + filesystem is provided as an first and second arguments to the + transformation script. The transformation script is expected to read the + data from source, transform it and write the output to the local + destination file. The operator then takes over control and uploads the + local destination file to S3. + + S3 Select is also available to filter the source contents. Users can + omit the transformation script if S3 Select expression is specified. + + :param source_s3_key: The key to be retrieved from S3. (templated) + :type source_s3_key: str + :param dest_s3_key: The key to be written from S3. (templated) + :type dest_s3_key: str + :param transform_script: location of the executable transformation script + :type transform_script: str + :param select_expression: S3 Select expression + :type select_expression: str + :param script_args: arguments for transformation script (templated) + :type script_args: sequence of str + :param source_aws_conn_id: source s3 connection + :type source_aws_conn_id: str + :param source_verify: Whether or not to verify SSL certificates for S3 connection. + By default SSL certificates are verified. + You can provide the following values: + + - ``False``: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. + - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. + + This is also applicable to ``dest_verify``. + :type source_verify: bool or str + :param dest_aws_conn_id: destination s3 connection + :type dest_aws_conn_id: str + :param dest_verify: Whether or not to verify SSL certificates for S3 connection. 
+ See: ``source_verify`` + :type dest_verify: bool or str + :param replace: Replace dest S3 key if it already exists + :type replace: bool + """ + + template_fields = ('source_s3_key', 'dest_s3_key', 'script_args') + template_ext = () + ui_color = '#f9c915' + + def __init__( + self, + *, + source_s3_key: str, + dest_s3_key: str, + transform_script: Optional[str] = None, + select_expression=None, + script_args: Optional[Sequence[str]] = None, + source_aws_conn_id: str = 'aws_default', + source_verify: Optional[Union[bool, str]] = None, + dest_aws_conn_id: str = 'aws_default', + dest_verify: Optional[Union[bool, str]] = None, + replace: bool = False, + **kwargs, + ) -> None: + + super().__init__(**kwargs) + self.source_s3_key = source_s3_key + self.source_aws_conn_id = source_aws_conn_id + self.source_verify = source_verify + self.dest_s3_key = dest_s3_key + self.dest_aws_conn_id = dest_aws_conn_id + self.dest_verify = dest_verify + self.replace = replace + self.transform_script = transform_script + self.select_expression = select_expression + self.script_args = script_args or [] + self.output_encoding = sys.getdefaultencoding() + + def execute(self, context): + if self.transform_script is None and self.select_expression is None: + raise AirflowException("Either transform_script or select_expression must be specified") + + source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id, verify=self.source_verify) + dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify) + + self.log.info("Downloading source S3 file %s", self.source_s3_key) + if not source_s3.check_for_key(self.source_s3_key): + raise AirflowException(f"The source key {self.source_s3_key} does not exist") + source_s3_key_object = source_s3.get_key(self.source_s3_key) + + with NamedTemporaryFile("wb") as f_source, NamedTemporaryFile("wb") as f_dest: + self.log.info("Dumping S3 file %s contents to local file %s", self.source_s3_key, f_source.name) + + if self.select_expression is not None: + content = source_s3.select_key(key=self.source_s3_key, expression=self.select_expression) + f_source.write(content.encode("utf-8")) + else: + source_s3_key_object.download_fileobj(Fileobj=f_source) + f_source.flush() + + if self.transform_script is not None: + with subprocess.Popen( + [self.transform_script, f_source.name, f_dest.name, *self.script_args], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + close_fds=True, + ) as process: + self.log.info("Output:") + for line in iter(process.stdout.readline, b''): + self.log.info(line.decode(self.output_encoding).rstrip()) + + process.wait() + + if process.returncode: + raise AirflowException(f"Transform script failed: {process.returncode}") + else: + self.log.info( + "Transform script successful. Output temporarily located at %s", f_dest.name + ) + + self.log.info("Uploading transformed file to S3") + f_dest.flush() + dest_s3.load_file( + filename=f_dest.name if self.transform_script else f_source.name, + key=self.dest_s3_key, + replace=self.replace, + ) + self.log.info("Upload successful") + + +class S3ListOperator(BaseOperator): + """ + List all objects from the bucket with the given string prefix in name. + + This operator returns a python list with the name of objects which can be + used by `xcom` in the downstream task. + + :param bucket: The S3 bucket where to find the objects. (templated) + :type bucket: str + :param prefix: Prefix string to filters the objects whose name begin with + such prefix. 
(templated) + :type prefix: str + :param delimiter: the delimiter marks key hierarchy. (templated) + :type delimiter: str + :param aws_conn_id: The connection ID to use when connecting to S3 storage. + :type aws_conn_id: str + :param verify: Whether or not to verify SSL certificates for S3 connection. + By default SSL certificates are verified. + You can provide the following values: + + - ``False``: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. + - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. + :type verify: bool or str + + + **Example**: + The following operator would list all the files + (excluding subfolders) from the S3 + ``customers/2018/04/`` key in the ``data`` bucket. :: + + s3_file = S3ListOperator( + task_id='list_3s_files', + bucket='data', + prefix='customers/2018/04/', + delimiter='/', + aws_conn_id='aws_customers_conn' + ) + """ + + template_fields: Iterable[str] = ('bucket', 'prefix', 'delimiter') + ui_color = '#ffd700' + + def __init__( + self, + *, + bucket: str, + prefix: str = '', + delimiter: str = '', + aws_conn_id: str = 'aws_default', + verify: Optional[Union[str, bool]] = None, + **kwargs, + ): + super().__init__(**kwargs) + self.bucket = bucket + self.prefix = prefix + self.delimiter = delimiter + self.aws_conn_id = aws_conn_id + self.verify = verify + + def execute(self, context): + hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + + self.log.info( + 'Getting the list of files from bucket: %s in prefix: %s (Delimiter %s)', + self.bucket, + self.prefix, + self.delimiter, + ) + + return hook.list_keys(bucket_name=self.bucket, prefix=self.prefix, delimiter=self.delimiter) + + +class S3ListPrefixesOperator(BaseOperator): + """ + List all subfolders from the bucket with the given string prefix in name. + + This operator returns a python list with the name of all subfolders which + can be used by `xcom` in the downstream task. + + :param bucket: The S3 bucket where to find the subfolders. (templated) + :type bucket: str + :param prefix: Prefix string to filter the subfolders whose name begin with + such prefix. (templated) + :type prefix: str + :param delimiter: the delimiter marks subfolder hierarchy. (templated) + :type delimiter: str + :param aws_conn_id: The connection ID to use when connecting to S3 storage. + :type aws_conn_id: str + :param verify: Whether or not to verify SSL certificates for S3 connection. + By default SSL certificates are verified. + You can provide the following values: + + - ``False``: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. + - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. + :type verify: bool or str + + + **Example**: + The following operator would list all the subfolders + from the S3 ``customers/2018/04/`` prefix in the ``data`` bucket. 
:: + + s3_file = S3ListPrefixesOperator( + task_id='list_s3_prefixes', + bucket='data', + prefix='customers/2018/04/', + delimiter='/', + aws_conn_id='aws_customers_conn' + ) + """ + + template_fields: Iterable[str] = ('bucket', 'prefix', 'delimiter') + ui_color = '#ffd700' + + def __init__( + self, + *, + bucket: str, + prefix: str, + delimiter: str, + aws_conn_id: str = 'aws_default', + verify: Optional[Union[str, bool]] = None, + **kwargs, + ): + super().__init__(**kwargs) + self.bucket = bucket + self.prefix = prefix + self.delimiter = delimiter + self.aws_conn_id = aws_conn_id + self.verify = verify + + def execute(self, context): + hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + + self.log.info( + 'Getting the list of subfolders from bucket: %s in prefix: %s (Delimiter %s)', + self.bucket, + self.prefix, + self.delimiter, + ) + + return hook.list_prefixes(bucket_name=self.bucket, prefix=self.prefix, delimiter=self.delimiter) diff --git a/airflow/providers/amazon/aws/operators/s3_bucket.py b/airflow/providers/amazon/aws/operators/s3_bucket.py index 0cf2307d6b1e..e5806fa78084 100644 --- a/airflow/providers/amazon/aws/operators/s3_bucket.py +++ b/airflow/providers/amazon/aws/operators/s3_bucket.py @@ -15,96 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS S3 operators.""" -from typing import Optional +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.s3`.""" -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.s3 import S3Hook +import warnings +from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator # noqa -class S3CreateBucketOperator(BaseOperator): - """ - This operator creates an S3 bucket - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:S3CreateBucketOperator` - - :param bucket_name: This is bucket name you want to create - :type bucket_name: str - :param aws_conn_id: The Airflow connection used for AWS credentials. - If this is None or empty then the default boto3 behaviour is used. If - running Airflow in a distributed manner and aws_conn_id is None or - empty, then default boto3 configuration would be used (and must be - maintained on each worker node). - :type aws_conn_id: Optional[str] - :param region_name: AWS region_name. If not specified fetched from connection. - :type region_name: Optional[str] - """ - - template_fields = ("bucket_name",) - - def __init__( - self, - *, - bucket_name: str, - aws_conn_id: Optional[str] = "aws_default", - region_name: Optional[str] = None, - **kwargs, - ) -> None: - super().__init__(**kwargs) - self.bucket_name = bucket_name - self.region_name = region_name - self.aws_conn_id = aws_conn_id - self.region_name = region_name - - def execute(self, context): - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) - if not s3_hook.check_for_bucket(self.bucket_name): - s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name) - self.log.info("Created bucket with name: %s", self.bucket_name) - else: - self.log.info("Bucket with name: %s already exists", self.bucket_name) - - -class S3DeleteBucketOperator(BaseOperator): - """ - This operator deletes an S3 bucket - - .. 
seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:S3DeleteBucketOperator` - - :param bucket_name: This is bucket name you want to delete - :type bucket_name: str - :param force_delete: Forcibly delete all objects in the bucket before deleting the bucket - :type force_delete: bool - :param aws_conn_id: The Airflow connection used for AWS credentials. - If this is None or empty then the default boto3 behaviour is used. If - running Airflow in a distributed manner and aws_conn_id is None or - empty, then default boto3 configuration would be used (and must be - maintained on each worker node). - :type aws_conn_id: Optional[str] - """ - - template_fields = ("bucket_name",) - - def __init__( - self, - bucket_name: str, - force_delete: bool = False, - aws_conn_id: Optional[str] = "aws_default", - **kwargs, - ) -> None: - super().__init__(**kwargs) - self.bucket_name = bucket_name - self.force_delete = force_delete - self.aws_conn_id = aws_conn_id - - def execute(self, context): - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) - if s3_hook.check_for_bucket(self.bucket_name): - s3_hook.delete_bucket(bucket_name=self.bucket_name, force_delete=self.force_delete) - self.log.info("Deleted bucket with name: %s", self.bucket_name) - else: - self.log.info("Bucket with name: %s doesn't exist", self.bucket_name) +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.s3`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/operators/s3_bucket_tagging.py b/airflow/providers/amazon/aws/operators/s3_bucket_tagging.py index 6f73cc0406cf..fb214412643a 100644 --- a/airflow/providers/amazon/aws/operators/s3_bucket_tagging.py +++ b/airflow/providers/amazon/aws/operators/s3_bucket_tagging.py @@ -15,139 +15,18 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This module contains AWS S3 operators.""" -from typing import Dict, List, Optional +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.s3`.""" -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.s3 import S3Hook +import warnings -BUCKET_DOES_NOT_EXIST_MSG = "Bucket with name: %s doesn't exist" +from airflow.providers.amazon.aws.operators.s3 import ( # noqa + S3DeleteBucketTaggingOperator, + S3GetBucketTaggingOperator, + S3PutBucketTaggingOperator, +) - -class S3GetBucketTaggingOperator(BaseOperator): - """ - This operator gets tagging from an S3 bucket - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:S3GetBucketTaggingOperator` - - :param bucket_name: This is bucket name you want to reference - :type bucket_name: str - :param aws_conn_id: The Airflow connection used for AWS credentials. - If this is None or empty then the default boto3 behaviour is used. If - running Airflow in a distributed manner and aws_conn_id is None or - empty, then default boto3 configuration would be used (and must be - maintained on each worker node). 
- :type aws_conn_id: Optional[str] - """ - - template_fields = ("bucket_name",) - - def __init__(self, bucket_name: str, aws_conn_id: Optional[str] = "aws_default", **kwargs) -> None: - super().__init__(**kwargs) - self.bucket_name = bucket_name - self.aws_conn_id = aws_conn_id - - def execute(self, context): - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) - - if s3_hook.check_for_bucket(self.bucket_name): - self.log.info("Getting tags for bucket %s", self.bucket_name) - return s3_hook.get_bucket_tagging(self.bucket_name) - else: - self.log.warning(BUCKET_DOES_NOT_EXIST_MSG, self.bucket_name) - return None - - -class S3PutBucketTaggingOperator(BaseOperator): - """ - This operator puts tagging for an S3 bucket. - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:S3PutBucketTaggingOperator` - - :param bucket_name: The name of the bucket to add tags to. - :type bucket_name: str - :param key: The key portion of the key/value pair for a tag to be added. - If a key is provided, a value must be provided as well. - :type key: str - :param value: The value portion of the key/value pair for a tag to be added. - If a value is provided, a key must be provided as well. - :param tag_set: A List of key/value pairs. - :type tag_set: List[Dict[str, str]] - :param aws_conn_id: The Airflow connection used for AWS credentials. - If this is None or empty then the default boto3 behaviour is used. If - running Airflow in a distributed manner and aws_conn_id is None or - empty, then the default boto3 configuration would be used (and must be - maintained on each worker node). - :type aws_conn_id: Optional[str] - """ - - template_fields = ("bucket_name",) - template_fields_renderers = {"tag_set": "json"} - - def __init__( - self, - bucket_name: str, - key: Optional[str] = None, - value: Optional[str] = None, - tag_set: Optional[List[Dict[str, str]]] = None, - aws_conn_id: Optional[str] = "aws_default", - **kwargs, - ) -> None: - super().__init__(**kwargs) - self.key = key - self.value = value - self.tag_set = tag_set - self.bucket_name = bucket_name - self.aws_conn_id = aws_conn_id - - def execute(self, context): - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) - - if s3_hook.check_for_bucket(self.bucket_name): - self.log.info("Putting tags for bucket %s", self.bucket_name) - return s3_hook.put_bucket_tagging( - key=self.key, value=self.value, tag_set=self.tag_set, bucket_name=self.bucket_name - ) - else: - self.log.warning(BUCKET_DOES_NOT_EXIST_MSG, self.bucket_name) - return None - - -class S3DeleteBucketTaggingOperator(BaseOperator): - """ - This operator deletes tagging from an S3 bucket. - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:S3DeleteBucketTaggingOperator` - - :param bucket_name: This is the name of the bucket to delete tags from. - :type bucket_name: str - :param aws_conn_id: The Airflow connection used for AWS credentials. - If this is None or empty then the default boto3 behaviour is used. If - running Airflow in a distributed manner and aws_conn_id is None or - empty, then default boto3 configuration would be used (and must be - maintained on each worker node). 
- :type aws_conn_id: Optional[str] - """ - - template_fields = ("bucket_name",) - - def __init__(self, bucket_name: str, aws_conn_id: Optional[str] = "aws_default", **kwargs) -> None: - super().__init__(**kwargs) - self.bucket_name = bucket_name - self.aws_conn_id = aws_conn_id - - def execute(self, context): - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) - - if s3_hook.check_for_bucket(self.bucket_name): - self.log.info("Deleting tags for bucket %s", self.bucket_name) - return s3_hook.delete_bucket_tagging(self.bucket_name) - else: - self.log.warning(BUCKET_DOES_NOT_EXIST_MSG, self.bucket_name) - return None +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.s3`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/operators/s3_copy_object.py b/airflow/providers/amazon/aws/operators/s3_copy_object.py index ccc9102e5750..298f8d21724c 100644 --- a/airflow/providers/amazon/aws/operators/s3_copy_object.py +++ b/airflow/providers/amazon/aws/operators/s3_copy_object.py @@ -15,91 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import Optional, Union +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.s3`.""" -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.s3 import S3Hook +import warnings +from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator # noqa -class S3CopyObjectOperator(BaseOperator): - """ - Creates a copy of an object that is already stored in S3. - - Note: the S3 connection used here needs to have access to both - source and destination bucket/key. - - :param source_bucket_key: The key of the source object. (templated) - - It can be either full s3:// style url or relative path from root level. - - When it's specified as a full s3:// url, please omit source_bucket_name. - :type source_bucket_key: str - :param dest_bucket_key: The key of the object to copy to. (templated) - - The convention to specify `dest_bucket_key` is the same as `source_bucket_key`. - :type dest_bucket_key: str - :param source_bucket_name: Name of the S3 bucket where the source object is in. (templated) - - It should be omitted when `source_bucket_key` is provided as a full s3:// url. - :type source_bucket_name: str - :param dest_bucket_name: Name of the S3 bucket to where the object is copied. (templated) - - It should be omitted when `dest_bucket_key` is provided as a full s3:// url. - :type dest_bucket_name: str - :param source_version_id: Version ID of the source object (OPTIONAL) - :type source_version_id: str - :param aws_conn_id: Connection id of the S3 connection to use - :type aws_conn_id: str - :param verify: Whether or not to verify SSL certificates for S3 connection. - By default SSL certificates are verified. - - You can provide the following values: - - - False: do not validate SSL certificates. SSL will still be used, - but SSL certificates will not be - verified. - - path/to/cert/bundle.pem: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. - :type verify: bool or str - :param acl_policy: String specifying the canned ACL policy for the file being - uploaded to the S3 bucket. 
- :type acl_policy: str - """ - - template_fields = ('source_bucket_key', 'dest_bucket_key', 'source_bucket_name', 'dest_bucket_name') - - def __init__( - self, - *, - source_bucket_key: str, - dest_bucket_key: str, - source_bucket_name: Optional[str] = None, - dest_bucket_name: Optional[str] = None, - source_version_id: Optional[str] = None, - aws_conn_id: str = 'aws_default', - verify: Optional[Union[str, bool]] = None, - acl_policy: Optional[str] = None, - **kwargs, - ): - super().__init__(**kwargs) - - self.source_bucket_key = source_bucket_key - self.dest_bucket_key = dest_bucket_key - self.source_bucket_name = source_bucket_name - self.dest_bucket_name = dest_bucket_name - self.source_version_id = source_version_id - self.aws_conn_id = aws_conn_id - self.verify = verify - self.acl_policy = acl_policy - - def execute(self, context): - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) - s3_hook.copy_object( - self.source_bucket_key, - self.dest_bucket_key, - self.source_bucket_name, - self.dest_bucket_name, - self.source_version_id, - self.acl_policy, - ) +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.s3`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/operators/s3_delete_objects.py b/airflow/providers/amazon/aws/operators/s3_delete_objects.py index de8d8ce620a0..35d86893eb37 100644 --- a/airflow/providers/amazon/aws/operators/s3_delete_objects.py +++ b/airflow/providers/amazon/aws/operators/s3_delete_objects.py @@ -15,76 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import Optional, Union +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.s3`.""" -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.s3 import S3Hook +import warnings +from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator # noqa -class S3DeleteObjectsOperator(BaseOperator): - """ - To enable users to delete single object or multiple objects from - a bucket using a single HTTP request. - - Users may specify up to 1000 keys to delete. - - :param bucket: Name of the bucket in which you are going to delete object(s). (templated) - :type bucket: str - :param keys: The key(s) to delete from S3 bucket. (templated) - - When ``keys`` is a string, it's supposed to be the key name of - the single object to delete. - - When ``keys`` is a list, it's supposed to be the list of the - keys to delete. - - You may specify up to 1000 keys. - :type keys: str or list - :param prefix: Prefix of objects to delete. (templated) - All objects matching this prefix in the bucket will be deleted. - :type prefix: str - :param aws_conn_id: Connection id of the S3 connection to use - :type aws_conn_id: str - :param verify: Whether or not to verify SSL certificates for S3 connection. - By default SSL certificates are verified. - - You can provide the following values: - - - ``False``: do not validate SSL certificates. SSL will still be used, - but SSL certificates will not be - verified. - - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. 
- :type verify: bool or str - """ - - template_fields = ('keys', 'bucket', 'prefix') - - def __init__( - self, - *, - bucket: str, - keys: Optional[Union[str, list]] = None, - prefix: Optional[str] = None, - aws_conn_id: str = 'aws_default', - verify: Optional[Union[str, bool]] = None, - **kwargs, - ): - - if not bool(keys) ^ bool(prefix): - raise ValueError("Either keys or prefix should be set.") - - super().__init__(**kwargs) - self.bucket = bucket - self.keys = keys - self.prefix = prefix - self.aws_conn_id = aws_conn_id - self.verify = verify - - def execute(self, context): - s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) - - keys = self.keys or s3_hook.list_keys(bucket_name=self.bucket, prefix=self.prefix) - if keys: - s3_hook.delete_objects(bucket=self.bucket, keys=keys) +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.s3`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/operators/s3_file_transform.py b/airflow/providers/amazon/aws/operators/s3_file_transform.py index b6911a737d61..4400b202bbda 100644 --- a/airflow/providers/amazon/aws/operators/s3_file_transform.py +++ b/airflow/providers/amazon/aws/operators/s3_file_transform.py @@ -15,148 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.s3`.""" -import subprocess -import sys -from tempfile import NamedTemporaryFile -from typing import Optional, Sequence, Union +import warnings -from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.providers.amazon.aws.operators.s3 import S3FileTransformOperator # noqa - -class S3FileTransformOperator(BaseOperator): - """ - Copies data from a source S3 location to a temporary location on the - local filesystem. Runs a transformation on this file as specified by - the transformation script and uploads the output to a destination S3 - location. - - The locations of the source and the destination files in the local - filesystem is provided as an first and second arguments to the - transformation script. The transformation script is expected to read the - data from source, transform it and write the output to the local - destination file. The operator then takes over control and uploads the - local destination file to S3. - - S3 Select is also available to filter the source contents. Users can - omit the transformation script if S3 Select expression is specified. - - :param source_s3_key: The key to be retrieved from S3. (templated) - :type source_s3_key: str - :param dest_s3_key: The key to be written from S3. (templated) - :type dest_s3_key: str - :param transform_script: location of the executable transformation script - :type transform_script: str - :param select_expression: S3 Select expression - :type select_expression: str - :param script_args: arguments for transformation script (templated) - :type script_args: sequence of str - :param source_aws_conn_id: source s3 connection - :type source_aws_conn_id: str - :param source_verify: Whether or not to verify SSL certificates for S3 connection. - By default SSL certificates are verified. - You can provide the following values: - - - ``False``: do not validate SSL certificates. 
SSL will still be used - (unless use_ssl is False), but SSL certificates will not be - verified. - - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. - - This is also applicable to ``dest_verify``. - :type source_verify: bool or str - :param dest_aws_conn_id: destination s3 connection - :type dest_aws_conn_id: str - :param dest_verify: Whether or not to verify SSL certificates for S3 connection. - See: ``source_verify`` - :type dest_verify: bool or str - :param replace: Replace dest S3 key if it already exists - :type replace: bool - """ - - template_fields = ('source_s3_key', 'dest_s3_key', 'script_args') - template_ext = () - ui_color = '#f9c915' - - def __init__( - self, - *, - source_s3_key: str, - dest_s3_key: str, - transform_script: Optional[str] = None, - select_expression=None, - script_args: Optional[Sequence[str]] = None, - source_aws_conn_id: str = 'aws_default', - source_verify: Optional[Union[bool, str]] = None, - dest_aws_conn_id: str = 'aws_default', - dest_verify: Optional[Union[bool, str]] = None, - replace: bool = False, - **kwargs, - ) -> None: - - super().__init__(**kwargs) - self.source_s3_key = source_s3_key - self.source_aws_conn_id = source_aws_conn_id - self.source_verify = source_verify - self.dest_s3_key = dest_s3_key - self.dest_aws_conn_id = dest_aws_conn_id - self.dest_verify = dest_verify - self.replace = replace - self.transform_script = transform_script - self.select_expression = select_expression - self.script_args = script_args or [] - self.output_encoding = sys.getdefaultencoding() - - def execute(self, context): - if self.transform_script is None and self.select_expression is None: - raise AirflowException("Either transform_script or select_expression must be specified") - - source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id, verify=self.source_verify) - dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify) - - self.log.info("Downloading source S3 file %s", self.source_s3_key) - if not source_s3.check_for_key(self.source_s3_key): - raise AirflowException(f"The source key {self.source_s3_key} does not exist") - source_s3_key_object = source_s3.get_key(self.source_s3_key) - - with NamedTemporaryFile("wb") as f_source, NamedTemporaryFile("wb") as f_dest: - self.log.info("Dumping S3 file %s contents to local file %s", self.source_s3_key, f_source.name) - - if self.select_expression is not None: - content = source_s3.select_key(key=self.source_s3_key, expression=self.select_expression) - f_source.write(content.encode("utf-8")) - else: - source_s3_key_object.download_fileobj(Fileobj=f_source) - f_source.flush() - - if self.transform_script is not None: - with subprocess.Popen( - [self.transform_script, f_source.name, f_dest.name, *self.script_args], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - close_fds=True, - ) as process: - self.log.info("Output:") - for line in iter(process.stdout.readline, b''): - self.log.info(line.decode(self.output_encoding).rstrip()) - - process.wait() - - if process.returncode: - raise AirflowException(f"Transform script failed: {process.returncode}") - else: - self.log.info( - "Transform script successful. 
Output temporarily located at %s", f_dest.name - ) - - self.log.info("Uploading transformed file to S3") - f_dest.flush() - dest_s3.load_file( - filename=f_dest.name if self.transform_script else f_source.name, - key=self.dest_s3_key, - replace=self.replace, - ) - self.log.info("Upload successful") +warnings.warn( + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.s3`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/operators/s3_list.py b/airflow/providers/amazon/aws/operators/s3_list.py index 4e4550cf0ba2..c114a0b81488 100644 --- a/airflow/providers/amazon/aws/operators/s3_list.py +++ b/airflow/providers/amazon/aws/operators/s3_list.py @@ -15,84 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.s3`.""" -from typing import Iterable, Optional, Union +import warnings -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.providers.amazon.aws.operators.s3 import S3ListOperator # noqa - -class S3ListOperator(BaseOperator): - """ - List all objects from the bucket with the given string prefix in name. - - This operator returns a python list with the name of objects which can be - used by `xcom` in the downstream task. - - :param bucket: The S3 bucket where to find the objects. (templated) - :type bucket: str - :param prefix: Prefix string to filters the objects whose name begin with - such prefix. (templated) - :type prefix: str - :param delimiter: the delimiter marks key hierarchy. (templated) - :type delimiter: str - :param aws_conn_id: The connection ID to use when connecting to S3 storage. - :type aws_conn_id: str - :param verify: Whether or not to verify SSL certificates for S3 connection. - By default SSL certificates are verified. - You can provide the following values: - - - ``False``: do not validate SSL certificates. SSL will still be used - (unless use_ssl is False), but SSL certificates will not be - verified. - - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. - :type verify: bool or str - - - **Example**: - The following operator would list all the files - (excluding subfolders) from the S3 - ``customers/2018/04/`` key in the ``data`` bucket. :: - - s3_file = S3ListOperator( - task_id='list_3s_files', - bucket='data', - prefix='customers/2018/04/', - delimiter='/', - aws_conn_id='aws_customers_conn' - ) - """ - - template_fields: Iterable[str] = ('bucket', 'prefix', 'delimiter') - ui_color = '#ffd700' - - def __init__( - self, - *, - bucket: str, - prefix: str = '', - delimiter: str = '', - aws_conn_id: str = 'aws_default', - verify: Optional[Union[str, bool]] = None, - **kwargs, - ): - super().__init__(**kwargs) - self.bucket = bucket - self.prefix = prefix - self.delimiter = delimiter - self.aws_conn_id = aws_conn_id - self.verify = verify - - def execute(self, context): - hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) - - self.log.info( - 'Getting the list of files from bucket: %s in prefix: %s (Delimiter %s)', - self.bucket, - self.prefix, - self.delimiter, - ) - - return hook.list_keys(bucket_name=self.bucket, prefix=self.prefix, delimiter=self.delimiter) +warnings.warn( + "This module is deprecated. 
Please use `airflow.providers.amazon.aws.operators.s3`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/operators/s3_list_prefixes.py b/airflow/providers/amazon/aws/operators/s3_list_prefixes.py index f16ac804318f..5e94f0c17f47 100644 --- a/airflow/providers/amazon/aws/operators/s3_list_prefixes.py +++ b/airflow/providers/amazon/aws/operators/s3_list_prefixes.py @@ -15,83 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.operators.s3`.""" -from typing import Iterable, Optional, Union +import warnings -from airflow.models import BaseOperator -from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.providers.amazon.aws.operators.s3 import S3ListPrefixesOperator # noqa - -class S3ListPrefixesOperator(BaseOperator): - """ - List all subfolders from the bucket with the given string prefix in name. - - This operator returns a python list with the name of all subfolders which - can be used by `xcom` in the downstream task. - - :param bucket: The S3 bucket where to find the subfolders. (templated) - :type bucket: str - :param prefix: Prefix string to filter the subfolders whose name begin with - such prefix. (templated) - :type prefix: str - :param delimiter: the delimiter marks subfolder hierarchy. (templated) - :type delimiter: str - :param aws_conn_id: The connection ID to use when connecting to S3 storage. - :type aws_conn_id: str - :param verify: Whether or not to verify SSL certificates for S3 connection. - By default SSL certificates are verified. - You can provide the following values: - - - ``False``: do not validate SSL certificates. SSL will still be used - (unless use_ssl is False), but SSL certificates will not be - verified. - - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. - :type verify: bool or str - - - **Example**: - The following operator would list all the subfolders - from the S3 ``customers/2018/04/`` prefix in the ``data`` bucket. :: - - s3_file = S3ListPrefixesOperator( - task_id='list_s3_prefixes', - bucket='data', - prefix='customers/2018/04/', - delimiter='/', - aws_conn_id='aws_customers_conn' - ) - """ - - template_fields: Iterable[str] = ('bucket', 'prefix', 'delimiter') - ui_color = '#ffd700' - - def __init__( - self, - *, - bucket: str, - prefix: str, - delimiter: str, - aws_conn_id: str = 'aws_default', - verify: Optional[Union[str, bool]] = None, - **kwargs, - ): - super().__init__(**kwargs) - self.bucket = bucket - self.prefix = prefix - self.delimiter = delimiter - self.aws_conn_id = aws_conn_id - self.verify = verify - - def execute(self, context): - hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) - - self.log.info( - 'Getting the list of subfolders from bucket: %s in prefix: %s (Delimiter %s)', - self.bucket, - self.prefix, - self.delimiter, - ) - - return hook.list_prefixes(bucket_name=self.bucket, prefix=self.prefix, delimiter=self.delimiter) +warnings.warn( + "This module is deprecated. 
Please use `airflow.providers.amazon.aws.operators.s3`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py new file mode 100644 index 000000000000..6113d4b60502 --- /dev/null +++ b/airflow/providers/amazon/aws/sensors/s3.py @@ -0,0 +1,426 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import re +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Set, Union +from urllib.parse import urlparse + +try: + from functools import cached_property +except ImportError: + from cached_property import cached_property + +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.sensors.base import BaseSensorOperator, poke_mode_only + + +class S3KeySensor(BaseSensorOperator): + """ + Waits for a key (a file-like instance on S3) to be present in an S3 bucket. + Because S3 is a key/value store, it does not support folders. The path is just a key, + a resource. + + :param bucket_key: The key being waited on. Supports full s3:// style url + or relative path from root level. When it's specified as a full s3:// + url, please leave bucket_name as `None`. + :type bucket_key: str + :param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key`` + is not provided as a full s3:// url. + :type bucket_name: str + :param wildcard_match: whether the bucket_key should be interpreted as a + Unix wildcard pattern + :type wildcard_match: bool + :param aws_conn_id: a reference to the s3 connection + :type aws_conn_id: str + :param verify: Whether or not to verify SSL certificates for S3 connection. + By default SSL certificates are verified. + You can provide the following values: + + - ``False``: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. + - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore.
+ :type verify: bool or str + """ + + template_fields = ('bucket_key', 'bucket_name') + + def __init__( + self, + *, + bucket_key: str, + bucket_name: Optional[str] = None, + wildcard_match: bool = False, + aws_conn_id: str = 'aws_default', + verify: Optional[Union[str, bool]] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.bucket_name = bucket_name + self.bucket_key = bucket_key + self.wildcard_match = wildcard_match + self.aws_conn_id = aws_conn_id + self.verify = verify + self.hook: Optional[S3Hook] = None + + def poke(self, context): + + if self.bucket_name is None: + parsed_url = urlparse(self.bucket_key) + if parsed_url.netloc == '': + raise AirflowException('If key is a relative path from root, please provide a bucket_name') + self.bucket_name = parsed_url.netloc + self.bucket_key = parsed_url.path.lstrip('/') + else: + parsed_url = urlparse(self.bucket_key) + if parsed_url.scheme != '' or parsed_url.netloc != '': + raise AirflowException( + 'If bucket_name is provided, bucket_key' + ' should be relative path from root' + ' level, rather than a full s3:// url' + ) + + self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key) + if self.wildcard_match: + return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name) + return self.get_hook().check_for_key(self.bucket_key, self.bucket_name) + + def get_hook(self) -> S3Hook: + """Create and return an S3Hook""" + if self.hook: + return self.hook + + self.hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + return self.hook + + +class S3KeySizeSensor(S3KeySensor): + """ + Waits for a key (a file-like instance on S3) to be present and to be larger than + some size in an S3 bucket. + Because S3 is a key/value store, it does not support folders. The path is just a key, + a resource. + + :param bucket_key: The key being waited on. Supports full s3:// style url + or relative path from root level. When it's specified as a full s3:// + url, please leave bucket_name as `None`. + :type bucket_key: str + :param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key`` + is not provided as a full s3:// url. + :type bucket_name: str + :param wildcard_match: whether the bucket_key should be interpreted as a + Unix wildcard pattern + :type wildcard_match: bool + :param aws_conn_id: a reference to the s3 connection + :type aws_conn_id: str + :param verify: Whether or not to verify SSL certificates for S3 connection. + By default SSL certificates are verified. + You can provide the following values: + + - ``False``: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. + - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore.
+ :type verify: bool or str + :param check_fn: Function that receives the list of the S3 objects, + and returns a boolean: + - ``True``: the criterion is met + - ``False``: the criterion is not met + **Example**: Wait for any S3 object larger than 1 megabyte :: + + def check_fn(data: List) -> bool: + return any(f.get('Size', 0) > 1048576 for f in data if isinstance(f, dict)) + :type check_fn: Optional[Callable[..., bool]] + """ + + def __init__( + self, + *, + check_fn: Optional[Callable[..., bool]] = None, + **kwargs, + ): + super().__init__(**kwargs) + self.check_fn_user = check_fn + + def poke(self, context): + if super().poke(context=context) is False: + return False + + s3_objects = self.get_files(s3_hook=self.get_hook()) + if not s3_objects: + return False + check_fn = self.check_fn if self.check_fn_user is None else self.check_fn_user + return check_fn(s3_objects) + + def get_files(self, s3_hook: S3Hook, delimiter: Optional[str] = '/') -> List: + """Gets a list of files in the bucket""" + prefix = self.bucket_key + config = { + 'PageSize': None, + 'MaxItems': None, + } + if self.wildcard_match: + prefix = re.split(r'[\[\*\?]', self.bucket_key, 1)[0] + + paginator = s3_hook.get_conn().get_paginator('list_objects_v2') + response = paginator.paginate( + Bucket=self.bucket_name, Prefix=prefix, Delimiter=delimiter, PaginationConfig=config + ) + keys = [] + for page in response: + if 'Contents' in page: + _temp = [k for k in page['Contents'] if isinstance(k.get('Size', None), (int, float))] + keys = keys + _temp + return keys + + def check_fn(self, data: List, object_min_size: Optional[Union[int, float]] = 0) -> bool: + """Default check: returns True only if every S3 object in ``data`` is larger than ``object_min_size`` + + :param data: List of the objects in S3 bucket. + :type data: list + :param object_min_size: Checks if the objects sizes are greater than this value. + :type object_min_size: int + """ + return all(f.get('Size', 0) > object_min_size for f in data if isinstance(f, dict)) + + +@poke_mode_only +class S3KeysUnchangedSensor(BaseSensorOperator): + """ + Checks for changes in the number of objects at a prefix in an AWS S3 + bucket and returns True if the inactivity period has passed with no + increase in the number of objects. Note that this sensor will not behave correctly + in reschedule mode, as the state of the listed objects in the S3 bucket will + be lost between rescheduled invocations. + + :param bucket_name: Name of the S3 bucket + :type bucket_name: str + :param prefix: The prefix being waited on. Relative path from bucket root level. + :type prefix: str + :param aws_conn_id: a reference to the s3 connection + :type aws_conn_id: str + :param verify: Whether or not to verify SSL certificates for S3 connection. + By default SSL certificates are verified. + You can provide the following values: + + - ``False``: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. + - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. + :type verify: Optional[Union[bool, str]] + :param inactivity_period: The total seconds of inactivity to designate + keys unchanged. Note, this mechanism is not real time and + this operator may not return until a poke_interval after this period + has passed with no additional objects sensed.
+ :type inactivity_period: float + :param min_objects: The minimum number of objects needed for keys unchanged + sensor to be considered valid. + :type min_objects: int + :param previous_objects: The set of object ids found during the last poke. + :type previous_objects: Optional[Set[str]] + :param allow_delete: Should this sensor consider objects being deleted + between pokes valid behavior. If true a warning message will be logged + when this happens. If false an error will be raised. + :type allow_delete: bool + """ + + template_fields = ('bucket_name', 'prefix') + + def __init__( + self, + *, + bucket_name: str, + prefix: str, + aws_conn_id: str = 'aws_default', + verify: Optional[Union[bool, str]] = None, + inactivity_period: float = 60 * 60, + min_objects: int = 1, + previous_objects: Optional[Set[str]] = None, + allow_delete: bool = True, + **kwargs, + ) -> None: + + super().__init__(**kwargs) + + self.bucket_name = bucket_name + self.prefix = prefix + if inactivity_period < 0: + raise ValueError("inactivity_period must be non-negative") + self.inactivity_period = inactivity_period + self.min_objects = min_objects + self.previous_objects = previous_objects or set() + self.inactivity_seconds = 0 + self.allow_delete = allow_delete + self.aws_conn_id = aws_conn_id + self.verify = verify + self.last_activity_time: Optional[datetime] = None + + @cached_property + def hook(self): + """Returns S3Hook.""" + return S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + + def is_keys_unchanged(self, current_objects: Set[str]) -> bool: + """ + Checks whether new objects have been uploaded and the inactivity_period + has passed and updates the state of the sensor accordingly. + + :param current_objects: set of object ids in bucket during last poke. + :type current_objects: set[str] + """ + current_num_objects = len(current_objects) + if current_objects > self.previous_objects: + # When new objects arrived, reset the inactivity_seconds + # and update previous_objects for the next poke. + self.log.info( + "New objects found at %s, resetting last_activity_time.", + os.path.join(self.bucket_name, self.prefix), + ) + self.log.debug("New objects: %s", current_objects - self.previous_objects) + self.last_activity_time = datetime.now() + self.inactivity_seconds = 0 + self.previous_objects = current_objects + return False + + if self.previous_objects - current_objects: + # During the last poke interval objects were deleted. + if self.allow_delete: + deleted_objects = self.previous_objects - current_objects + self.previous_objects = current_objects + self.last_activity_time = datetime.now() + self.log.info( + "Objects were deleted during the last poke interval. Updating the " + "file counter and resetting last_activity_time:\n%s", + deleted_objects, + ) + return False + + raise AirflowException( + f"Illegal behavior: objects were deleted in" + f" {os.path.join(self.bucket_name, self.prefix)} between pokes." + ) + + if self.last_activity_time: + self.inactivity_seconds = int((datetime.now() - self.last_activity_time).total_seconds()) + else: + # Handles the first poke where last inactivity time is None. 
+ self.last_activity_time = datetime.now() + self.inactivity_seconds = 0 + + if self.inactivity_seconds >= self.inactivity_period: + path = os.path.join(self.bucket_name, self.prefix) + + if current_num_objects >= self.min_objects: + self.log.info( + "SUCCESS: \nSensor found %s objects at %s.\n" + "Waited at least %s seconds, with no new objects uploaded.", + current_num_objects, + path, + self.inactivity_period, + ) + return True + + self.log.error("FAILURE: Inactivity Period passed, not enough objects found in %s", path) + + return False + return False + + def poke(self, context): + return self.is_keys_unchanged(set(self.hook.list_keys(self.bucket_name, prefix=self.prefix))) + + +class S3PrefixSensor(BaseSensorOperator): + """ + Waits for a prefix or all prefixes to exist. A prefix is the first part of a key, + thus enabling checking of constructs similar to glob ``airfl*`` or + SQL LIKE ``'airfl%'``. A delimiter can be specified to + indicate the hierarchy of keys, meaning that the match will stop at that + delimiter. Current code accepts sane delimiters, i.e. characters that + are NOT special characters in the Python regex engine. + + :param bucket_name: Name of the S3 bucket + :type bucket_name: str + :param prefix: The prefix being waited on. Relative path from bucket root level. + :type prefix: str or list of str + :param delimiter: The delimiter intended to show hierarchy. + Defaults to '/'. + :type delimiter: str + :param aws_conn_id: a reference to the s3 connection + :type aws_conn_id: str + :param verify: Whether or not to verify SSL certificates for S3 connection. + By default SSL certificates are verified. + You can provide the following values: + + - ``False``: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. + - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. + :type verify: bool or str + """ + + template_fields = ('prefix', 'bucket_name') + + def __init__( + self, + *, + bucket_name: str, + prefix: Union[str, List[str]], + delimiter: str = '/', + aws_conn_id: str = 'aws_default', + verify: Optional[Union[str, bool]] = None, + **kwargs, + ): + super().__init__(**kwargs) + # Normalize a single prefix string into a list of prefixes + self.bucket_name = bucket_name + self.prefix = [prefix] if isinstance(prefix, str) else prefix + self.delimiter = delimiter + self.aws_conn_id = aws_conn_id + self.verify = verify + self.hook: Optional[S3Hook] = None + + def poke(self, context: Dict[str, Any]): + self.log.info('Poking for prefix : %s in bucket s3://%s', self.prefix, self.bucket_name) + return all(self._check_for_prefix(prefix) for prefix in self.prefix) + + def get_hook(self) -> S3Hook: + """Create and return an S3Hook""" + if self.hook: + return self.hook + + self.hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + return self.hook + + def _check_for_prefix(self, prefix: str) -> bool: + return self.get_hook().check_for_prefix( + prefix=prefix, delimiter=self.delimiter, bucket_name=self.bucket_name + ) diff --git a/airflow/providers/amazon/aws/sensors/s3_key.py b/airflow/providers/amazon/aws/sensors/s3_key.py index 57cc0df581aa..deff11d00d1f 100644 --- a/airflow/providers/amazon/aws/sensors/s3_key.py +++ b/airflow/providers/amazon/aws/sensors/s3_key.py @@ -15,186 +15,14 @@ # KIND, either express or implied.
See the License for the # specific language governing permissions and limitations # under the License. -import re -from typing import Callable, List, Optional, Union -from urllib.parse import urlparse +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.sensors.s3`.""" -from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.sensors.base import BaseSensorOperator +import warnings +from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeySizeSensor # noqa -class S3KeySensor(BaseSensorOperator): - """ - Waits for a key (a file-like instance on S3) to be present in a S3 bucket. - S3 being a key/value it does not support folders. The path is just a key - a resource. - - :param bucket_key: The key being waited on. Supports full s3:// style url - or relative path from root level. When it's specified as a full s3:// - url, please leave bucket_name as `None`. - :type bucket_key: str - :param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key`` - is not provided as a full s3:// url. - :type bucket_name: str - :param wildcard_match: whether the bucket_key should be interpreted as a - Unix wildcard pattern - :type wildcard_match: bool - :param aws_conn_id: a reference to the s3 connection - :type aws_conn_id: str - :param verify: Whether or not to verify SSL certificates for S3 connection. - By default SSL certificates are verified. - You can provide the following values: - - - ``False``: do not validate SSL certificates. SSL will still be used - (unless use_ssl is False), but SSL certificates will not be - verified. - - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. - :type verify: bool or str - """ - - template_fields = ('bucket_key', 'bucket_name') - - def __init__( - self, - *, - bucket_key: str, - bucket_name: Optional[str] = None, - wildcard_match: bool = False, - aws_conn_id: str = 'aws_default', - verify: Optional[Union[str, bool]] = None, - **kwargs, - ): - super().__init__(**kwargs) - - self.bucket_name = bucket_name - self.bucket_key = bucket_key - self.wildcard_match = wildcard_match - self.aws_conn_id = aws_conn_id - self.verify = verify - self.hook: Optional[S3Hook] = None - - def poke(self, context): - - if self.bucket_name is None: - parsed_url = urlparse(self.bucket_key) - if parsed_url.netloc == '': - raise AirflowException('If key is a relative path from root, please provide a bucket_name') - self.bucket_name = parsed_url.netloc - self.bucket_key = parsed_url.path.lstrip('/') - else: - parsed_url = urlparse(self.bucket_key) - if parsed_url.scheme != '' or parsed_url.netloc != '': - raise AirflowException( - 'If bucket_name is provided, bucket_key' - ' should be relative path from root' - ' level, rather than a full s3:// url' - ) - - self.log.info('Poking for key : s3://%s/%s', self.bucket_name, self.bucket_key) - if self.wildcard_match: - return self.get_hook().check_for_wildcard_key(self.bucket_key, self.bucket_name) - return self.get_hook().check_for_key(self.bucket_key, self.bucket_name) - - def get_hook(self) -> S3Hook: - """Create and return an S3Hook""" - if self.hook: - return self.hook - - self.hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) - return self.hook - - -class S3KeySizeSensor(S3KeySensor): - """ - Waits for a key (a file-like instance on S3) to be present and be more than - some size in a S3 bucket. 
- S3 being a key/value it does not support folders. The path is just a key - a resource. - - :param bucket_key: The key being waited on. Supports full s3:// style url - or relative path from root level. When it's specified as a full s3:// - url, please leave bucket_name as `None`. - :type bucket_key: str - :param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key`` - is not provided as a full s3:// url. - :type bucket_name: str - :param wildcard_match: whether the bucket_key should be interpreted as a - Unix wildcard pattern - :type wildcard_match: bool - :param aws_conn_id: a reference to the s3 connection - :type aws_conn_id: str - :param verify: Whether or not to verify SSL certificates for S3 connection. - By default SSL certificates are verified. - You can provide the following values: - - - ``False``: do not validate SSL certificates. SSL will still be used - (unless use_ssl is False), but SSL certificates will not be - verified. - - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. - :type verify: bool or str - :type check_fn: Optional[Callable[..., bool]] - :param check_fn: Function that receives the list of the S3 objects, - and returns the boolean: - - ``True``: a certain criteria is met - - ``False``: the criteria isn't met - **Example**: Wait for any S3 object size more than 1 megabyte :: - - def check_fn(self, data: List) -> bool: - return any(f.get('Size', 0) > 1048576 for f in data if isinstance(f, dict)) - :type check_fn: Optional[Callable[..., bool]] - """ - - def __init__( - self, - *, - check_fn: Optional[Callable[..., bool]] = None, - **kwargs, - ): - super().__init__(**kwargs) - self.check_fn_user = check_fn - - def poke(self, context): - if super().poke(context=context) is False: - return False - - s3_objects = self.get_files(s3_hook=self.get_hook()) - if not s3_objects: - return False - check_fn = self.check_fn if self.check_fn_user is None else self.check_fn_user - return check_fn(s3_objects) - - def get_files(self, s3_hook: S3Hook, delimiter: Optional[str] = '/') -> List: - """Gets a list of files in the bucket""" - prefix = self.bucket_key - config = { - 'PageSize': None, - 'MaxItems': None, - } - if self.wildcard_match: - prefix = re.split(r'[\[\*\?]', self.bucket_key, 1)[0] - - paginator = s3_hook.get_conn().get_paginator('list_objects_v2') - response = paginator.paginate( - Bucket=self.bucket_name, Prefix=prefix, Delimiter=delimiter, PaginationConfig=config - ) - keys = [] - for page in response: - if 'Contents' in page: - _temp = [k for k in page['Contents'] if isinstance(k.get('Size', None), (int, float))] - keys = keys + _temp - return keys - - def check_fn(self, data: List, object_min_size: Optional[Union[int, float]] = 0) -> bool: - """Default function for checking that S3 Objects have size more than 0 - - :param data: List of the objects in S3 bucket. - :type data: list - :param object_min_size: Checks if the objects sizes are greater then this value. - :type object_min_size: int - """ - return all(f.get('Size', 0) > object_min_size for f in data if isinstance(f, dict)) +warnings.warn( + "This module is deprecated. 
Please use `airflow.providers.amazon.aws.sensors.s3`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/sensors/s3_keys_unchanged.py b/airflow/providers/amazon/aws/sensors/s3_keys_unchanged.py index 374c16347066..792d29c46cfc 100644 --- a/airflow/providers/amazon/aws/sensors/s3_keys_unchanged.py +++ b/airflow/providers/amazon/aws/sensors/s3_keys_unchanged.py @@ -15,162 +15,14 @@ # specific language governing permissions and limitations # under the License. -import os -from datetime import datetime -from typing import Optional, Set, Union +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.sensors.s3`.""" -try: - from functools import cached_property -except ImportError: - from cached_property import cached_property +import warnings -from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.sensors.base import BaseSensorOperator, poke_mode_only +from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor # noqa - -@poke_mode_only -class S3KeysUnchangedSensor(BaseSensorOperator): - """ - Checks for changes in the number of objects at prefix in AWS S3 - bucket and returns True if the inactivity period has passed with no - increase in the number of objects. Note, this sensor will not behave correctly - in reschedule mode, as the state of the listed objects in the S3 bucket will - be lost between rescheduled invocations. - - :param bucket_name: Name of the S3 bucket - :type bucket_name: str - :param prefix: The prefix being waited on. Relative path from bucket root level. - :type prefix: str - :param aws_conn_id: a reference to the s3 connection - :type aws_conn_id: str - :param verify: Whether or not to verify SSL certificates for S3 connection. - By default SSL certificates are verified. - You can provide the following values: - - - ``False``: do not validate SSL certificates. SSL will still be used - (unless use_ssl is False), but SSL certificates will not be - verified. - - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. - :type verify: Optional[Union[bool, str]] - :param inactivity_period: The total seconds of inactivity to designate - keys unchanged. Note, this mechanism is not real time and - this operator may not return until a poke_interval after this period - has passed with no additional objects sensed. - :type inactivity_period: float - :param min_objects: The minimum number of objects needed for keys unchanged - sensor to be considered valid. - :type min_objects: int - :param previous_objects: The set of object ids found during the last poke. - :type previous_objects: Optional[Set[str]] - :param allow_delete: Should this sensor consider objects being deleted - between pokes valid behavior. If true a warning message will be logged - when this happens. If false an error will be raised. 
- :type allow_delete: bool - """ - - template_fields = ('bucket_name', 'prefix') - - def __init__( - self, - *, - bucket_name: str, - prefix: str, - aws_conn_id: str = 'aws_default', - verify: Optional[Union[bool, str]] = None, - inactivity_period: float = 60 * 60, - min_objects: int = 1, - previous_objects: Optional[Set[str]] = None, - allow_delete: bool = True, - **kwargs, - ) -> None: - - super().__init__(**kwargs) - - self.bucket_name = bucket_name - self.prefix = prefix - if inactivity_period < 0: - raise ValueError("inactivity_period must be non-negative") - self.inactivity_period = inactivity_period - self.min_objects = min_objects - self.previous_objects = previous_objects or set() - self.inactivity_seconds = 0 - self.allow_delete = allow_delete - self.aws_conn_id = aws_conn_id - self.verify = verify - self.last_activity_time: Optional[datetime] = None - - @cached_property - def hook(self): - """Returns S3Hook.""" - return S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) - - def is_keys_unchanged(self, current_objects: Set[str]) -> bool: - """ - Checks whether new objects have been uploaded and the inactivity_period - has passed and updates the state of the sensor accordingly. - - :param current_objects: set of object ids in bucket during last poke. - :type current_objects: set[str] - """ - current_num_objects = len(current_objects) - if current_objects > self.previous_objects: - # When new objects arrived, reset the inactivity_seconds - # and update previous_objects for the next poke. - self.log.info( - "New objects found at %s, resetting last_activity_time.", - os.path.join(self.bucket_name, self.prefix), - ) - self.log.debug("New objects: %s", current_objects - self.previous_objects) - self.last_activity_time = datetime.now() - self.inactivity_seconds = 0 - self.previous_objects = current_objects - return False - - if self.previous_objects - current_objects: - # During the last poke interval objects were deleted. - if self.allow_delete: - deleted_objects = self.previous_objects - current_objects - self.previous_objects = current_objects - self.last_activity_time = datetime.now() - self.log.info( - "Objects were deleted during the last poke interval. Updating the " - "file counter and resetting last_activity_time:\n%s", - deleted_objects, - ) - return False - - raise AirflowException( - f"Illegal behavior: objects were deleted in" - f" {os.path.join(self.bucket_name, self.prefix)} between pokes." - ) - - if self.last_activity_time: - self.inactivity_seconds = int((datetime.now() - self.last_activity_time).total_seconds()) - else: - # Handles the first poke where last inactivity time is None. - self.last_activity_time = datetime.now() - self.inactivity_seconds = 0 - - if self.inactivity_seconds >= self.inactivity_period: - path = os.path.join(self.bucket_name, self.prefix) - - if current_num_objects >= self.min_objects: - self.log.info( - "SUCCESS: \nSensor found %s objects at %s.\n" - "Waited at least %s seconds, with no new objects uploaded.", - current_num_objects, - path, - self.inactivity_period, - ) - return True - - self.log.error("FAILURE: Inactivity Period passed, not enough objects found in %s", path) - - return False - return False - - def poke(self, context): - return self.is_keys_unchanged(set(self.hook.list_keys(self.bucket_name, prefix=self.prefix))) +warnings.warn( + "This module is deprecated. 
Please use `airflow.providers.amazon.aws.sensors.s3`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/aws/sensors/s3_prefix.py b/airflow/providers/amazon/aws/sensors/s3_prefix.py index 83a4d0afbba0..3990e8774a67 100644 --- a/airflow/providers/amazon/aws/sensors/s3_prefix.py +++ b/airflow/providers/amazon/aws/sensors/s3_prefix.py @@ -15,77 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import Any, Dict, List, Optional, Union +"""This module is deprecated. Please use :mod:`airflow.providers.amazon.aws.sensors.s3`.""" -from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.sensors.base import BaseSensorOperator +import warnings +from airflow.providers.amazon.aws.sensors.s3 import S3PrefixSensor # noqa -class S3PrefixSensor(BaseSensorOperator): - """ - Waits for a prefix or all prefixes to exist. A prefix is the first part of a key, - thus enabling checking of constructs similar to glob ``airfl*`` or - SQL LIKE ``'airfl%'``. There is the possibility to precise a delimiter to - indicate the hierarchy or keys, meaning that the match will stop at that - delimiter. Current code accepts sane delimiters, i.e. characters that - are NOT special characters in the Python regex engine. - - :param bucket_name: Name of the S3 bucket - :type bucket_name: str - :param prefix: The prefix being waited on. Relative path from bucket root level. - :type prefix: str or list of str - :param delimiter: The delimiter intended to show hierarchy. - Defaults to '/'. - :type delimiter: str - :param aws_conn_id: a reference to the s3 connection - :type aws_conn_id: str - :param verify: Whether or not to verify SSL certificates for S3 connection. - By default SSL certificates are verified. - You can provide the following values: - - - ``False``: do not validate SSL certificates. SSL will still be used - (unless use_ssl is False), but SSL certificates will not be - verified. - - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to uses. - You can specify this argument if you want to use a different - CA cert bundle than the one used by botocore. - :type verify: bool or str - """ - - template_fields = ('prefix', 'bucket_name') - - def __init__( - self, - *, - bucket_name: str, - prefix: Union[str, List[str]], - delimiter: str = '/', - aws_conn_id: str = 'aws_default', - verify: Optional[Union[str, bool]] = None, - **kwargs, - ): - super().__init__(**kwargs) - # Parse - self.bucket_name = bucket_name - self.prefix = [prefix] if isinstance(prefix, str) else prefix - self.delimiter = delimiter - self.aws_conn_id = aws_conn_id - self.verify = verify - self.hook: Optional[S3Hook] = None - - def poke(self, context: Dict[str, Any]): - self.log.info('Poking for prefix : %s in bucket s3://%s', self.prefix, self.bucket_name) - return all(self._check_for_prefix(prefix) for prefix in self.prefix) - - def get_hook(self) -> S3Hook: - """Create and return an S3Hook""" - if self.hook: - return self.hook - - self.hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) - return self.hook - - def _check_for_prefix(self, prefix: str) -> bool: - return self.get_hook().check_for_prefix( - prefix=prefix, delimiter=self.delimiter, bucket_name=self.bucket_name - ) +warnings.warn( + "This module is deprecated. 
Please use `airflow.providers.amazon.aws.sensors.s3`.", + DeprecationWarning, + stacklevel=2, +) diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 12bdf831adcd..2f5b2c23b88c 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -224,6 +224,7 @@ operators: - airflow.providers.amazon.aws.operators.s3_file_transform - airflow.providers.amazon.aws.operators.s3_list - airflow.providers.amazon.aws.operators.s3_list_prefixes + - airflow.providers.amazon.aws.operators.s3 - integration-name: Amazon SageMaker python-modules: - airflow.providers.amazon.aws.operators.sagemaker_base @@ -293,6 +294,7 @@ sensors: - airflow.providers.amazon.aws.sensors.s3_key - airflow.providers.amazon.aws.sensors.s3_keys_unchanged - airflow.providers.amazon.aws.sensors.s3_prefix + - airflow.providers.amazon.aws.sensors.s3 - integration-name: Amazon SageMaker python-modules: - airflow.providers.amazon.aws.sensors.sagemaker_base diff --git a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py index b8aed6a931cd..e3948f390dc4 100644 --- a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py +++ b/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py @@ -21,7 +21,7 @@ from airflow import models from airflow.decorators import task from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator +from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py index bdea1876f551..8763b28e1f3d 100755 --- a/dev/provider_packages/prepare_provider_packages.py +++ b/dev/provider_packages/prepare_provider_packages.py @@ -2141,6 +2141,8 @@ def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: List[warnin "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.step_function`.", 'This module is deprecated. Please use `airflow.providers.amazon.aws.operators.ec2`.', 'This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.ec2`.', + "This module is deprecated. Please use `airflow.providers.amazon.aws.sensors.s3`.", + "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.s3`.", } diff --git a/docs/apache-airflow-providers-amazon/operators/s3.rst b/docs/apache-airflow-providers-amazon/operators/s3.rst index 62bd3370cf0f..29c58af8e207 100644 --- a/docs/apache-airflow-providers-amazon/operators/s3.rst +++ b/docs/apache-airflow-providers-amazon/operators/s3.rst @@ -29,19 +29,19 @@ Overview Airflow to Amazon Simple Storage Service (S3) integration provides several operators to create and interact with S3 buckets. 
- - :class:`~airflow.providers.amazon.aws.sensors.s3_key.S3KeySensor` - - :class:`~airflow.providers.amazon.aws.sensors.s3_key.S3KeySizeSensor` - - :class:`~airflow.providers.amazon.aws.sensors.s3_keys_unchanged.S3KeysUnchangedSensor` - - :class:`~airflow.providers.amazon.aws.sensors.s3_prefix.S3PrefixSensor` - - :class:`~airflow.providers.amazon.aws.operators.s3_bucket.S3CreateBucketOperator` - - :class:`~airflow.providers.amazon.aws.operators.s3_bucket.S3DeleteBucketOperator` - - :class:`~airflow.providers.amazon.aws.operators.s3_bucket_tagging.S3DeleteBucketTaggingOperator` - - :class:`~airflow.providers.amazon.aws.operators.s3_bucket_tagging.S3GetBucketTaggingOperator` - - :class:`~airflow.providers.amazon.aws.operators.s3_bucket_tagging.S3PutBucketTaggingOperator` - - :class:`~airflow.providers.amazon.aws.operators.s3_copy_object.S3CopyObjectOperator` - - :class:`~airflow.providers.amazon.aws.operators.s3_delete_objects.S3DeleteObjectsOperator` - - :class:`~airflow.providers.amazon.aws.operators.s3_file_transform.S3FileTransformOperator` - - :class:`~airflow.providers.amazon.aws.operators.s3_list.S3ListOperator` + - :class:`~airflow.providers.amazon.aws.sensors.s3.S3KeySensor` + - :class:`~airflow.providers.amazon.aws.sensors.s3.S3KeySizeSensor` + - :class:`~airflow.providers.amazon.aws.sensors.s3.S3KeysUnchangedSensor` + - :class:`~airflow.providers.amazon.aws.sensors.s3.S3PrefixSensor` + - :class:`~airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator` + - :class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketOperator` + - :class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator` + - :class:`~airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator` + - :class:`~airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator` + - :class:`~airflow.providers.amazon.aws.operators.s3.S3CopyObjectOperator` + - :class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator` + - :class:`~airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator` + - :class:`~airflow.providers.amazon.aws.operators.s3.S3ListOperator` Two example_dags are provided which showcase these operators in action. 
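For orientation, a minimal DAG sketch using the consolidated import paths listed above might look like the following. This is not one of the example DAGs shipped with the provider; the dag_id, bucket, and key names are illustrative only.

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    # All names below (dag_id, bucket, key) are hypothetical placeholders.
    with DAG(
        dag_id="example_s3_consolidated_imports",
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
    ) as dag:
        create_bucket = S3CreateBucketOperator(task_id="create_bucket", bucket_name="my-example-bucket")
        # Full s3:// URL form, so bucket_name is left as None.
        wait_for_key = S3KeySensor(task_id="wait_for_key", bucket_key="s3://my-example-bucket/data/file.csv")
        delete_bucket = S3DeleteBucketOperator(
            task_id="delete_bucket", bucket_name="my-example-bucket", force_delete=True
        )
        create_bucket >> wait_for_key >> delete_bucket

The same classes continue to work when imported from the old module paths, via the deprecation shims added in this change.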
diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py index 5efb57367f15..1663ec89ef16 100644 --- a/tests/deprecated_classes.py +++ b/tests/deprecated_classes.py @@ -1056,17 +1056,21 @@ 'airflow.contrib.operators.emr_terminate_job_flow_operator.EmrTerminateJobFlowOperator', ), ( - 'airflow.providers.amazon.aws.operators.s3_copy_object.S3CopyObjectOperator', + 'airflow.providers.amazon.aws.operators.s3.S3CopyObjectOperator', 'airflow.contrib.operators.s3_copy_object_operator.S3CopyObjectOperator', ), ( - 'airflow.providers.amazon.aws.operators.s3_delete_objects.S3DeleteObjectsOperator', + 'airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator', 'airflow.contrib.operators.s3_delete_objects_operator.S3DeleteObjectsOperator', ), ( - 'airflow.providers.amazon.aws.operators.s3_list.S3ListOperator', + 'airflow.providers.amazon.aws.operators.s3.S3ListOperator', 'airflow.contrib.operators.s3_list_operator.S3ListOperator', ), + ( + 'airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator', + 'airflow.operators.s3_file_transform_operator.S3FileTransformOperator', + ), ( 'airflow.providers.amazon.aws.operators.sagemaker_base.SageMakerBaseOperator', 'airflow.contrib.operators.sagemaker_base_operator.SageMakerBaseOperator', @@ -1351,6 +1355,46 @@ "airflow.providers.amazon.aws.operators.ec2.EC2StopInstanceOperator", "airflow.providers.amazon.aws.operators.ec2_stop_instance.EC2StopInstanceOperator", ), + ( + "airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator", + "airflow.providers.amazon.aws.operators.s3_file_transform.S3FileTransformOperator", + ), + ( + "airflow.providers.amazon.aws.operators.s3.S3ListOperator", + "airflow.providers.amazon.aws.operators.s3_list.S3ListOperator", + ), + ( + "airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator", + "airflow.providers.amazon.aws.operators.s3_bucket.S3CreateBucketOperator", + ), + ( + "airflow.providers.amazon.aws.operators.s3.S3DeleteBucketOperator", + "airflow.providers.amazon.aws.operators.s3_bucket.S3DeleteBucketOperator", + ), + ( + "airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator", + "airflow.providers.amazon.aws.operators.s3_bucket_tagging.S3GetBucketTaggingOperator", + ), + ( + "airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator", + "airflow.providers.amazon.aws.operators.s3_bucket_tagging.S3PutBucketTaggingOperator", + ), + ( + "airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator", + "airflow.providers.amazon.aws.operators.s3_bucket_tagging.S3DeleteBucketTaggingOperator", + ), + ( + "airflow.providers.amazon.aws.operators.s3.S3CopyObjectOperator", + "airflow.providers.amazon.aws.operators.s3_copy_object.S3CopyObjectOperator", + ), + ( + "airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator", + "airflow.providers.amazon.aws.operators.s3_delete_objects.S3DeleteObjectsOperator", + ), + ( + "airflow.providers.amazon.aws.operators.s3.S3ListPrefixesOperator", + "airflow.providers.amazon.aws.operators.s3_list_prefixes.S3ListPrefixesOperator", + ), ] SECRETS = [ @@ -1514,10 +1558,6 @@ 'airflow.providers.amazon.aws.sensors.sagemaker_tuning.SageMakerTuningSensor', 'airflow.contrib.sensors.sagemaker_tuning_sensor.SageMakerTuningSensor', ), - ( - 'airflow.providers.amazon.aws.operators.s3_file_transform.S3FileTransformOperator', - 'airflow.operators.s3_file_transform_operator.S3FileTransformOperator', - ), ( 'airflow.providers.amazon.aws.operators.step_function.StepFunctionStartExecutionOperator', 
'airflow.providers.amazon.aws.operators.step_function_start_execution' @@ -1529,11 +1569,11 @@ '.StepFunctionGetExecutionOutputOperator', ), ( - 'airflow.providers.amazon.aws.sensors.s3_key.S3KeySensor', + 'airflow.providers.amazon.aws.sensors.s3.S3KeySensor', 'airflow.sensors.s3_key_sensor.S3KeySensor', ), ( - 'airflow.providers.amazon.aws.sensors.s3_prefix.S3PrefixSensor', + 'airflow.providers.amazon.aws.sensors.s3.S3PrefixSensor', 'airflow.sensors.s3_prefix_sensor.S3PrefixSensor', ), ( @@ -1612,6 +1652,22 @@ 'airflow.providers.amazon.aws.sensors.ec2.EC2InstanceStateSensor', 'airflow.providers.amazon.aws.sensors.ec2_instance_state.EC2InstanceStateSensor', ), + ( + 'airflow.providers.amazon.aws.sensors.s3.S3KeySensor', + 'airflow.providers.amazon.aws.sensors.s3_key.S3KeySensor', + ), + ( + 'airflow.providers.amazon.aws.sensors.s3.S3KeySizeSensor', + 'airflow.providers.amazon.aws.sensors.s3_key.S3KeySizeSensor', + ), + ( + 'airflow.providers.amazon.aws.sensors.s3.S3KeysUnchangedSensor', + 'airflow.providers.amazon.aws.sensors.s3_keys_unchanged.S3KeysUnchangedSensor', + ), + ( + "airflow.providers.amazon.aws.sensors.s3.S3PrefixSensor", + "airflow.providers.amazon.aws.sensors.s3_prefix.S3PrefixSensor", + ), ] TRANSFERS = [ diff --git a/tests/providers/amazon/aws/operators/test_s3_bucket.py b/tests/providers/amazon/aws/operators/test_s3_bucket.py index e501125ae90b..dd7e6421f7e3 100644 --- a/tests/providers/amazon/aws/operators/test_s3_bucket.py +++ b/tests/providers/amazon/aws/operators/test_s3_bucket.py @@ -22,7 +22,7 @@ from moto import mock_s3 from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator +from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator BUCKET_NAME = os.environ.get("BUCKET_NAME", "test-airflow-bucket") TASK_ID = os.environ.get("TASK_ID", "test-s3-operator") diff --git a/tests/providers/amazon/aws/operators/test_s3_bucket_tagging.py b/tests/providers/amazon/aws/operators/test_s3_bucket_tagging.py index aa679b71b63a..ab6ea36280e5 100644 --- a/tests/providers/amazon/aws/operators/test_s3_bucket_tagging.py +++ b/tests/providers/amazon/aws/operators/test_s3_bucket_tagging.py @@ -22,7 +22,7 @@ from moto import mock_s3 from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.providers.amazon.aws.operators.s3_bucket_tagging import ( +from airflow.providers.amazon.aws.operators.s3 import ( S3DeleteBucketTaggingOperator, S3GetBucketTaggingOperator, S3PutBucketTaggingOperator, diff --git a/tests/providers/amazon/aws/operators/test_s3_copy_object.py b/tests/providers/amazon/aws/operators/test_s3_copy_object.py index ac810afa5d09..a2f4f9eddbaa 100644 --- a/tests/providers/amazon/aws/operators/test_s3_copy_object.py +++ b/tests/providers/amazon/aws/operators/test_s3_copy_object.py @@ -22,7 +22,7 @@ import boto3 from moto import mock_s3 -from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator +from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator class TestS3CopyObjectOperator(unittest.TestCase): diff --git a/tests/providers/amazon/aws/operators/test_s3_delete_objects.py b/tests/providers/amazon/aws/operators/test_s3_delete_objects.py index e9e93d8e2e7f..958b5df16189 100644 --- a/tests/providers/amazon/aws/operators/test_s3_delete_objects.py +++ b/tests/providers/amazon/aws/operators/test_s3_delete_objects.py @@ -22,7 +22,7 @@ import boto3 from moto import mock_s3 
-from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator +from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator class TestS3DeleteObjectsOperator(unittest.TestCase): diff --git a/tests/providers/amazon/aws/operators/test_s3_file_transform.py b/tests/providers/amazon/aws/operators/test_s3_file_transform.py index 3b749438cb71..cd404a772f51 100644 --- a/tests/providers/amazon/aws/operators/test_s3_file_transform.py +++ b/tests/providers/amazon/aws/operators/test_s3_file_transform.py @@ -31,7 +31,7 @@ from moto import mock_s3 from airflow.exceptions import AirflowException -from airflow.providers.amazon.aws.operators.s3_file_transform import S3FileTransformOperator +from airflow.providers.amazon.aws.operators.s3 import S3FileTransformOperator class TestS3FileTransformOperator(unittest.TestCase): diff --git a/tests/providers/amazon/aws/operators/test_s3_list.py b/tests/providers/amazon/aws/operators/test_s3_list.py index 249b97140651..7a1e6a73e729 100644 --- a/tests/providers/amazon/aws/operators/test_s3_list.py +++ b/tests/providers/amazon/aws/operators/test_s3_list.py @@ -19,7 +19,7 @@ import unittest from unittest import mock -from airflow.providers.amazon.aws.operators.s3_list import S3ListOperator +from airflow.providers.amazon.aws.operators.s3 import S3ListOperator TASK_ID = 'test-s3-list-operator' BUCKET = 'test-bucket' @@ -29,7 +29,7 @@ class TestS3ListOperator(unittest.TestCase): - @mock.patch('airflow.providers.amazon.aws.operators.s3_list.S3Hook') + @mock.patch('airflow.providers.amazon.aws.operators.s3.S3Hook') def test_execute(self, mock_hook): mock_hook.return_value.list_keys.return_value = MOCK_FILES diff --git a/tests/providers/amazon/aws/operators/test_s3_list_prefixes.py b/tests/providers/amazon/aws/operators/test_s3_list_prefixes.py index 8e4ab19484b7..a7603e5a6a12 100644 --- a/tests/providers/amazon/aws/operators/test_s3_list_prefixes.py +++ b/tests/providers/amazon/aws/operators/test_s3_list_prefixes.py @@ -19,7 +19,7 @@ import unittest from unittest import mock -from airflow.providers.amazon.aws.operators.s3_list_prefixes import S3ListPrefixesOperator +from airflow.providers.amazon.aws.operators.s3 import S3ListPrefixesOperator TASK_ID = 'test-s3-list-prefixes-operator' BUCKET = 'test-bucket' @@ -29,7 +29,7 @@ class TestS3ListOperator(unittest.TestCase): - @mock.patch('airflow.providers.amazon.aws.operators.s3_list_prefixes.S3Hook') + @mock.patch('airflow.providers.amazon.aws.operators.s3.S3Hook') def test_execute(self, mock_hook): mock_hook.return_value.list_prefixes.return_value = MOCK_SUBFOLDERS diff --git a/tests/providers/amazon/aws/sensors/test_s3_key.py b/tests/providers/amazon/aws/sensors/test_s3_key.py index c4714fdb3aec..ca441246060d 100644 --- a/tests/providers/amazon/aws/sensors/test_s3_key.py +++ b/tests/providers/amazon/aws/sensors/test_s3_key.py @@ -25,7 +25,7 @@ from airflow.exceptions import AirflowException from airflow.models import DAG, DagRun, TaskInstance from airflow.models.variable import Variable -from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor, S3KeySizeSensor +from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeySizeSensor from airflow.utils import timezone @@ -58,7 +58,7 @@ def test_bucket_name_provided_and_bucket_key_is_s3_url(self): ['key', 'bucket', 'key', 'bucket'], ] ) - @mock.patch('airflow.providers.amazon.aws.sensors.s3_key.S3Hook') + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook') def test_parse_bucket_key(self, key, bucket, 
parsed_key, parsed_bucket, mock_hook): mock_hook.return_value.check_for_key.return_value = False @@ -73,7 +73,7 @@ def test_parse_bucket_key(self, key, bucket, parsed_key, parsed_bucket, mock_hoo assert op.bucket_key == parsed_key assert op.bucket_name == parsed_bucket - @mock.patch('airflow.providers.amazon.aws.sensors.s3_key.S3Hook') + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook') def test_parse_bucket_key_from_jinja(self, mock_hook): mock_hook.return_value.check_for_key.return_value = False @@ -100,7 +100,7 @@ def test_parse_bucket_key_from_jinja(self, mock_hook): assert op.bucket_key == "key" assert op.bucket_name == "bucket" - @mock.patch('airflow.providers.amazon.aws.sensors.s3_key.S3Hook') + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook') def test_poke(self, mock_hook): op = S3KeySensor(task_id='s3_key_sensor', bucket_key='s3://test_bucket/file') @@ -112,7 +112,7 @@ def test_poke(self, mock_hook): mock_hook.return_value.check_for_key.return_value = True assert op.poke(None) - @mock.patch('airflow.providers.amazon.aws.sensors.s3_key.S3Hook') + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook') def test_poke_wildcard(self, mock_hook): op = S3KeySensor(task_id='s3_key_sensor', bucket_key='s3://test_bucket/file', wildcard_match=True) @@ -126,14 +126,14 @@ def test_poke_wildcard(self, mock_hook): class TestS3KeySizeSensor(unittest.TestCase): - @mock.patch('airflow.providers.amazon.aws.sensors.s3_key.S3Hook.check_for_key', return_value=False) + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook.check_for_key', return_value=False) def test_poke_check_for_key_false(self, mock_check_for_key): op = S3KeySizeSensor(task_id='s3_key_sensor', bucket_key='s3://test_bucket/file') assert not op.poke(None) mock_check_for_key.assert_called_once_with(op.bucket_key, op.bucket_name) - @mock.patch('airflow.providers.amazon.aws.sensors.s3_key.S3KeySizeSensor.get_files', return_value=[]) - @mock.patch('airflow.providers.amazon.aws.sensors.s3_key.S3Hook.check_for_key', return_value=True) + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3KeySizeSensor.get_files', return_value=[]) + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook.check_for_key', return_value=True) def test_poke_get_files_false(self, mock_check_for_key, mock_get_files): op = S3KeySizeSensor(task_id='s3_key_sensor', bucket_key='s3://test_bucket/file') assert not op.poke(None) @@ -150,7 +150,7 @@ def test_poke_get_files_false(self, mock_check_for_key, mock_get_files): [{"Contents": [{"Size": 10}, {"Size": 10}]}, True], ] ) - @mock.patch('airflow.providers.amazon.aws.sensors.s3_key.S3Hook') + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook') def test_poke(self, paginate_return_value, poke_return_value, mock_hook): op = S3KeySizeSensor(task_id='s3_key_sensor', bucket_key='s3://test_bucket/file') @@ -166,8 +166,8 @@ def test_poke(self, paginate_return_value, poke_return_value, mock_hook): assert op.poke(None) is poke_return_value mock_check_for_key.assert_called_once_with(op.bucket_key, op.bucket_name) - @mock.patch('airflow.providers.amazon.aws.sensors.s3_key.S3KeySizeSensor.get_files', return_value=[]) - @mock.patch('airflow.providers.amazon.aws.sensors.s3_key.S3Hook') + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3KeySizeSensor.get_files', return_value=[]) + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook') def test_poke_wildcard(self, mock_hook, mock_get_files): op = S3KeySizeSensor(task_id='s3_key_sensor', bucket_key='s3://test_bucket/file', 
wildcard_match=True) diff --git a/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py b/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py index fc83b38d1256..1c1d85242b33 100644 --- a/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py +++ b/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py @@ -24,7 +24,7 @@ from parameterized import parameterized from airflow.models.dag import DAG, AirflowException -from airflow.providers.amazon.aws.sensors.s3_keys_unchanged import S3KeysUnchangedSensor +from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor TEST_DAG_ID = 'unit_tests_aws_sensor' DEFAULT_DATE = datetime(2015, 1, 1) @@ -100,7 +100,7 @@ def test_key_changes(self, current_objects, expected_returns, inactivity_periods assert self.sensor.inactivity_seconds == inactivity_periods[2] @freeze_time(DEFAULT_DATE, auto_tick_seconds=10) - @mock.patch('airflow.providers.amazon.aws.sensors.s3_keys_unchanged.S3Hook') + @mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook') def test_poke_succeeds_on_upload_complete(self, mock_hook): mock_hook.return_value.list_keys.return_value = {'a'} assert not self.sensor.poke(dict()) diff --git a/tests/providers/amazon/aws/sensors/test_s3_prefix.py b/tests/providers/amazon/aws/sensors/test_s3_prefix.py index 0859a0ffeb4c..d0124343d517 100644 --- a/tests/providers/amazon/aws/sensors/test_s3_prefix.py +++ b/tests/providers/amazon/aws/sensors/test_s3_prefix.py @@ -19,10 +19,10 @@ from unittest import mock from unittest.mock import call -from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor +from airflow.providers.amazon.aws.sensors.s3 import S3PrefixSensor -@mock.patch('airflow.providers.amazon.aws.sensors.s3_prefix.S3Hook') +@mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook') def test_poke(mock_hook): op = S3PrefixSensor(task_id='s3_prefix', bucket_name='bucket', prefix='prefix') @@ -36,7 +36,7 @@ def test_poke(mock_hook): assert op.poke({}) -@mock.patch('airflow.providers.amazon.aws.sensors.s3_prefix.S3Hook') +@mock.patch('airflow.providers.amazon.aws.sensors.s3.S3Hook') def test_poke_should_check_multiple_prefixes(mock_hook): op = S3PrefixSensor(task_id='s3_prefix', bucket_name='bucket', prefix=['prefix1', 'prefix2']) diff --git a/tests/providers/google/cloud/transfers/test_s3_to_gcs.py b/tests/providers/google/cloud/transfers/test_s3_to_gcs.py index 471b903b7b94..d83179231e9a 100644 --- a/tests/providers/google/cloud/transfers/test_s3_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_s3_to_gcs.py @@ -55,7 +55,7 @@ def test_init(self): assert operator.google_impersonation_chain == IMPERSONATION_CHAIN @mock.patch('airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook') - @mock.patch('airflow.providers.amazon.aws.operators.s3_list.S3Hook') + @mock.patch('airflow.providers.amazon.aws.operators.s3.S3Hook') @mock.patch('airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook') def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): """Test the execute function when the run is successful.""" @@ -95,7 +95,7 @@ def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): assert sorted(MOCK_FILES) == sorted(uploaded_files) @mock.patch('airflow.providers.google.cloud.transfers.s3_to_gcs.S3Hook') - @mock.patch('airflow.providers.amazon.aws.operators.s3_list.S3Hook') + @mock.patch('airflow.providers.amazon.aws.operators.s3.S3Hook') @mock.patch('airflow.providers.google.cloud.transfers.s3_to_gcs.GCSHook') def test_execute_with_gzip(self, 
gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): """Test the execute function when the run is successful."""
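As a closing note on the shims above: each retired module now simply re-exports its classes from the consolidated module and emits a DeprecationWarning at import time. A hedged sketch of how downstream code could surface that warning, for example in a migration test; the module chosen is one of the paths deprecated in this diff:

    import warnings

    # The deprecated module still imports, but its module-level warnings.warn()
    # fires on first import only (later imports hit the module cache).
    with warnings.catch_warnings(record=True) as captured:
        warnings.simplefilter("always")
        from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor  # noqa: F401

    assert any(issubclass(w.category, DeprecationWarning) for w in captured)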