Skip to content

Commit

Permalink
Merge pull request #64 from kouk/master
Browse files Browse the repository at this point in the history
Multipart upload support
  • Loading branch information
spulec committed Nov 23, 2013
2 parents d5b3af2 + 85e3210 commit 4853954
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 1 deletion.
75 changes: 74 additions & 1 deletion moto/s3/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import os
import base64
import datetime
import hashlib

from moto.core import BaseBackend
from moto.core.utils import iso_8601_datetime, rfc_1123_datetime
from .utils import clean_key_name

# Number of random bytes used to build a multipart upload id.
UPLOAD_ID_BYTES = 43
# S3 rejects any part except the last that is smaller than 5 MB.
UPLOAD_PART_MIN_SIZE = 5242880


class FakeKey(object):
def __init__(self, name, value):
Expand All @@ -23,7 +28,7 @@ def append_to_value(self, value):
@property
def etag(self):
    """Quoted MD5 hexdigest of the key's contents, as S3 reports ETags."""
    digest = hashlib.md5(bytes(self.value)).hexdigest()
    return '"{0}"'.format(digest)

@property
Expand Down Expand Up @@ -52,10 +57,48 @@ def size(self):
return len(self.value)


class FakeMultipart(object):
    """An in-progress S3 multipart upload for a single key."""

    def __init__(self, key_name):
        self.key_name = key_name
        self.parts = {}  # part number (int, 1-based) -> FakeKey
        # Random upload id; strip base64 padding/'+' so it is URL-safe.
        self.id = base64.b64encode(os.urandom(UPLOAD_ID_BYTES)).replace('=', '').replace('+', '')

    def complete(self):
        """Join all parts, in order, into a single byte sequence.

        Returns the joined bytearray, or None when the upload is invalid:
        part numbers are not continuous from 1, or any part except the
        last is smaller than the S3 minimum part size.
        """
        parts = self.list_parts()
        if parts is None:
            # Non-continuous part numbers: previously this crashed with a
            # TypeError (len(None)); report an invalid upload instead.
            return None

        total = bytearray()
        last_part_number = len(parts)
        for part in parts:
            # Only the final part may be smaller than the minimum size.
            if part.name != last_part_number and len(part.value) < UPLOAD_PART_MIN_SIZE:
                return None
            total.extend(part.value)

        return total

    def set_part(self, part_id, value):
        """Store value as part part_id (must be >= 1); return the FakeKey or None."""
        if part_id < 1:
            return None

        key = FakeKey(part_id, value)
        self.parts[part_id] = key
        return key

    def list_parts(self):
        """Return parts sorted by part number, or None if numbers are not 1..N."""
        parts = []

        for expected_id, part_id in enumerate(sorted(self.parts.keys()), start=1):
            # Part ids must be exactly 1..N with no gaps.
            if expected_id != part_id:
                return None
            parts.append(self.parts[expected_id])

        return parts


class FakeBucket(object):
    """An S3 bucket: holds keys plus any in-progress multipart uploads."""

    def __init__(self, name):
        self.keys = {}        # key name -> FakeKey
        self.multiparts = {}  # upload id -> FakeMultipart
        self.name = name


class S3Backend(BaseBackend):
Expand Down Expand Up @@ -106,6 +149,36 @@ def get_key(self, bucket_name, key_name):
if bucket:
return bucket.keys.get(key_name)

def initiate_multipart(self, bucket_name, key_name):
    """Begin a multipart upload for key_name; return the new FakeMultipart."""
    multipart = FakeMultipart(key_name)
    self.buckets[bucket_name].multiparts[multipart.id] = multipart
    return multipart

def complete_multipart(self, bucket_name, multipart_id):
    """Finish a multipart upload: join its parts into a real key.

    Returns the result of set_key on success. Returns None -- and keeps
    the upload in place -- when the upload's parts are invalid.
    """
    bucket = self.buckets[bucket_name]
    value = bucket.multiparts[multipart_id].complete()
    if value is None:
        return None

    multipart = bucket.multiparts.pop(multipart_id)
    return self.set_key(bucket_name, multipart.key_name, value)

def cancel_multipart(self, bucket_name, multipart_id):
    """Abort the multipart upload, discarding any uploaded parts."""
    self.buckets[bucket_name].multiparts.pop(multipart_id)

def list_multipart(self, bucket_name, multipart_id):
    """Return the parts uploaded so far for the given upload, in order."""
    multipart = self.buckets[bucket_name].multiparts[multipart_id]
    return multipart.list_parts()

def set_part(self, bucket_name, multipart_id, part_id, value):
    """Store value as one part of an in-progress multipart upload."""
    multipart = self.buckets[bucket_name].multiparts[multipart_id]
    return multipart.set_part(part_id, value)

def prefix_query(self, bucket, prefix, delimiter):
key_results = set()
folder_results = set()
Expand Down
109 changes: 109 additions & 0 deletions moto/s3/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def key_response(self, request, full_url, headers):

def _key_response(self, request, full_url, headers):
parsed_url = urlparse(full_url)
query = parse_qs(parsed_url.query)
method = request.method

key_name = self.parse_key_name(parsed_url.path)
Expand All @@ -130,13 +131,32 @@ def _key_response(self, request, full_url, headers):
body = request.data

if method == 'GET':
if 'uploadId' in query:
upload_id = query['uploadId'][0]
parts = self.backend.list_multipart(bucket_name, upload_id)
template = Template(S3_MULTIPART_LIST_RESPONSE)
return 200, headers, template.render(
bucket_name=bucket_name,
key_name=key_name,
upload_id=upload_id,
count=len(parts),
parts=parts
)
key = self.backend.get_key(bucket_name, key_name)
if key:
headers.update(key.metadata)
return 200, headers, key.value
else:
return 404, headers, ""
if method == 'PUT':
if 'uploadId' in query and 'partNumber' in query and body:
upload_id = query['uploadId'][0]
part_number = int(query['partNumber'][0])
key = self.backend.set_part(bucket_name, upload_id, part_number, body)
template = Template(S3_MULTIPART_UPLOAD_RESPONSE)
headers.update(key.response_dict)
return 200, headers, template.render(part=key)

if 'x-amz-copy-source' in request.headers:
# Copy key
src_bucket, src_key = request.headers.get("x-amz-copy-source").split("/", 1)
Expand Down Expand Up @@ -177,9 +197,39 @@ def _key_response(self, request, full_url, headers):
else:
return 404, headers, ""
elif method == 'DELETE':
if 'uploadId' in query:
upload_id = query['uploadId'][0]
self.backend.cancel_multipart(bucket_name, upload_id)
return 204, headers, ""
removed_key = self.backend.delete_key(bucket_name, key_name)
template = Template(S3_DELETE_OBJECT_SUCCESS)
return 204, headers, template.render(bucket=removed_key)
elif method == 'POST':
if body == '' and parsed_url.query == 'uploads':
multipart = self.backend.initiate_multipart(bucket_name, key_name)
template = Template(S3_MULTIPART_INITIATE_RESPONSE)
response = template.render(
bucket_name=bucket_name,
key_name=key_name,
upload_id=multipart.id,
)
return 200, headers, response

if 'uploadId' in query:
upload_id = query['uploadId'][0]
key = self.backend.complete_multipart(bucket_name, upload_id)

if key is not None:
template = Template(S3_MULTIPART_COMPLETE_RESPONSE)
return template.render(
bucket_name=bucket_name,
key_name=key.name,
etag=key.etag,
)
template = Template(S3_MULTIPART_COMPLETE_TOO_SMALL_ERROR)
return 400, headers, template.render()
else:
raise NotImplementedError("Method POST had only been implemented for multipart uploads so far")
else:
raise NotImplementedError("Method {0} has not been impelemented in the S3 backend yet".format(method))

Expand Down Expand Up @@ -279,3 +329,62 @@ def _key_response(self, request, full_url, headers):
<LastModified>{{ key.last_modified_ISO8601 }}</LastModified>
</CopyObjectResponse>
</CopyObjectResponse>"""

# Jinja2 XML templates for the S3 multipart-upload responses below.
# NOTE: these are runtime strings returned to clients -- do not reformat.

# Response to POST ?uploads (initiate a multipart upload).
S3_MULTIPART_INITIATE_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Bucket>{{ bucket_name }}</Bucket>
<Key>{{ key_name }}</Key>
<UploadId>{{ upload_id }}</UploadId>
</InitiateMultipartUploadResult>"""

# Response to PUT ?partNumber=N&uploadId=... (upload one part).
S3_MULTIPART_UPLOAD_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<CopyPartResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<LastModified>{{ part.last_modified_ISO8601 }}</LastModified>
<ETag>{{ part.etag }}</ETag>
</CopyPartResult>"""

# Response to GET ?uploadId=... (list the parts uploaded so far).
# Initiator/Owner ids are fixed placeholder values.
S3_MULTIPART_LIST_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<ListPartsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Bucket>{{ bucket_name }}</Bucket>
<Key>{{ key_name }}</Key>
<UploadId>{{ upload_id }}</UploadId>
<StorageClass>STANDARD</StorageClass>
<Initiator>
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
<DisplayName>webfile</DisplayName>
</Initiator>
<Owner>
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
<DisplayName>webfile</DisplayName>
</Owner>
<StorageClass>STANDARD</StorageClass>
<PartNumberMarker>1</PartNumberMarker>
<NextPartNumberMarker>{{ count }} </NextPartNumberMarker>
<MaxParts>{{ count }}</MaxParts>
<IsTruncated>false</IsTruncated>
{% for part in parts %}
<Part>
<PartNumber>{{ part.name }}</PartNumber>
<LastModified>{{ part.last_modified_ISO8601 }}</LastModified>
<ETag>{{ part.etag }}</ETag>
<Size>{{ part.size }}</Size>
</Part>
{% endfor %}
</ListPartsResult>"""

# Response to POST ?uploadId=... when the upload completes successfully.
S3_MULTIPART_COMPLETE_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Location>http://{{ bucket_name }}.s3.amazonaws.com/{{ key_name }}</Location>
<Bucket>{{ bucket_name }}</Bucket>
<Key>{{ key_name }}</Key>
<ETag>{{ etag }}</ETag>
</CompleteMultipartUploadResult>
"""

# 400 error body when a non-final part is below the 5 MB minimum.
# Request/host ids are dummy values.
S3_MULTIPART_COMPLETE_TOO_SMALL_ERROR = """<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>EntityTooSmall</Code>
<Message>Your proposed upload is smaller than the minimum allowed object size.</Message>
<RequestId>asdfasdfsdafds</RequestId>
<HostId>sdfgdsfgdsfgdfsdsfgdfs</HostId>
</Error>"""
29 changes: 29 additions & 0 deletions tests/test_s3/test_s3.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import urllib2
from io import BytesIO

import boto
from boto.exception import S3ResponseError
Expand Down Expand Up @@ -37,6 +38,34 @@ def test_my_model_save():
conn.get_bucket('mybucket').get_key('steve').get_contents_as_string().should.equal('is awesome')


@mock_s3
def test_multipart_upload_too_small():
    """Non-final parts below the 5 MB S3 minimum must be rejected."""
    connection = boto.connect_s3('the_key', 'the_secret')
    target = connection.create_bucket("foobar")

    upload = target.initiate_multipart_upload("the-key")
    upload.upload_part_from_file(BytesIO('hello'), 1)
    upload.upload_part_from_file(BytesIO('world'), 2)
    # Both parts are far smaller than 5 MB, so completion must fail.
    upload.complete_upload.should.throw(S3ResponseError)


@mock_s3
def test_multipart_upload():
    """A valid two-part upload produces a key joining the parts in order."""
    connection = boto.connect_s3('the_key', 'the_secret')
    target = connection.create_bucket("foobar")

    upload = target.initiate_multipart_upload("the-key")
    # First part is exactly the 5 MB minimum.
    first = '0' * 5242880
    upload.upload_part_from_file(BytesIO(first), 1)
    # The final part may legally be smaller than 5 MB.
    second = '1'
    upload.upload_part_from_file(BytesIO(second), 2)
    upload.complete_upload()

    # The completed key holds the concatenation of both parts.
    target.get_key("the-key").get_contents_as_string().should.equal(first + second)


@mock_s3
def test_missing_key():
conn = boto.connect_s3('the_key', 'the_secret')
Expand Down

0 comments on commit 4853954

Please sign in to comment.