Skip to content

Commit

Permalink
Add kwargs for boto s3 writing
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason Piper committed Nov 16, 2015
1 parent 13714eb commit e384d4e
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions luigi/s3.py
Expand Up @@ -165,9 +165,11 @@ def get_key(self, path):

return s3_bucket.get_key(key)

def put(self, local_path, destination_s3_path):
def put(self, local_path, destination_s3_path, **kwargs):
"""
Put an object stored locally to an S3 path.
:param kwargs: Keyword arguments are passed to the boto function `set_contents_from_filename`
"""
(bucket, key) = self._path_to_bucket_and_key(destination_s3_path)

Expand All @@ -177,11 +179,13 @@ def put(self, local_path, destination_s3_path):
# put the file
s3_key = Key(s3_bucket)
s3_key.key = key
s3_key.set_contents_from_filename(local_path)
s3_key.set_contents_from_filename(local_path, **kwargs)

def put_string(self, content, destination_s3_path):
def put_string(self, content, destination_s3_path, **kwargs):
"""
Put a string to an S3 path.
:param kwargs: Keyword arguments are passed to the boto function `set_contents_from_string`
"""
(bucket, key) = self._path_to_bucket_and_key(destination_s3_path)
# grab and validate the bucket
Expand All @@ -190,16 +194,17 @@ def put_string(self, content, destination_s3_path):
# put the content
s3_key = Key(s3_bucket)
s3_key.key = key
s3_key.set_contents_from_string(content)
s3_key.set_contents_from_string(content, **kwargs)

def put_multipart(self, local_path, destination_s3_path, part_size=67108864):
def put_multipart(self, local_path, destination_s3_path, part_size=67108864, **kwargs):
"""
Put an object stored locally to an S3 path
using S3 multi-part upload (for files > 5GB).
:param local_path: Path to source local file
:param destination_s3_path: URL for target S3 location
:param part_size: Part size in bytes. Default: 67108864 (64MB), must be >= 5MB and <= 5 GB.
:param kwargs: Keyword arguments are passed to the boto function `initiate_multipart_upload`
"""
# calculate number of parts to upload
# based on the size of the file
Expand All @@ -224,7 +229,7 @@ def put_multipart(self, local_path, destination_s3_path, part_size=67108864):

mp = None
try:
mp = s3_bucket.initiate_multipart_upload(key)
mp = s3_bucket.initiate_multipart_upload(key, **kwargs)

for i in range(num_parts):
# upload a part at a time to S3
Expand Down Expand Up @@ -276,9 +281,11 @@ def get_as_string(self, s3_path):

return contents

def copy(self, source_path, destination_path):
def copy(self, source_path, destination_path, **kwargs):
"""
Copy an object from one S3 location to another.
:param kwargs: Keyword arguments are passed to the boto function `copy_key`
"""
(src_bucket, src_key) = self._path_to_bucket_and_key(source_path)
(dst_bucket, dst_key) = self._path_to_bucket_and_key(destination_path)
Expand All @@ -291,13 +298,15 @@ def copy(self, source_path, destination_path):
for key in self.list(source_path):
s3_bucket.copy_key(dst_prefix + key,
src_bucket,
src_prefix + key)
src_prefix + key, **kwargs)
else:
s3_bucket.copy_key(dst_key, src_bucket, src_key)
s3_bucket.copy_key(dst_key, src_bucket, src_key, **kwargs)

def rename(self, source_path, destination_path):
def rename(self, source_path, destination_path, **kwargs):
    """
    Rename/move an object from one S3 location to another.

    The object is copied to ``destination_path`` first and the source is removed afterwards.

    :param source_path: S3 path of the object to move.
    :param destination_path: S3 path the object is moved to.
    :param kwargs: Keyword arguments are passed to the boto function `copy_key`
    """
    # kwargs must be forwarded explicitly: copy() is what hands them on to
    # boto's `copy_key`, so dropping them here would silently ignore options.
    self.copy(source_path, destination_path, **kwargs)
    self.remove(source_path)
Expand Down Expand Up @@ -398,14 +407,17 @@ def _add_path_delimiter(self, key):
class AtomicS3File(AtomicLocalFile):
"""
An S3 file that writes to a temp file and is put to S3 on close.

:param path: Destination path; passed to the parent constructor and used as the upload target.
:param s3_client: Client object whose `put_multipart` method performs the upload.
:param kwargs: Keyword arguments are forwarded to `s3_client.put_multipart`, which passes them to the boto function `initiate_multipart_upload`
"""

# NOTE(review): this view shows two `__init__` signatures and two `put_multipart`
# calls back to back; presumably the `**kwargs` variants are the post-commit
# lines and the others are the lines they replace -- confirm against the repository.
def __init__(self, path, s3_client):
def __init__(self, path, s3_client, **kwargs):
self.s3_client = s3_client
super(AtomicS3File, self).__init__(path)
# Keep the extra keyword arguments so they can be forwarded at upload time.
self.s3_options = kwargs

# Upload the temp file (`self.tmp_path`) to `self.path` through the client's multipart upload.
def move_to_final_destination(self):
self.s3_client.put_multipart(self.tmp_path, self.path)
self.s3_client.put_multipart(self.tmp_path, self.path, **self.s3_options)


class ReadableS3File(object):
Expand Down Expand Up @@ -492,18 +504,21 @@ def __iter__(self):

class S3Target(FileSystemTarget):
"""
:param kwargs: Keyword arguments are passed to the boto function `initiate_multipart_upload`
"""

fs = None

def __init__(self, path, format=None, client=None):
def __init__(self, path, format=None, client=None, **kwargs):
    """
    Set up a target for the given S3 path.

    :param path: Path handed to the parent constructor and stored on the instance.
    :param format: Format object; when ``None`` the result of `get_default_format()` is used.
    :param client: Client stored as ``fs``; a new `S3Client()` is created when a falsy value is given.
    :param kwargs: Extra keyword arguments, stored on the instance as ``s3_options``.
    """
    super(S3Target, self).__init__(path)

    self.path = path
    # Only fall back to the default format when none was supplied.
    self.format = get_default_format() if format is None else format
    self.fs = client if client else S3Client()
    self.s3_options = kwargs

def open(self, mode='r'):
"""
Expand All @@ -519,7 +534,7 @@ def open(self, mode='r'):
fileobj = ReadableS3File(s3_key)
return self.format.pipe_reader(fileobj)
else:
return self.format.pipe_writer(AtomicS3File(self.path, self.fs))
return self.format.pipe_writer(AtomicS3File(self.path, self.fs, **self.s3_options))


class S3FlagTarget(S3Target):
Expand Down

0 comments on commit e384d4e

Please sign in to comment.