
Conversation


@bentorb bentorb commented Dec 4, 2025

Add S3CopyPrefixOperator for copying objects by prefix

Description

This PR introduces a new S3CopyPrefixOperator that enables copying all S3 objects under a specified prefix from a source bucket to a destination bucket. This operator fills a gap in the current S3 operators by providing prefix-based bulk copy functionality.

What does this operator do?

• Copies all objects matching a specified prefix from source to destination S3 bucket
• Supports cross-bucket copies (source and destination can be different buckets)
• Handles large datasets through pagination
• Provides configurable error handling (continue on failure or stop on first error)
• Integrates with OpenLineage for data lineage tracking
• Supports Airflow templating for dynamic parameter values

Why is this needed?

Currently, Airflow's S3 operators only support copying individual objects. For use cases involving copying entire "directory" structures or large numbers of objects sharing a common prefix, users must implement custom solutions or use multiple operator instances.
This operator provides a native, efficient solution for prefix-based bulk operations.

Key Features

Pagination Support: Automatically handles large object lists using S3's pagination
Error Handling: Configurable continue_on_failure parameter for resilient operations
Template Fields: All key parameters support Jinja templating
OpenLineage Integration: Automatic data lineage tracking for copied objects
Standard Exception Handling: Uses RuntimeError following new Airflow guidelines

Implementation Details

Base Class: Based on S3CopyObjectOperator for consistency
Dependencies: Uses existing S3Hook and AWS connection infrastructure
Documentation: Updated providers/amazon/docs/operators/s3/s3.rst with operator documentation
Error Handling: Follows new Airflow guidelines using standard Python exceptions
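
For illustration, this is roughly how prefix listing with pagination can sit on top of the existing S3Hook; it is a sketch only, and the actual implementation in this PR may structure it differently (bucket and prefix values are placeholders):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# List every key under the prefix one page at a time, so very large
# "directories" never have to be held in a single API response.
hook = S3Hook(aws_conn_id="aws_default")
paginator = hook.get_conn().get_paginator("list_objects_v2")
pages = paginator.paginate(
    Bucket="source-bucket",
    Prefix="data/2023/",
    PaginationConfig={"PageSize": 1000},
)
keys = [obj["Key"] for page in pages for obj in page.get("Contents", [])]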

Testing

Includes 14 new unit tests covering:
• Basic functionality and successful copying
• Error scenarios and exception handling
• Pagination configuration
• Continue on failure behavior
• OpenLineage integration
• Template field functionality

System test integration in tests/system/providers/amazon/aws/example_s3.py
All tests pass in Breeze testing environment

Usage Example

copy_prefix = S3CopyPrefixOperator(
    task_id='copy_data_files',
    source_bucket_name='source-bucket',
    source_bucket_key='data/2023/',
    dest_bucket_name='dest-bucket',
    dest_bucket_key='archive/data/2023/',
    continue_on_failure=True,
    aws_conn_id='aws_default'
)
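
Since the key parameters are template fields, the operator can also be driven by Jinja at runtime. A hypothetical templated variant (parameter names follow the example above and may differ in the final signature, which uses *_prefix naming in the reviewed code):

copy_daily_prefix = S3CopyPrefixOperator(
    task_id='copy_daily_files',
    source_bucket_name='source-bucket',
    source_bucket_key='data/{{ ds }}/',  # rendered per DAG run (logical date)
    dest_bucket_name='dest-bucket',
    dest_bucket_key='archive/data/{{ ds }}/',
    aws_conn_id='aws_default',
)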

Checklist

• [x] Tests included (14 comprehensive unit tests)
• [x] Documentation updated
• [x] Code follows project coding standards
• [x] All static code checks pass
• [x] Apache license headers added
• [x] PR is focused on single feature
• [x] Local tests pass
• [x] No unrelated changes included

@bentorb bentorb requested a review from o-nikolas as a code owner December 4, 2025 13:07

boring-cyborg bot commented Dec 4, 2025

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst).
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project and together we are making it better 🚀.
In case of doubts contact the developers at:
Mailing List: dev@airflow.apache.org
Slack: https://s.apache.org/airflow-slack

Contributor

@vincbeck vincbeck left a comment

Pretty cool!

Comment on lines +404 to +407
:param acl_policy: String specifying the canned ACL policy for the file being
uploaded to the S3 bucket.
:param meta_data_directive: Whether to `COPY` the metadata from the source object or `REPLACE` it with
metadata that's provided in the request.
Contributor

Very minor nit: move these two above aws_conn_id so that it matches __init__

Author

Sounds good.

Comment on lines +411 to +414
"source_bucket_prefix",
"dest_bucket_prefix",
"source_bucket_name",
"dest_bucket_name",
Contributor

Any reason to not template the rest?

Author

@bentorb bentorb Dec 18, 2025

This new operator is based on the existing S3CopyObjectOperator, which only templates these four fields (or their equivalent). I also introduced page_size and continue_on_failure, but I don't think templating is applicable to these two.

If you have specific fields in mind that you think are missing, please feel free to propose and I'd be happy to add them.

Contributor

I think the two new fields are worth templating. There is usually very little rhyme or reason for why older operators template some fields vs others. So it's not necessarily worth copying that.
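
(For reference only, a sketch of what the expanded tuple could look like; note that templated values render as strings unless the DAG opts into native rendering, so the operator would need to coerce page_size and continue_on_failure after rendering.)

template_fields = (
    "source_bucket_prefix",
    "dest_bucket_prefix",
    "source_bucket_name",
    "dest_bucket_name",
    "page_size",
    "continue_on_failure",
)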

self.dest_bucket_name, self.dest_bucket_prefix, "dest_bucket_name", "dest_bucket_prefix"
)

# Get paginator
Contributor

I think comments like this are superfluous (often if you're using an AI agent for coding they add many many comments), and this one seems to be on the wrong line also. But I would just drop it entirely.

Author

Sounds good.

# [END howto_operator_s3_copy_object]

# [START howto_operator_s3_copy_prefix]
copy_prefix = S3CopyPrefixOperator(
Contributor

Thanks for updating the system test! By chance did you run the dag and see if it's still working?

Author

Yes, it is working!

It should be omitted when `dest_bucket_prefix` is provided as a full s3:// url.
:param page_size: Number of objects to list per page when paginating through S3 objects.
Low values result in more API calls, high values increase memory usage.
Between 1 and 1000, setting it to 0 results in no objects copied. Default is 1000.
Contributor

Should the operator __init__ check if zero was provided? Not transferring anything seems like a weird silent failure. 0 often represents unbounded (and has in the past in Airflow for other configs), so I can see people making that mistake. I think it's worth detecting that situation and throwing an exception instead of just silently doing nothing.

Author

@bentorb bentorb Dec 18, 2025

My initial thought for allowing a value of 0 is that it can potentially be used as a dynamic mechanism to 'disable' the task without having to modify the DAG. For instance, in the event of facing an incident with corrupted data, one can quickly stop copying files by setting this parameter to 0, without having to deal with also changing unit-tests, etc. This is just an example, I can see other scenarios in which it might be useful.

Furthermore, I feel like in this context, a value of 0 is relatively self-explanatory. I also think page_size is not a 'trivial' parameter, and anyone providing a custom value (instead of using the default) would have a minimum understanding of how to use it. That being said, I didn't know that 0 often represents unbounded in Airflow, so I can totally understand that confusion might happen.

Personally, I don't have a strong opinion about this, so I'm happy to go with your approach.

Author

We can also just remove page_size completely, see data points in the previous comment. Tagging @vincbeck and @ferruzzi for further opinions.

Contributor

The idea of disabling a task is interesting, but not something that is very common in Airflow, so I'm not sure an operator would think of it in the rare case that it would be useful. Plus it would also need a deploy to update DAG code. I think overall it's worth simplifying and just catching that case and not allowing it.
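
A minimal sketch of the validation being suggested here (illustrative only, not the PR's final parameter handling):

def __init__(self, *, page_size: int = 1000, **kwargs) -> None:
    super().__init__(**kwargs)
    if not 1 <= page_size <= 1000:
        # Fail fast at DAG-parse/instantiation time instead of silently copying nothing.
        raise ValueError(f"page_size must be between 1 and 1000, got {page_size}")
    self.page_size = page_size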

self.log.info("Successfully copied %s object(s)", copied_object_count)

if failed_object_count > 0:
raise RuntimeError(f"Failed to copy {failed_object_count} object(s)")
Contributor

My expectation would be that continue_on_failure would not fail the task, perhaps just log at ERROR or WARNING that some copy operations failed. But I also see the exception approach as being a "you can't miss this" communication mechanism that some things failed.

Curious what others think. CC @vincbeck @ferruzzi

Author

@bentorb bentorb Dec 18, 2025

Personally, I'm not a big fan of silent failures, so my direction with continue_on_failure was more "do I want to copy as much data as possible in the event of a failure or just stop immediately?". In both cases, there has been an error so the operator should fail, it's just a matter of when. I think there are valid scenarios in which incomplete data is better than no data, but one still wants to be alerted about issues.

Of course I do understand your interpretation and I think it also makes sense. Would be great to hear other opinions as well @vincbeck @ferruzzi.
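
(To make the behaviour described above concrete, a standalone sketch; keys_to_copy and copy_one are placeholders rather than the PR's actual helpers:)

def copy_all(keys_to_copy, copy_one, continue_on_failure, log):
    copied, failed = 0, 0
    for key in keys_to_copy:
        try:
            copy_one(key)
            copied += 1
        except Exception:
            if not continue_on_failure:
                raise  # stop immediately on the first error
            log.exception("Failed to copy %s", key)
            failed += 1
    log.info("Successfully copied %s object(s)", copied)
    if failed:
        # Copy as much as possible, but still fail the task at the end.
        raise RuntimeError(f"Failed to copy {failed} object(s)")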

It should be omitted when `source_bucket_prefix` is provided as a full s3:// url.
:param dest_bucket_name: Name of the S3 bucket to where the objects are copied. (templated)
It should be omitted when `dest_bucket_prefix` is provided as a full s3:// url.
:param page_size: Number of objects to list per page when paginating through S3 objects.
Contributor

Is this something that many folks actually ever modify? Is there any benefit to using anything other than what S3 defaults to? It would simplify your src and test code to just not include it here unless it's very common or a user you're working backwards from is asking for it.

Author

@bentorb bentorb Dec 18, 2025

This is a very good question/point. The maximum length of an S3 object key is 1,024 bytes (UTF-8). At the same time, both the default and maximum value for page_size is 1000. This means that in theory, the raw data retrieved by a single list_objects_v2 API call should fit in at most ~1 MB, which nowadays is a very small memory footprint.

The main reason I decided to include this parameter is for the special case when it is equal to 0, which results in no data being copied. However, as we are discussing in this comment, we might not want to support that. In that case, we could just remove the parameter altogether.

@bentorb bentorb force-pushed the main branch 2 times, most recently from 9ea19ca to b9e4dab on December 19, 2025 09:05
- Add S3CopyPrefixOperator to copy all objects under a prefix
- Support both bucket/prefix params and full S3 URLs
- Include pagination, error handling, and OpenLineage integration
- Add comprehensive unit tests with 14 test cases
- Add system test example and update documentation
- Update S3 operator documentation with new operator section
@o-nikolas
Contributor

Just checking in on this one, is all the feedback addressed now @bentorb?
