Skip to content
Permalink
Browse files
Add sample dag and doc for S3ListPrefixesOperator (#23448)
* Add sample dag and doc for S3ListPrefixesOperator

* Fix static checks
  • Loading branch information
vincbeck committed May 9, 2022
1 parent e63dbdc commit d21e49dfda3fa8432ad995c62620d89d1bb3c217
Showing 3 changed files with 46 additions and 13 deletions.
@@ -31,6 +31,7 @@
S3FileTransformOperator,
S3GetBucketTaggingOperator,
S3ListOperator,
S3ListPrefixesOperator,
S3PutBucketTaggingOperator,
)
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor
@@ -42,6 +43,7 @@
# Empty string prefix refers to the bucket root
# See what a prefix is here: https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html
PREFIX = os.environ.get('PREFIX', '')
DELIMITER = os.environ.get('DELIMITER', '/')
TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
DATA = os.environ.get(
@@ -107,7 +109,7 @@ def check_fn(files: List) -> bool:
# [END howto_operator_s3_delete_bucket_tagging]

# [START howto_operator_s3_create_object]
s3_create_object = S3CreateObjectOperator(
create_object = S3CreateObjectOperator(
task_id="s3_create_object",
s3_bucket=BUCKET_NAME,
s3_key=KEY,
@@ -116,6 +118,15 @@ def check_fn(files: List) -> bool:
)
# [END howto_operator_s3_create_object]

# [START howto_operator_s3_list_prefixes]
list_prefixes = S3ListPrefixesOperator(
task_id="s3_list_prefix_operator",
bucket=BUCKET_NAME,
prefix=PREFIX,
delimiter=DELIMITER,
)
# [END howto_operator_s3_list_prefixes]

# [START howto_operator_s3_list]
list_keys = S3ListOperator(
task_id="s3_list_operator",
@@ -126,7 +137,7 @@ def check_fn(files: List) -> bool:

# [START howto_sensor_s3_key_single_key]
# Check if a file exists
s3_sensor_one_key = S3KeySensor(
sensor_one_key = S3KeySensor(
task_id="s3_sensor_one_key",
bucket_name=BUCKET_NAME,
bucket_key=KEY,
@@ -135,7 +146,7 @@ def check_fn(files: List) -> bool:

# [START howto_sensor_s3_key_multiple_keys]
# Check if both files exist
s3_sensor_two_keys = S3KeySensor(
sensor_two_keys = S3KeySensor(
task_id="s3_sensor_two_keys",
bucket_name=BUCKET_NAME,
bucket_key=[KEY, KEY_2],
@@ -144,7 +155,7 @@ def check_fn(files: List) -> bool:

# [START howto_sensor_s3_key_function]
# Check if a file exists and matches a certain pattern defined in check_fn
s3_sensor_key_function = S3KeySensor(
sensor_key_with_function = S3KeySensor(
task_id="s3_sensor_key_function",
bucket_name=BUCKET_NAME,
bucket_key=KEY,
@@ -153,7 +164,7 @@ def check_fn(files: List) -> bool:
# [END howto_sensor_s3_key_function]

# [START howto_sensor_s3_keys_unchanged]
s3_sensor_keys_unchanged = S3KeysUnchangedSensor(
sensor_keys_unchanged = S3KeysUnchangedSensor(
task_id="s3_sensor_one_key_size",
bucket_name=BUCKET_NAME_2,
prefix=PREFIX,
@@ -162,7 +173,7 @@ def check_fn(files: List) -> bool:
# [END howto_sensor_s3_keys_unchanged]

# [START howto_operator_s3_copy_object]
s3_copy_object = S3CopyObjectOperator(
copy_object = S3CopyObjectOperator(
task_id="s3_copy_object",
source_bucket_name=BUCKET_NAME,
dest_bucket_name=BUCKET_NAME_2,
@@ -172,7 +183,7 @@ def check_fn(files: List) -> bool:
# [END howto_operator_s3_copy_object]

# [START howto_operator_s3_file_transform]
s3_file_transform = S3FileTransformOperator(
transforms_file = S3FileTransformOperator(
task_id="s3_file_transform",
source_s3_key=f's3://{BUCKET_NAME}/{KEY}',
dest_s3_key=f's3://{BUCKET_NAME_2}/{KEY_2}',
@@ -183,7 +194,7 @@ def check_fn(files: List) -> bool:
# [END howto_operator_s3_file_transform]

# [START howto_operator_s3_delete_objects]
s3_delete_objects = S3DeleteObjectsOperator(
delete_objects = S3DeleteObjectsOperator(
task_id="s3_delete_objects",
bucket=BUCKET_NAME_2,
keys=KEY_2,
@@ -201,11 +212,13 @@ def check_fn(files: List) -> bool:
put_tagging,
get_tagging,
delete_tagging,
s3_create_object,
create_object,
list_prefixes,
list_keys,
[s3_sensor_one_key, s3_sensor_two_keys, s3_sensor_key_function],
s3_copy_object,
s3_sensor_keys_unchanged,
s3_delete_objects,
[sensor_one_key, sensor_two_keys, sensor_key_with_function],
copy_object,
transforms_file,
sensor_keys_unchanged,
delete_objects,
delete_bucket,
)
@@ -693,6 +693,10 @@ class S3ListPrefixesOperator(BaseOperator):
This operator returns a Python list with the names of all subfolders, which
can be used by `xcom` in the downstream task.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:S3ListPrefixesOperator`
:param bucket: The S3 bucket where to find the subfolders. (templated)
:param prefix: Prefix string to filter the subfolders whose names begin with
    the given prefix. (templated)
@@ -196,6 +196,22 @@ To create a new (or replace) Amazon S3 object you can use
:start-after: [START howto_operator_s3_create_object]
:end-before: [END howto_operator_s3_create_object]

.. _howto/operator:S3ListPrefixesOperator:

List Amazon S3 prefixes
-----------------------

To list all Amazon S3 prefixes within an Amazon S3 bucket you can use
:class:`~airflow.providers.amazon.aws.operators.s3.S3ListPrefixesOperator`.
See `here <https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html>`__
for more information about Amazon S3 prefixes.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
:language: python
:dedent: 4
:start-after: [START howto_operator_s3_list_prefixes]
:end-before: [END howto_operator_s3_list_prefixes]

.. _howto/operator:S3ListOperator:

List Amazon S3 objects

0 comments on commit d21e49d

Please sign in to comment.