Skip to content

Commit

Permalink
[AIRFLOW-2557] Fix pagination for s3
Browse files Browse the repository at this point in the history
Paged tests for S3 are taking over 120 seconds. There is functionality to set the page size; using it reduces the time spent on tests.

Closes apache#3455 from bolkedebruin/AIRFLOW-2557
  • Loading branch information
bolkedebruin authored and Alice Berard committed Jan 3, 2019
1 parent 3a91429 commit 3492781
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
30 changes: 26 additions & 4 deletions airflow/hooks/S3_hook.py
Expand Up @@ -77,7 +77,8 @@ def check_for_prefix(self, bucket_name, prefix, delimiter):
plist = self.list_prefixes(bucket_name, previous_level, delimiter)
return False if plist is None else prefix in plist

def list_prefixes(self, bucket_name, prefix='', delimiter=''):
def list_prefixes(self, bucket_name, prefix='', delimiter='',
page_size=None, max_items=None):
"""
Lists prefixes in a bucket under prefix
Expand All @@ -87,11 +88,21 @@ def list_prefixes(self, bucket_name, prefix='', delimiter=''):
:type prefix: str
:param delimiter: the delimiter marks key hierarchy.
:type delimiter: str
:param page_size: pagination size
:type page_size: int
:param max_items: maximum items to return
:type max_items: int
"""
config = {
'PageSize': page_size,
'MaxItems': max_items,
}

paginator = self.get_conn().get_paginator('list_objects_v2')
response = paginator.paginate(Bucket=bucket_name,
Prefix=prefix,
Delimiter=delimiter)
Delimiter=delimiter,
PaginationConfig=config)

has_results = False
prefixes = []
Expand All @@ -104,7 +115,8 @@ def list_prefixes(self, bucket_name, prefix='', delimiter=''):
if has_results:
return prefixes

def list_keys(self, bucket_name, prefix='', delimiter=''):
def list_keys(self, bucket_name, prefix='', delimiter='',
page_size=None, max_items=None):
"""
Lists keys in a bucket under prefix and not containing delimiter
Expand All @@ -114,11 +126,21 @@ def list_keys(self, bucket_name, prefix='', delimiter=''):
:type prefix: str
:param delimiter: the delimiter marks key hierarchy.
:type delimiter: str
:param page_size: pagination size
:type page_size: int
:param max_items: maximum items to return
:type max_items: int
"""
config = {
'PageSize': page_size,
'MaxItems': max_items,
}

paginator = self.get_conn().get_paginator('list_objects_v2')
response = paginator.paginate(Bucket=bucket_name,
Prefix=prefix,
Delimiter=delimiter)
Delimiter=delimiter,
PaginationConfig=config)

has_results = False
keys = []
Expand Down
18 changes: 11 additions & 7 deletions tests/hooks/test_s3_hook.py
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -96,13 +96,16 @@ def test_list_prefixes_paged(self):
b = hook.get_bucket('bucket')
b.create()

keys = ["%s/b" % i for i in range(5000)]
dirs = ["%s/" % i for i in range(5000)]
# we dont need to test the paginator
# that's covered by boto tests
keys = ["%s/b" % i for i in range(2)]
dirs = ["%s/" % i for i in range(2)]
for key in keys:
b.put_object(Key=key, Body=b'a')

self.assertListEqual(sorted(dirs),
sorted(hook.list_prefixes('bucket', delimiter='/')))
sorted(hook.list_prefixes('bucket', delimiter='/',
page_size=1)))

@mock_s3
def test_list_keys(self):
Expand All @@ -123,12 +126,13 @@ def test_list_keys_paged(self):
b = hook.get_bucket('bucket')
b.create()

keys = [str(i) for i in range(5000)]
keys = [str(i) for i in range(2)]
for key in keys:
b.put_object(Key=key, Body=b'a')

self.assertListEqual(sorted(keys),
sorted(hook.list_keys('bucket', delimiter='/')))
sorted(hook.list_keys('bucket', delimiter='/',
page_size=1)))

@mock_s3
def test_check_for_key(self):
Expand Down

0 comments on commit 3492781

Please sign in to comment.