Skip to content

Commit

Permalink
Merge pull request #29 from aliyun/con-multipart
Browse files Browse the repository at this point in the history
For resumable_download, fix #28 so that when tmp file is deleted, we …
  • Loading branch information
yami committed Mar 30, 2016
2 parents 4b1456e + 2cc9804 commit f72cd78
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 5 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -3,6 +3,13 @@ OSS SDK for Python 版本记录

Python SDK的版本号遵循 `Semantic Versioning <http://semver.org/>`_ 规则。

Version 2.1.1
-------------

- 修复:issue #28。
- 修复:正确的设置连接池大小。


Version 2.1.0
-------------

Expand Down
2 changes: 1 addition & 1 deletion oss2/__init__.py
@@ -1,4 +1,4 @@
__version__ = '2.1.0'
__version__ = '2.1.1'

from . import models, exceptions

Expand Down
3 changes: 3 additions & 0 deletions oss2/api.py
Expand Up @@ -488,6 +488,7 @@ def copy_object(self, source_bucket_name, source_key, target_key, headers=None):
headers['x-oss-copy-source'] = '/' + source_bucket_name + '/' + source_key

resp = self.__do_object('PUT', target_key, headers=headers)

return PutObjectResult(resp)

def update_object_meta(self, key, headers):
Expand Down Expand Up @@ -607,6 +608,7 @@ def complete_multipart_upload(self, key, upload_id, parts, headers=None):
params={'uploadId': upload_id},
data=data,
headers=headers)

return PutObjectResult(resp)

def abort_multipart_upload(self, key, upload_id):
Expand Down Expand Up @@ -670,6 +672,7 @@ def upload_part_copy(self, source_bucket_name, source_key, byte_range,
params={'uploadId': target_upload_id,
'partNumber': str(target_part_number)},
headers=headers)

return PutObjectResult(resp)

def list_parts(self, key, upload_id,
Expand Down
5 changes: 3 additions & 2 deletions oss2/http.py
Expand Up @@ -28,8 +28,9 @@ class Session(object):
def __init__(self):
self.session = requests.Session()

self.session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=defaults.connection_pool_size))
self.session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=defaults.connection_pool_size))
psize = defaults.connection_pool_size
self.session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=psize, pool_maxsize=psize))
self.session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=psize, pool_maxsize=psize))

def do_request(self, req, timeout):
try:
Expand Down
6 changes: 5 additions & 1 deletion oss2/resumable.py
Expand Up @@ -85,7 +85,7 @@ def resumable_download(bucket, key, filename,
实现的方法是:
#. 在本地创建一个临时文件,文件名由原始文件名加上一个随机的后缀组成;
#. 通过指定请求的 `Range` 头按照范围并发读取OSS文件,并写入到临时文件里对应的位置;
#. 全部完成之后,把临时文件重名为目标文件 (即 `filename` )
#. 全部完成之后,把临时文件重命名为目标文件 (即 `filename` )
在上述过程中,断点信息,即已经完成的范围,会保存在磁盘上。因为某种原因下载中断,后续如果下载
同样的文件,也就是源文件和目标文件一样,就会先读取断点信息,然后只下载缺失的部分。
Expand Down Expand Up @@ -296,6 +296,10 @@ def __load_record(self):
self._del_record()
record = None

if record and not os.path.exists(self.filename + record['tmp_suffix']):
self._del_record()
record = None

if record and self.__is_remote_changed(record):
utils.silently_remove(self.filename + record['tmp_suffix'])
self._del_record()
Expand Down
33 changes: 32 additions & 1 deletion tests/test_download.py
Expand Up @@ -281,7 +281,7 @@ def test_remote_changed_before_start(self):
self.__test_insane_record(400, partial(modify_one, key='size', value=1024), old_tmp_exists=False)
self.__test_insane_record(400, partial(modify_one, key='mtime', value=1024), old_tmp_exists=False)

def test_remote_changed_during_upload(self):
def test_remote_changed_during_download(self):
oss2.defaults.multiget_threshold = 1
oss2.defaults.multiget_part_size = 100
oss2.defaults.multiget_num_threads = 2
Expand Down Expand Up @@ -491,6 +491,37 @@ def mock_rename(src, dst):

oss2.utils.silently_remove(abspath)

def test_tmp_file_removed(self):
oss2.defaults.multiget_threshold = 1
oss2.defaults.multiget_part_size = 100
oss2.defaults.multiget_num_threads = 5

orig_download_part = oss2.resumable._ResumableDownloader._ResumableDownloader__download_part

file_size = 123 * 3 + 1
key, filename, content = self.__prepare(file_size)

context = {}

def mock_download_part(downloader, part, part_number=None):
if part.part_number == part_number:
r = self.__record(key, filename)
context['tmpfile'] = filename + r['tmp_suffix']

raise RuntimeError("Fail download_part for part: {0}".format(part_number))
else:
orig_download_part(downloader, part)

with patch.object(oss2.resumable._ResumableDownloader, '_ResumableDownloader__download_part',
side_effect=partial(mock_download_part, part_number=2),
autospec=True):
self.assertRaises(RuntimeError, oss2.resumable_download, self.bucket, key, filename)

os.remove(context['tmpfile'])

oss2.resumable_download(self.bucket, key, filename)
self.assertFileContent(filename, content)


if __name__ == '__main__':
unittest.main()
1 change: 1 addition & 0 deletions tests/test_object.py
Expand Up @@ -3,6 +3,7 @@
import requests
import filecmp
import calendar
import contextlib

from oss2.exceptions import (ClientError, RequestError,
NotFound, NoSuchKey, Conflict, PositionNotEqualToLength, ObjectNotAppendable)
Expand Down

0 comments on commit f72cd78

Please sign in to comment.