3 changes: 2 additions & 1 deletion libcloud/storage/base.py
@@ -742,7 +742,8 @@ def _stream_data(self, response, iterator, chunked=False,
         if calculate_hash:
             data_hash = self._get_hash_function()

-        generator = libcloud.utils.files.read_in_chunks(iterator, chunk_size)
+        generator = libcloud.utils.files.read_in_chunks(iterator, chunk_size,
+                                                        fill_size=True)
Member:

Why is the fill_size change necessary?

Contributor Author:

Refer to my commit message:
"Don't send one byte at a time, read the iterator in chunks and fill the chunks, otherwise it turns a 1.6MB file into more than 400MB (when I gave up and killed the request)."

Contributor Author:

To be more explicit, I tested this by watching the python3.4 process in the OS X Activity Monitor > Network > Sent Bytes. Without this fix I saw over 400MB uploaded before I gave up; debugging later revealed the process was sending the bytes one at a time, so probably one per packet, with all the associated overheads.

With this fix, the process only uploaded a little over the expected 1.6MB for my test file, which makes sense given typical network overheads (packet wrapping, a <1500-byte MTU causing some fragmentation, etc.).

Contributor Author:

Looked at it again in a debugger and it appears this only affects storage drivers with supports_chunked_encoding = True, i.e. the Atmos and Rackspace drivers at this time.
https://github.com/apache/libcloud/blob/ea16988/libcloud/storage/base.py#L634-636

Otherwise it uses the else branch directly below and consumes the entire iterator, as already documented:
https://github.com/apache/libcloud/blob/ea16988/libcloud/storage/base.py#L400-408

EDIT: Changed links to include a gitref so they should be stable over time.
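
Sketching that dispatch (a hypothetical, self-contained paraphrase, not the actual base.py code): only chunked-capable drivers ever stream the generator, and everyone else falls into the documented consume-everything branch, so the fill_size fix is only reachable for the former.

# Hypothetical sketch paraphrasing the dispatch linked above; the real
# code lives in libcloud/storage/base.py and is not copied verbatim.
from libcloud.utils.files import exhaust_iterator, read_in_chunks

def upload_sketch(supports_chunked_encoding, iterator, chunk_size=8192):
    if supports_chunked_encoding:
        # Atmos/Rackspace: stream chunk_size-sized pieces (fill_size=True)
        return read_in_chunks(iterator, chunk_size, fill_size=True)
    # Everyone else: the whole payload is buffered in memory up front,
    # as the documentation linked above already warns.
    return exhaust_iterator(read_in_chunks(iterator, chunk_size))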


         bytes_transferred = 0
         try:
69 changes: 64 additions & 5 deletions libcloud/test/storage/test_cloudfiles.py
@@ -30,7 +30,7 @@
 from libcloud.utils.py3 import urlquote

 from libcloud.common.types import LibcloudError, MalformedResponseError
-from libcloud.storage.base import Container, Object
+from libcloud.storage.base import CHUNK_SIZE, Container, Object
 from libcloud.storage.types import ContainerAlreadyExistsError
 from libcloud.storage.types import ContainerDoesNotExistError
 from libcloud.storage.types import ContainerIsNotEmptyError
@@ -665,10 +665,6 @@ def intercept_request(request_path,

         try:
             self.driver.upload_object_via_stream(
-                # We never reach the Python 3 only bytes vs int error
-                # currently at libcloud/utils/py3.py:89
-                # raise TypeError("Invalid argument %r for b()" % (s,))
-                # because I raise a NotImplementedError.
                 iterator=iter(b'blob data like an image or video'),
                 container=container,
                 object_name="test_object",
@@ -682,6 +678,64 @@
             self.fail('Expected NotImplementedError to be thrown to '
                       'verify we actually checked the expected headers')

+    def test_upload_object_via_stream_python3_bytes_error(self):
+        container = Container(name='py3', extra={}, driver=self.driver)
+        bytes_blob = b'blob data like an image or video'
+
+        # This is mostly to check we didn't discover other errors along the way
+        mocked_response = container.upload_object_via_stream(
+            iterator=iter(bytes_blob),
+            object_name="img_or_vid",
+        )
+        self.assertEqual(len(bytes_blob), mocked_response.size)
+
+    def test_upload_object_via_stream_chunked_encoding(self):
+
+        # Create enough bytes it should get split into two chunks
+        bytes_blob = ''.join(['\0' for _ in range(CHUNK_SIZE + 1)])
+        hex_chunk_size = ('%X' % CHUNK_SIZE).encode('utf8')
+        expected = [
+            # Chunk 1
+            hex_chunk_size + b'\r\n',
+            bytes(bytes_blob[:CHUNK_SIZE].encode('utf8')),
+            b'\r\n',
+
+            # Chunk 2
+            b'1\r\n',
+            bytes(bytes_blob[CHUNK_SIZE:].encode('utf8')),
+            b'\r\n',
+
+            # If chunked, also send a final message
+            b'0\r\n\r\n',
+        ]
+        logged_data = []
+
+        class InterceptResponse(CloudFilesMockRawResponse):
+            def __init__(self, connection):
+                super(InterceptResponse, self).__init__(connection=connection)
+                old_send = self.connection.connection.send
+
+                def intercept_send(data):
+                    old_send(data)
+                    logged_data.append(data)
+                self.connection.connection.send = intercept_send
+
+            def _v1_MossoCloudFS_py3_img_or_vid2(self,
+                                                 method, url, body, headers):
+                headers = {'etag': 'd79fb00c27b50494a463e680d459c90c'}
+                headers.update(self.base_headers)
+                _201 = httplib.CREATED
+                return _201, '', headers, httplib.responses[_201]
+
+        self.driver_klass.connectionCls.rawResponseCls = InterceptResponse
+
+        container = Container(name='py3', extra={}, driver=self.driver)
+        container.upload_object_via_stream(
+            iterator=iter(bytes_blob),
+            object_name="img_or_vid2",
+        )
+        self.assertListEqual(expected, logged_data)

     def test__upload_object_manifest(self):
         hash_function = self.driver._get_hash_function()
         hash_function.update(b(''))
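
A side note on the wire format the expected list in test_upload_object_via_stream_chunked_encoding asserts: HTTP chunked transfer encoding frames each chunk as a hexadecimal length, CRLF, the payload, CRLF, and terminates the stream with a zero-length chunk. A hypothetical helper (not part of the patch) producing the same framing:

# Hypothetical helper, not part of the patch: builds one chunked
# transfer-encoding frame. The driver sends the three parts as separate
# send() calls, but the bytes on the wire are the same.
def frame_chunk(data):
    # <length in hex>\r\n<payload>\r\n
    return ('%X' % len(data)).encode('utf8') + b'\r\n' + data + b'\r\n'

# The terminating zero-length chunk collapses to b'0\r\n\r\n':
assert frame_chunk(b'') == b'0\r\n\r\n'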
@@ -1087,6 +1141,11 @@ class CloudFilesMockRawResponse(MockRawResponse):
     fixtures = StorageFileFixtures('cloudfiles')
     base_headers = {'content-type': 'application/json; charset=UTF-8'}

+    def _v1_MossoCloudFS_py3_img_or_vid(self, method, url, body, headers):
+        headers = {'etag': 'e2378cace8712661ce7beec3d9362ef6'}
+        headers.update(self.base_headers)
+        return httplib.CREATED, '', headers, httplib.responses[httplib.CREATED]
+
     def _v1_MossoCloudFS_foo_bar_container_foo_test_upload(
             self, method, url, body, headers):
         # test_object_upload_success
2 changes: 1 addition & 1 deletion libcloud/utils/files.py
@@ -38,7 +38,7 @@ def read_in_chunks(iterator, chunk_size=None, fill_size=False,
"""
Return a generator which yields data in chunks.

:param terator: An object which implements an iterator interface
:param iterator: An object which implements an iterator interface
or a File like object with read method.
:type iterator: :class:`object` which implements iterator interface.

2 changes: 2 additions & 0 deletions libcloud/utils/py3.py
@@ -85,6 +85,8 @@ def b(s):
         return s.encode('utf-8')
     elif isinstance(s, bytes):
         return s
+    elif isinstance(s, int):
+        return bytes([s])
     else:
         raise TypeError("Invalid argument %r for b()" % (s,))
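
The new branch exists because iterating a bytes object on Python 3 yields ints, and read_in_chunks() passes each piece through b(). A quick illustration (this is Python 3 language behaviour, not something specific to this patch):

# Python 3: iterating bytes yields ints, not one-byte strings.
blob = b'hi'
assert list(iter(blob)) == [104, 105]  # ints...
assert bytes([104]) == b'h'            # ...which the new branch converts back
# On Python 2, iterating b'hi' yields one-byte str values, which the
# existing isinstance(s, bytes) branch already handled.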
