From 65ea1f61c2382d183f3e9f84af15d1150ec09a9e Mon Sep 17 00:00:00 2001 From: yami Date: Tue, 29 Mar 2016 18:04:26 +0800 Subject: [PATCH 1/5] For resumable_download, fix #28 so that when tmp file is deleted, we delete the record; set connection pool size correctly; close connections for APIs which do not read body. --- CHANGELOG.rst | 9 +++++++++ oss2/__init__.py | 2 +- oss2/api.py | 13 ++++++++++--- oss2/http.py | 8 ++++++-- oss2/models.py | 3 +++ oss2/resumable.py | 6 +++++- tests/test_download.py | 33 ++++++++++++++++++++++++++++++++- tests/test_object.py | 12 ++++++++++++ unittests/common.py | 3 +++ 9 files changed, 81 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 09984a77..064a645f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,15 @@ OSS SDK for Python 版本记录 Python SDK的版本号遵循 `Semantic Versioning `_ 规则。 +Version 2.1.1 +------------- + +- 修复:issue #28。 +- 修复:正确的设置连接池大小。 +- 修复:对于不读取 `body` 的API,如 `upload_part_copy` ,主动调用 `close` ,确保连接及时释放。 +- 增加:给 `RequestResult` 和 `Response` 增加 `close` 方法。 + + Version 2.1.0 ------------- diff --git a/oss2/__init__.py b/oss2/__init__.py index 45b7e35e..4a2165f1 100644 --- a/oss2/__init__.py +++ b/oss2/__init__.py @@ -1,4 +1,4 @@ -__version__ = '2.1.0' +__version__ = '2.1.1' from . import models, exceptions diff --git a/oss2/api.py b/oss2/api.py index 30530c3a..c697903c 100644 --- a/oss2/api.py +++ b/oss2/api.py @@ -122,6 +122,7 @@ def progress_callback(bytes_consumed, total_bytes): import time import shutil +import contextlib import oss2.utils @@ -488,7 +489,9 @@ def copy_object(self, source_bucket_name, source_key, target_key, headers=None): headers['x-oss-copy-source'] = '/' + source_bucket_name + '/' + source_key resp = self.__do_object('PUT', target_key, headers=headers) - return PutObjectResult(resp) + + with contextlib.closing(resp): + return PutObjectResult(resp) def update_object_meta(self, key, headers): """更改Object的元数据信息,包括Content-Type这类标准的HTTP头部,以及以x-oss-meta-开头的自定义元数据。 @@ -607,7 +610,9 @@ def complete_multipart_upload(self, key, upload_id, parts, headers=None): params={'uploadId': upload_id}, data=data, headers=headers) - return PutObjectResult(resp) + + with contextlib.closing(resp): + return PutObjectResult(resp) def abort_multipart_upload(self, key, upload_id): """取消分片上传。 @@ -670,7 +675,9 @@ def upload_part_copy(self, source_bucket_name, source_key, byte_range, params={'uploadId': target_upload_id, 'partNumber': str(target_part_number)}, headers=headers) - return PutObjectResult(resp) + + with contextlib.closing(resp): + return PutObjectResult(resp) def list_parts(self, key, upload_id, marker='', max_parts=1000): diff --git a/oss2/http.py b/oss2/http.py index 417fefde..163bd73d 100644 --- a/oss2/http.py +++ b/oss2/http.py @@ -28,8 +28,9 @@ class Session(object): def __init__(self): self.session = requests.Session() - self.session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=defaults.connection_pool_size)) - self.session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=defaults.connection_pool_size)) + psize = defaults.connection_pool_size + self.session.mount('http://', requests.adapters.HTTPAdapter(pool_connections=psize, pool_maxsize=psize)) + self.session.mount('https://', requests.adapters.HTTPAdapter(pool_connections=psize, pool_maxsize=psize)) def do_request(self, req, timeout): try: @@ -94,6 +95,9 @@ def read(self, amt=None): def __iter__(self): return self.response.iter_content(_CHUNK_SIZE) + def close(self): + self.response.close() + # requests对于具有fileno()方法的file object,会用fileno()的返回值作为Content-Length。 # 这对于已经读取了部分内容,或执行了seek()的file object是不正确的。 diff --git a/oss2/models.py b/oss2/models.py index 9176a5fe..00014294 100644 --- a/oss2/models.py +++ b/oss2/models.py @@ -54,6 +54,9 @@ def __init__(self, resp): #: 请求ID,用于跟踪一个OSS请求。提交工单时,最后能够提供请求ID self.request_id = resp.headers.get('x-oss-request-id', '') + def close(self): + self.resp.close() + class HeadObjectResult(RequestResult): def __init__(self, resp): diff --git a/oss2/resumable.py b/oss2/resumable.py index adc934d6..3181b0b6 100644 --- a/oss2/resumable.py +++ b/oss2/resumable.py @@ -85,7 +85,7 @@ def resumable_download(bucket, key, filename, 实现的方法是: #. 在本地创建一个临时文件,文件名由原始文件名加上一个随机的后缀组成; #. 通过指定请求的 `Range` 头按照范围并发读取OSS文件,并写入到临时文件里对应的位置; - #. 全部完成之后,把临时文件重名为目标文件 (即 `filename` ) + #. 全部完成之后,把临时文件重命名为目标文件 (即 `filename` ) 在上述过程中,断点信息,即已经完成的范围,会保存在磁盘上。因为某种原因下载中断,后续如果下载 同样的文件,也就是源文件和目标文件一样,就会先读取断点信息,然后只下载缺失的部分。 @@ -296,6 +296,10 @@ def __load_record(self): self._del_record() record = None + if record and not os.path.exists(self.filename + record['tmp_suffix']): + self._del_record() + record = None + if record and self.__is_remote_changed(record): utils.silently_remove(self.filename + record['tmp_suffix']) self._del_record() diff --git a/tests/test_download.py b/tests/test_download.py index 0cdeddaf..c9ae2cc1 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -281,7 +281,7 @@ def test_remote_changed_before_start(self): self.__test_insane_record(400, partial(modify_one, key='size', value=1024), old_tmp_exists=False) self.__test_insane_record(400, partial(modify_one, key='mtime', value=1024), old_tmp_exists=False) - def test_remote_changed_during_upload(self): + def test_remote_changed_during_download(self): oss2.defaults.multiget_threshold = 1 oss2.defaults.multiget_part_size = 100 oss2.defaults.multiget_num_threads = 2 @@ -491,6 +491,37 @@ def mock_rename(src, dst): oss2.utils.silently_remove(abspath) + def test_tmp_file_removed(self): + oss2.defaults.multiget_threshold = 1 + oss2.defaults.multiget_part_size = 100 + oss2.defaults.multiget_num_threads = 5 + + orig_download_part = oss2.resumable._ResumableDownloader._ResumableDownloader__download_part + + file_size = 123 * 3 + 1 + key, filename, content = self.__prepare(file_size) + + context = {} + + def mock_download_part(downloader, part, part_number=None): + if part.part_number == part_number: + r = self.__record(key, filename) + context['tmpfile'] = filename + r['tmp_suffix'] + + raise RuntimeError("Fail download_part for part: {0}".format(part_number)) + else: + orig_download_part(downloader, part) + + with patch.object(oss2.resumable._ResumableDownloader, '_ResumableDownloader__download_part', + side_effect=partial(mock_download_part, part_number=2), + autospec=True): + self.assertRaises(RuntimeError, oss2.resumable_download, self.bucket, key, filename) + + os.remove(context['tmpfile']) + + oss2.resumable_download(self.bucket, key, filename) + self.assertFileContent(filename, content) + if __name__ == '__main__': unittest.main() \ No newline at end of file diff --git a/tests/test_object.py b/tests/test_object.py index e9ebb465..fcbea592 100644 --- a/tests/test_object.py +++ b/tests/test_object.py @@ -3,6 +3,7 @@ import requests import filecmp import calendar +import contextlib from oss2.exceptions import (ClientError, RequestError, NotFound, NoSuchKey, Conflict, PositionNotEqualToLength, ObjectNotAppendable) @@ -420,6 +421,17 @@ def test_invalid_object_name(self): self.assertRaises(oss2.exceptions.InvalidObjectName, self.bucket.put_object, key, content) + def test_close(self): + key = self.random_key() + content = random_bytes(512) + + self.bucket.put_object(key, content) + + result = self.bucket.get_object(key) + with contextlib.closing(result): + pass + + self.assertEqual(len(result.read()), 0) if __name__ == '__main__': unittest.main() diff --git a/unittests/common.py b/unittests/common.py index f2032b99..93ca8175 100644 --- a/unittests/common.py +++ b/unittests/common.py @@ -396,6 +396,9 @@ def __next__(self): def next(self): return self.read(8192) + def close(self): + pass + def _is_xml(content): try: From 6e0938d110d4cb9fb2b6673a5d729300f7e9d41f Mon Sep 17 00:00:00 2001 From: yami Date: Tue, 29 Mar 2016 20:46:52 +0800 Subject: [PATCH 2/5] revert code to close request results. --- oss2/api.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/oss2/api.py b/oss2/api.py index c697903c..85b5a782 100644 --- a/oss2/api.py +++ b/oss2/api.py @@ -122,7 +122,6 @@ def progress_callback(bytes_consumed, total_bytes): import time import shutil -import contextlib import oss2.utils @@ -490,8 +489,7 @@ def copy_object(self, source_bucket_name, source_key, target_key, headers=None): resp = self.__do_object('PUT', target_key, headers=headers) - with contextlib.closing(resp): - return PutObjectResult(resp) + return PutObjectResult(resp) def update_object_meta(self, key, headers): """更改Object的元数据信息,包括Content-Type这类标准的HTTP头部,以及以x-oss-meta-开头的自定义元数据。 @@ -611,8 +609,7 @@ def complete_multipart_upload(self, key, upload_id, parts, headers=None): data=data, headers=headers) - with contextlib.closing(resp): - return PutObjectResult(resp) + return PutObjectResult(resp) def abort_multipart_upload(self, key, upload_id): """取消分片上传。 @@ -676,8 +673,7 @@ def upload_part_copy(self, source_bucket_name, source_key, byte_range, 'partNumber': str(target_part_number)}, headers=headers) - with contextlib.closing(resp): - return PutObjectResult(resp) + return PutObjectResult(resp) def list_parts(self, key, upload_id, marker='', max_parts=1000): From 012a88fe92b547cb713ed376e3ea0ee8e7a57f8a Mon Sep 17 00:00:00 2001 From: yami Date: Wed, 30 Mar 2016 09:08:31 +0800 Subject: [PATCH 3/5] revert close method for RequestResult and Response. --- oss2/http.py | 3 --- oss2/models.py | 3 --- tests/test_object.py | 11 ----------- 3 files changed, 17 deletions(-) diff --git a/oss2/http.py b/oss2/http.py index 163bd73d..5ab44e9d 100644 --- a/oss2/http.py +++ b/oss2/http.py @@ -95,9 +95,6 @@ def read(self, amt=None): def __iter__(self): return self.response.iter_content(_CHUNK_SIZE) - def close(self): - self.response.close() - # requests对于具有fileno()方法的file object,会用fileno()的返回值作为Content-Length。 # 这对于已经读取了部分内容,或执行了seek()的file object是不正确的。 diff --git a/oss2/models.py b/oss2/models.py index 00014294..9176a5fe 100644 --- a/oss2/models.py +++ b/oss2/models.py @@ -54,9 +54,6 @@ def __init__(self, resp): #: 请求ID,用于跟踪一个OSS请求。提交工单时,最后能够提供请求ID self.request_id = resp.headers.get('x-oss-request-id', '') - def close(self): - self.resp.close() - class HeadObjectResult(RequestResult): def __init__(self, resp): diff --git a/tests/test_object.py b/tests/test_object.py index fcbea592..78d1aa08 100644 --- a/tests/test_object.py +++ b/tests/test_object.py @@ -421,17 +421,6 @@ def test_invalid_object_name(self): self.assertRaises(oss2.exceptions.InvalidObjectName, self.bucket.put_object, key, content) - def test_close(self): - key = self.random_key() - content = random_bytes(512) - - self.bucket.put_object(key, content) - - result = self.bucket.get_object(key) - with contextlib.closing(result): - pass - - self.assertEqual(len(result.read()), 0) if __name__ == '__main__': unittest.main() From 28874ae6a60b8c07f74fe118cf27982ce9e39122 Mon Sep 17 00:00:00 2001 From: yami Date: Wed, 30 Mar 2016 09:08:31 +0800 Subject: [PATCH 4/5] revert close method for RequestResult and Response. --- CHANGELOG.rst | 2 -- 1 file changed, 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 064a645f..8f20d74d 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -8,8 +8,6 @@ Version 2.1.1 - 修复:issue #28。 - 修复:正确的设置连接池大小。 -- 修复:对于不读取 `body` 的API,如 `upload_part_copy` ,主动调用 `close` ,确保连接及时释放。 -- 增加:给 `RequestResult` 和 `Response` 增加 `close` 方法。 Version 2.1.0 From 2cc9804ca67d346ffe2741119c3321999e80f160 Mon Sep 17 00:00:00 2001 From: yami Date: Wed, 30 Mar 2016 09:08:31 +0800 Subject: [PATCH 5/5] revert close method for RequestResult and Response. --- unittests/common.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/unittests/common.py b/unittests/common.py index 93ca8175..f2032b99 100644 --- a/unittests/common.py +++ b/unittests/common.py @@ -396,9 +396,6 @@ def __next__(self): def next(self): return self.read(8192) - def close(self): - pass - def _is_xml(content): try: