Skip to content

Commit

Permalink
For resumable_download, fix #28 so that when tmp file is deleted, we …
Browse files Browse the repository at this point in the history
…delete the record; set connection pool size correctly; close connections for APIs which do not read body.
  • Loading branch information
yami committed Mar 29, 2016
1 parent 9eb0440 commit 65ea1f6
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 8 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ OSS SDK for Python 版本记录

Python SDK的版本号遵循 `Semantic Versioning <http://semver.org/>`_ 规则。

Version 2.1.1
-------------

- 修复:issue #28。
- 修复:正确的设置连接池大小。
- 修复:对于不读取 `body` 的API,如 `upload_part_copy` ,主动调用 `close` ,确保连接及时释放。
- 增加:给 `RequestResult` 和 `Response` 增加 `close` 方法。


Version 2.1.0
-------------

Expand Down
2 changes: 1 addition & 1 deletion oss2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '2.1.0'
__version__ = '2.1.1'

from . import models, exceptions

Expand Down
13 changes: 10 additions & 3 deletions oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def progress_callback(bytes_consumed, total_bytes):

import time
import shutil
import contextlib
import oss2.utils


Expand Down Expand Up @@ -488,7 +489,9 @@ def copy_object(self, source_bucket_name, source_key, target_key, headers=None):
headers['x-oss-copy-source'] = '/' + source_bucket_name + '/' + source_key

resp = self.__do_object('PUT', target_key, headers=headers)
return PutObjectResult(resp)

with contextlib.closing(resp):
return PutObjectResult(resp)

def update_object_meta(self, key, headers):
"""更改Object的元数据信息,包括Content-Type这类标准的HTTP头部,以及以x-oss-meta-开头的自定义元数据。
Expand Down Expand Up @@ -607,7 +610,9 @@ def complete_multipart_upload(self, key, upload_id, parts, headers=None):
params={'uploadId': upload_id},
data=data,
headers=headers)
return PutObjectResult(resp)

with contextlib.closing(resp):
return PutObjectResult(resp)

def abort_multipart_upload(self, key, upload_id):
"""取消分片上传。
Expand Down Expand Up @@ -670,7 +675,9 @@ def upload_part_copy(self, source_bucket_name, source_key, byte_range,
params={'uploadId': target_upload_id,
'partNumber': str(target_part_number)},
headers=headers)
return PutObjectResult(resp)

with contextlib.closing(resp):
return PutObjectResult(resp)

def list_parts(self, key, upload_id,
marker='', max_parts=1000):
Expand Down
8 changes: 6 additions & 2 deletions oss2/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ class Session(object):
def __init__(self):
self.session = requests.Session()

self.session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=defaults.connection_pool_size))
self.session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=defaults.connection_pool_size))
psize = defaults.connection_pool_size
self.session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=psize, pool_maxsize=psize))
self.session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=psize, pool_maxsize=psize))

def do_request(self, req, timeout):
try:
Expand Down Expand Up @@ -94,6 +95,9 @@ def read(self, amt=None):
def __iter__(self):
return self.response.iter_content(_CHUNK_SIZE)

def close(self):
self.response.close()


# requests对于具有fileno()方法的file object,会用fileno()的返回值作为Content-Length。
# 这对于已经读取了部分内容,或执行了seek()的file object是不正确的。
Expand Down
3 changes: 3 additions & 0 deletions oss2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ def __init__(self, resp):
#: 请求ID,用于跟踪一个OSS请求。提交工单时,最后能够提供请求ID
self.request_id = resp.headers.get('x-oss-request-id', '')

def close(self):
self.resp.close()


class HeadObjectResult(RequestResult):
def __init__(self, resp):
Expand Down
6 changes: 5 additions & 1 deletion oss2/resumable.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def resumable_download(bucket, key, filename,
实现的方法是:
#. 在本地创建一个临时文件,文件名由原始文件名加上一个随机的后缀组成;
#. 通过指定请求的 `Range` 头按照范围并发读取OSS文件,并写入到临时文件里对应的位置;
#. 全部完成之后,把临时文件重名为目标文件 (即 `filename` )
#. 全部完成之后,把临时文件重命名为目标文件 (即 `filename` )
在上述过程中,断点信息,即已经完成的范围,会保存在磁盘上。因为某种原因下载中断,后续如果下载
同样的文件,也就是源文件和目标文件一样,就会先读取断点信息,然后只下载缺失的部分。
Expand Down Expand Up @@ -296,6 +296,10 @@ def __load_record(self):
self._del_record()
record = None

if record and not os.path.exists(self.filename + record['tmp_suffix']):
self._del_record()
record = None

if record and self.__is_remote_changed(record):
utils.silently_remove(self.filename + record['tmp_suffix'])
self._del_record()
Expand Down
33 changes: 32 additions & 1 deletion tests/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ def test_remote_changed_before_start(self):
self.__test_insane_record(400, partial(modify_one, key='size', value=1024), old_tmp_exists=False)
self.__test_insane_record(400, partial(modify_one, key='mtime', value=1024), old_tmp_exists=False)

def test_remote_changed_during_upload(self):
def test_remote_changed_during_download(self):
oss2.defaults.multiget_threshold = 1
oss2.defaults.multiget_part_size = 100
oss2.defaults.multiget_num_threads = 2
Expand Down Expand Up @@ -491,6 +491,37 @@ def mock_rename(src, dst):

oss2.utils.silently_remove(abspath)

def test_tmp_file_removed(self):
oss2.defaults.multiget_threshold = 1
oss2.defaults.multiget_part_size = 100
oss2.defaults.multiget_num_threads = 5

orig_download_part = oss2.resumable._ResumableDownloader._ResumableDownloader__download_part

file_size = 123 * 3 + 1
key, filename, content = self.__prepare(file_size)

context = {}

def mock_download_part(downloader, part, part_number=None):
if part.part_number == part_number:
r = self.__record(key, filename)
context['tmpfile'] = filename + r['tmp_suffix']

raise RuntimeError("Fail download_part for part: {0}".format(part_number))
else:
orig_download_part(downloader, part)

with patch.object(oss2.resumable._ResumableDownloader, '_ResumableDownloader__download_part',
side_effect=partial(mock_download_part, part_number=2),
autospec=True):
self.assertRaises(RuntimeError, oss2.resumable_download, self.bucket, key, filename)

os.remove(context['tmpfile'])

oss2.resumable_download(self.bucket, key, filename)
self.assertFileContent(filename, content)


if __name__ == '__main__':
unittest.main()
12 changes: 12 additions & 0 deletions tests/test_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import requests
import filecmp
import calendar
import contextlib

from oss2.exceptions import (ClientError, RequestError,
NotFound, NoSuchKey, Conflict, PositionNotEqualToLength, ObjectNotAppendable)
Expand Down Expand Up @@ -420,6 +421,17 @@ def test_invalid_object_name(self):

self.assertRaises(oss2.exceptions.InvalidObjectName, self.bucket.put_object, key, content)

def test_close(self):
key = self.random_key()
content = random_bytes(512)

self.bucket.put_object(key, content)

result = self.bucket.get_object(key)
with contextlib.closing(result):
pass

self.assertEqual(len(result.read()), 0)

if __name__ == '__main__':
unittest.main()
3 changes: 3 additions & 0 deletions unittests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,9 @@ def __next__(self):
def next(self):
return self.read(8192)

def close(self):
pass


def _is_xml(content):
try:
Expand Down

0 comments on commit 65ea1f6

Please sign in to comment.