Description
Create an operator that lists S3 prefixes in order to return a list of the subfolders in an S3 bucket.
Use case / motivation
We would like an operator that returns the subfolders in an S3 bucket. This would help us determine partitioning keys for an ETL: we could parse the returned subfolders and pass them to a LivyOperator or the AWSAthenaOperator, which would then load the data into a table partitioned by those keys (based on the subfolders found in the bucket).
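As a minimal sketch of the parsing step, assuming the subfolders use the Hive-style `key=value` naming that Athena recognizes for partitions (the issue does not say which layout is used), each returned prefix could be turned into a dict of partition values. `parse_partition_prefix` is a hypothetical helper for illustration, not part of Airflow or the S3Hook.

```python
from typing import Dict


def parse_partition_prefix(prefix: str) -> Dict[str, str]:
    """Hypothetical helper: turn an S3 prefix such as
    'sales/dt=2020-01-01/region=eu/' into {'dt': '2020-01-01', 'region': 'eu'}.

    Assumes Hive-style 'key=value' folder names; other segments are ignored.
    """
    partitions: Dict[str, str] = {}
    # Split on '/' and skip the empty segment produced by the trailing slash.
    for segment in filter(None, prefix.split("/")):
        key, sep, value = segment.partition("=")
        if sep:  # keep only segments that look like key=value
            partitions[key] = value
    return partitions


print(parse_partition_prefix("sales/dt=2020-01-01/region=eu/"))
# {'dt': '2020-01-01', 'region': 'eu'}
```

Layouts without key names (for example `2020/01/01/`) would instead need a parser that maps path positions to partition columns.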
Currently, the S3ListOperator uses the S3Hook to list keys in a bucket:
def execute(self, context):
    hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)

    self.log.info(
        'Getting the list of files from bucket: %s in prefix: %s (Delimiter %s)',
        self.bucket, self.prefix, self.delimiter
    )

    return hook.list_keys(
        bucket_name=self.bucket,
        prefix=self.prefix,
        delimiter=self.delimiter)

It returns only files, not subfolders, because the S3Hook's list_keys method reads only the 'Contents' entry of each page in the response:
for page in response:
    if 'Contents' in page:
        has_results = True
        for k in page['Contents']:
            keys.append(k['Key'])

To get the subfolders in a bucket, we instead want the list_prefixes method of the S3Hook, because it collects the subfolders listed under 'CommonPrefixes' in each page of the response.
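To illustrate why the two methods return different things: S3 has no real folders. When a request passes a `Delimiter` (such as `/`), S3 rolls up every key that contains the delimiter after the requested `Prefix` into a single `CommonPrefixes` entry, and only the remaining keys appear under `Contents`. The sketch below imitates that grouping on an in-memory list of keys, producing one page dict shaped like a `ListObjectsV2` response, then applies the list_keys loop quoted above and the equivalent loop over `CommonPrefixes`. `fake_list_page`, `collect_keys` and `collect_prefixes` are hypothetical names for illustration; the real grouping happens server-side in S3, and this sketch ignores pagination and `MaxKeys`.

```python
from typing import Any, Dict, List, Optional


def fake_list_page(keys: List[str], prefix: str = "", delimiter: str = "") -> Dict[str, Any]:
    """Hypothetical stand-in for one ListObjectsV2 page: keys containing the
    delimiter after the prefix are rolled up into CommonPrefixes."""
    contents: List[Dict[str, str]] = []
    common: List[str] = []
    for key in sorted(keys):
        if not key.startswith(prefix):
            continue  # S3 only returns keys that begin with the prefix
        rest = key[len(prefix):]
        if delimiter and delimiter in rest:
            # Everything up to and including the first delimiter is a "subfolder".
            common_prefix = prefix + rest[: rest.index(delimiter) + len(delimiter)]
            if common_prefix not in common:
                common.append(common_prefix)
        else:
            contents.append({"Key": key})
    page: Dict[str, Any] = {}
    if contents:
        page["Contents"] = contents
    if common:
        page["CommonPrefixes"] = [{"Prefix": p} for p in common]
    return page


def collect_keys(pages: List[Dict[str, Any]]) -> Optional[List[str]]:
    """Same loop as the list_keys excerpt: only 'Contents' is read."""
    has_results = False
    keys: List[str] = []
    for page in pages:
        if "Contents" in page:
            has_results = True
            for k in page["Contents"]:
                keys.append(k["Key"])
    return keys if has_results else None


def collect_prefixes(pages: List[Dict[str, Any]]) -> Optional[List[str]]:
    """Same loop as list_prefixes: only 'CommonPrefixes' is read."""
    has_results = False
    prefixes: List[str] = []
    for page in pages:
        if "CommonPrefixes" in page:
            has_results = True
            for common_prefix in page["CommonPrefixes"]:
                prefixes.append(common_prefix["Prefix"])
    return prefixes if has_results else None


keys = ["data/2020/a.csv", "data/2021/b.csv", "data/readme.txt"]
page = fake_list_page(keys, prefix="data/", delimiter="/")
print(collect_keys([page]))      # ['data/readme.txt']
print(collect_prefixes([page]))  # ['data/2020/', 'data/2021/']
```

With the delimiter set, the files inside `data/2020/` and `data/2021/` never reach `Contents`, so a loop that reads only `Contents` cannot see the subfolders at all.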
Currently, no operator uses this method. This issue proposes creating an S3ListPrefixesOperator that calls the list_prefixes method of the S3Hook. The relevant part of list_prefixes is:
for page in response:
    if 'CommonPrefixes' in page:
        has_results = True
        for common_prefix in page['CommonPrefixes']:
            prefixes.append(common_prefix['Prefix'])

if has_results:
    return prefixes
return None

We have tested a new operator like this internally, and it has worked in our preliminary trials.