From c5130fe79e25255c23e08eb04a0129a80c8b3ebb Mon Sep 17 00:00:00 2001 From: liyanzhang505 Date: Wed, 27 Nov 2019 15:51:29 +0800 Subject: [PATCH 1/5] Support async fetch task. --- examples/async_fetch_task.py | 58 ++++++++++++ oss2/api.py | 62 ++++++++++--- oss2/auth.py | 2 +- oss2/exceptions.py | 3 + oss2/headers.py | 2 + oss2/models.py | 78 ++++++++++++++++ oss2/xml_utils.py | 48 +++++++++- tests/test_async_fetch_task.py | 159 +++++++++++++++++++++++++++++++++ 8 files changed, 397 insertions(+), 15 deletions(-) create mode 100644 examples/async_fetch_task.py create mode 100644 tests/test_async_fetch_task.py diff --git a/examples/async_fetch_task.py b/examples/async_fetch_task.py new file mode 100644 index 00000000..6fc7d6cb --- /dev/null +++ b/examples/async_fetch_task.py @@ -0,0 +1,58 @@ +import os +import oss2 +import base64 +import time +from oss2.compat import to_bytes +from oss2.models import AsyncFetchTaskConfiguration + +# 以下代码展示了创建异步获取文件到bucket任务到API的用法 + +# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。 +# 通过环境变量获取,或者把诸如“<你的AccessKeyId>”替换成真实的AccessKeyId等。 +# +# 以杭州区域为例,Endpoint可以是: +# http://oss-cn-hangzhou.aliyuncs.com +# https://oss-cn-hangzhou.aliyuncs.com +access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>') +access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>') +bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>') +endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>') + +# 确认上面的参数都填写正确了 +for param in (access_key_id, access_key_secret, bucket_name, endpoint): + assert '<' not in param, '请设置参数:' + param + +# 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行 +bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name) + +object_name = "test-async-object" +url = "" +callback = '{"callbackUrl":"www.abc.com/callback","callbackBody":"${etag}"}' +base64_callback = oss2.utils.b64encode_as_string(to_bytes(callback)) + +# 可以选填host, callback, content_md5, ignore_same_key等参数 +task_config = AsyncFetchTaskConfiguration(url, object_name, callback=base64_callback, ignore_same_key=False) + +# 创建异步获取文件到bucket的任务 +result = bucket.put_async_fetch_task(task_config) +task_id = result.task_id +print('task_id:', result.task_id) + +time.sleep(5) + +# 获取指定的异步任务信息 +result = bucket.get_async_fetch_task(task_id) + +# 打印获取到的异步任务信息 +print('=====get result======') +print('task_id:', result.task_id) +print('state:', result.task_state) +print('error_msg:', result.error_msg) +task_config = result.task_config +print('task info:') +print('url:', task_config.url) +print('object_name:', task_config.object_name) +print('host:', task_config.host) +print('content_md5:', task_config.content_md5) +print('callback:', task_config.callback) +print('ignoreSameKey:', task_config.ignore_same_key) \ No newline at end of file diff --git a/oss2/api.py b/oss2/api.py index 4f28f51b..394b6696 100644 --- a/oss2/api.py +++ b/oss2/api.py @@ -385,6 +385,7 @@ class Bucket(_Base): REQUESTPAYMENT = 'requestPayment' QOS_INFO = 'qosInfo' USER_QOS = 'qos' + ASYNC_FETCH = 'asyncFetch' def __init__(self, auth, endpoint, bucket_name, is_cname=False, @@ -2063,14 +2064,14 @@ def put_bucket_versioning(self, config, headers=None): :return: :class:`RequestResult ` """ - logger.debug("Start to put object versioning, bucket: {0}".format( - self.bucket_name)) + logger.debug("Start to put object versioning, bucket: {0}".format(self.bucket_name)) data = self.__convert_data(BucketVersioningConfig, xml_utils.to_put_bucket_versioning, config) headers = http.CaseInsensitiveDict(headers) headers['Content-MD5'] = utils.content_md5(data) resp = self.__do_bucket('PUT', data=data, params={Bucket.VERSIONING: ''}, headers=headers) + logger.debug("Put bucket versiong done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status)) return RequestResult(resp) @@ -2078,27 +2079,26 @@ def get_bucket_versioning(self): """ :return: :class:`GetBucketVersioningResult` """ - logger.debug("Start to get bucket versioning, bucket: {0}".format( - self.bucket_name)) - + logger.debug("Start to get bucket versioning, bucket: {0}".format(self.bucket_name)) resp = self.__do_bucket('GET', params={Bucket.VERSIONING: ''}) + logger.debug("Get bucket versiong done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status)) return self._parse_result(resp, xml_utils.parse_get_bucket_versioning, GetBucketVersioningResult) def put_bucket_policy(self, policy): - """设置bucket policy, 具体policy书写规则请参考官方文档 - :param str policy: - """ - - logger.debug("Start to put bucket policy, bucket: {0}, policy: {1}".format( - self.bucket_name, policy)) + """设置bucket授权策略, 具体policy书写规则请参考官方文档 + :param str policy: 授权策略 + """ + logger.debug("Start to put bucket policy, bucket: {0}, policy: {1}".format(self.bucket_name, policy)) resp = self.__do_bucket('PUT', data=policy, params={Bucket.POLICY: ''}, headers={'Content-MD5': utils.content_md5(policy)}) logger.debug("Put bucket policy done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status)) + return RequestResult(resp) def get_bucket_policy(self): - """ + """获取bucket授权策略 + :return: :class:`GetBucketPolicyResult ` """ @@ -2108,7 +2108,7 @@ def get_bucket_policy(self): return GetBucketPolicyResult(resp) def delete_bucket_policy(self): - """ + """删除bucket授权策略 :return: :class:`RequestResult ` """ logger.debug("Start to delete bucket policy, bucket: {0}".format(self.bucket_name)) @@ -2151,6 +2151,7 @@ def put_bucket_qos_info(self, bucket_qos_info): headers = http.CaseInsensitiveDict() headers['Content-MD5'] = utils.content_md5(data) resp = self.__do_bucket('PUT', data=data, params={Bucket.QOS_INFO: ''}, headers=headers) + logger.debug("Get bucket qos info done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status)) return RequestResult(resp) @@ -2181,8 +2182,11 @@ def set_bucket_storage_capacity(self, user_qos): :param user_qos :class:`BucketUserQos ` """ + logger.debug("Start to set bucket storage capacity: {0}".format(self.bucket_name)) data = xml_utils.to_put_bucket_user_qos(user_qos) resp = self.__do_bucket('PUT', data=data, params={Bucket.USER_QOS: ''}) + logger.debug("Set bucket storage capacity done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status)) + return RequestResult(resp) def get_bucket_storage_capacity(self): @@ -2190,9 +2194,41 @@ def get_bucket_storage_capacity(self): :return: :class:`GetBucketUserQosResult ` """ + logger.debug("Start to get bucket storage capacity, bucket:{0}".format(self.bucket_name)) resp = self._Bucket__do_bucket('GET', params={Bucket.USER_QOS: ''}) + logger.debug("Get bucket storage capacity done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status)) + return self._parse_result(resp, xml_utils.parse_get_bucket_user_qos, GetBucketUserQosResult) + def put_async_fetch_task(self, task_config): + """创建一个异步获取文件到bucket的任务。 + + :param task_config: 任务配置 + :type task_config: class:`AsyncFetchTaskConfiguration ` + + :return: :class:`PutAsyncFetchTaskResult ` + """ + logger.debug("Start to put async fetch task, bucket:{0}".format(self.bucket_name)) + data = xml_utils.to_put_async_fetch_task(task_config) + headers = http.CaseInsensitiveDict() + headers['Content-MD5'] = utils.content_md5(data) + resp = self._Bucket__do_bucket('POST', data=data, params={Bucket.ASYNC_FETCH: ''}, headers=headers) + logger.debug("Put async fetch task done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status)) + + return self._parse_result(resp, xml_utils.parse_put_async_fetch_task_result, PutAsyncFetchTaskResult) + + def get_async_fetch_task(self, task_id): + """获取一个异步获取文件到bucket的任务信息。 + + :param str task_id: 任务id + :return: :class:`GetAsyncFetchTaskResult ` + """ + logger.debug("Start to get async fetch task, bucket:{0}, task_id:{1}".format(self.bucket_name, task_id)) + resp = self._Bucket__do_bucket('GET', headers={OSS_TASK_ID: task_id}, params={Bucket.ASYNC_FETCH: ''}) + logger.debug("Put async fetch task done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status)) + + return self._parse_result(resp, xml_utils.parse_get_async_fetch_task_result, GetAsyncFetchTaskResult) + def _get_bucket_config(self, config): """获得Bucket某项配置,具体哪种配置由 `config` 指定。该接口直接返回 `RequestResult` 对象。 通过read()接口可以获得XML字符串。不建议使用。 diff --git a/oss2/auth.py b/oss2/auth.py index 943edd5b..2b852998 100644 --- a/oss2/auth.py +++ b/oss2/auth.py @@ -73,7 +73,7 @@ class Auth(AuthBase): 'restore', 'qos', 'referer', 'stat', 'bucketInfo', 'append', 'position', 'security-token', 'live', 'comp', 'status', 'vod', 'startTime', 'endTime', 'x-oss-process', 'symlink', 'callback', 'callback-var', 'tagging', 'encryption', 'versions', - 'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit', 'qosInfo'] + 'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit', 'qosInfo', 'asyncFetch'] ) def _sign_request(self, req, bucket_name, key): diff --git a/oss2/exceptions.py b/oss2/exceptions.py index 8d01baf8..7a560d2c 100644 --- a/oss2/exceptions.py +++ b/oss2/exceptions.py @@ -269,6 +269,9 @@ class SignatureDoesNotMatch(ServerError): status = 403 code = 'SignatureDoesNotMatch' +class ObjectAlreadyExists(ServerError): + status = 400 + code = 'ObjectAlreadyExists' def make_exception(resp): status = resp.status diff --git a/oss2/headers.py b/oss2/headers.py index 66207f16..6cd59583 100644 --- a/oss2/headers.py +++ b/oss2/headers.py @@ -37,6 +37,8 @@ OSS_TRAFFIC_LIMIT = 'x-oss-traffic-limit' +OSS_TASK_ID = 'x-oss-task-id' + class RequestHeader(dict): def __init__(self, *arg, **kw): super(RequestHeader, self).__init__(*arg, **kw) diff --git a/oss2/models.py b/oss2/models.py index 8754d2ca..c891e262 100644 --- a/oss2/models.py +++ b/oss2/models.py @@ -1514,3 +1514,81 @@ class GetBucketUserQosResult(RequestResult, BucketUserQos): def __init__(self, resp): RequestResult.__init__(self, resp) BucketUserQos.__init__(self) + + +ASYNC_FETCH_TASK_STATE_RUNNING = 'Running' +ASYNC_FETCH_TASK_STATE_RETRY = 'Retry' +ASYNC_FETCH_TASK_STATE_FETCH_SUCCESS_CALLBACK_FAILED = 'FetchSuccessCallbackFailed' +ASYNC_FETCH_TASK_STATE_FAILED= 'Failed' +ASYNC_FETCH_TASK_STATE_SUCCESS = 'Success' + +class AsyncFetchTaskConfiguration(object): + """异步获取文件到bucket到任务配置项 + + :param url: 源文件url + :type url: str + + :param object_name: 文件的名称。 + :type task_state: str + + :param host: 文件所在服务器的host,如果不指定则会根据url解析填充。 + :type host: str + + :param content_md5: 指定校验源文件的md5 + :type content_md5: str + + :param callback: 指定fetch成功知乎回调给用户的引用服务器,如果不指定则不回调。 + callback格式与OSS上传回调的请求头callback一致,详情见官网。 + :type callback: str + + :param ignore_same_key: 默认为True表示如果文件已存在则忽略本次任务,api调用将会报错。如果为False,则会覆盖已存在的object。 + :type ignore_same_key: bool + """ + def __init__(self, + url, + object_name, + host = None, + content_md5 = None, + callback = None, + ignore_same_key = None): + + self.url = url + self.object_name = object_name + self.host = host + self.content_md5 = content_md5 + self.callback = callback + self.ignore_same_key = ignore_same_key + +class PutAsyncFetchTaskResult(RequestResult): + def __init__(self, resp, task_id=None): + RequestResult.__init__(self, resp) + self.task_id = task_id + +class GetAsyncFetchTaskResult(RequestResult): + """获取异步获取文件到bucket的任务的返回结果 + + :param task_id: 任务id + :type task_id: str + + :param task_state: 取值范围:oss2.models.ASYNC_FETCH_TASK_STATE_RUNNING, oss2.models.ASYNC_FETCH_TASK_STATE_RETRY, + oss2.models.ASYNC_FETCH_TASK_STATE_FETCH_SUCCESS_CALLBACK_FAILED, oss2.models.ASYNC_FETCH_TASK_STATE_FAILED, + oss2.models.ASYNC_FETCH_TASK_STATE_SUCCESS。 + :type task_state: str + + :param error_msg: 错误信息 + :type error_msg: str + + :param task_config: 任务配置信息 + :type task_config: class:`AsyncFetchTaskConfiguration ` + """ + def __init__(self, resp, + task_id=None, + task_state=None, + error_msg=None, + task_config=None): + + RequestResult.__init__(self, resp) + self.task_id = task_id + self.task_state = task_state + self.error_msg = error_msg + self.task_config = task_config diff --git a/oss2/xml_utils.py b/oss2/xml_utils.py index 5d7b7530..85d8ef44 100644 --- a/oss2/xml_utils.py +++ b/oss2/xml_utils.py @@ -49,7 +49,8 @@ REDIRECT_TYPE_INTERNAL, REDIRECT_TYPE_ALICDN, NoncurrentVersionStorageTransition, - NoncurrentVersionExpiration) + NoncurrentVersionExpiration, + AsyncFetchTaskConfiguration) from .select_params import (SelectJsonTypes, SelectParameters) @@ -1288,3 +1289,48 @@ def to_put_bucket_user_qos(user_qos): _add_text_child(root, 'StorageCapacity', str(user_qos.storage_capacity)) return _node_to_string(root) + + +def to_put_async_fetch_task(task_config): + root = ElementTree.Element('AsyncFetchTaskConfiguration') + + _add_text_child(root, 'Url', task_config.url) + _add_text_child(root, 'Object', task_config.object_name) + + if task_config.host is not None: + _add_text_child(root, 'Host', task_config.host) + if task_config.content_md5 is not None: + _add_text_child(root, 'ContentMD5', task_config.content_md5) + if task_config.callback is not None: + _add_text_child(root, 'Callback', task_config.callback) + if task_config.ignore_same_key is not None: + _add_text_child(root, 'IgnoreSameKey', str(task_config.ignore_same_key).lower()) + + return _node_to_string(root) + +def parse_put_async_fetch_task_result(result, body): + root = ElementTree.fromstring(body) + + result.task_id = _find_tag(root, 'TaskId') + + return result + +def _parse_async_fetch_task_configuration(task_info_node): + url = _find_tag(task_info_node, 'Url') + object_name = _find_tag(task_info_node, 'Object') + host = _find_tag(task_info_node, 'Host') + content_md5 = _find_tag(task_info_node, 'ContentMD5') + callback = _find_tag(task_info_node, 'Callback') + ignore_same_key = _find_bool(task_info_node, 'IgnoreSameKey') + + return AsyncFetchTaskConfiguration(url, object_name, host, content_md5, callback, ignore_same_key) + +def parse_get_async_fetch_task_result(result, body): + root = ElementTree.fromstring(body) + + result.task_id = _find_tag(root, 'TaskId') + result.task_state = _find_tag(root, 'State') + result.error_msg = _find_tag(root, 'ErrorMsg') + result.task_config = _parse_async_fetch_task_configuration(root.find('TaskInfo')) + + return result \ No newline at end of file diff --git a/tests/test_async_fetch_task.py b/tests/test_async_fetch_task.py new file mode 100644 index 00000000..c7e7d77c --- /dev/null +++ b/tests/test_async_fetch_task.py @@ -0,0 +1,159 @@ +# -*- coding: utf-8 -*- + +import oss2 +import time +import base64 +from .common import * +from oss2.compat import to_bytes +from oss2.models import (AsyncFetchTaskConfiguration, ASYNC_FETCH_TASK_STATE_SUCCESS, + ASYNC_FETCH_TASK_STATE_FETCH_SUCCESS_CALLBACK_FAILED, + ASYNC_FETCH_TASK_STATE_FAILED) + + +class TestAsyncFetchTask(OssTestCase): + def setUp(self): + OssTestCase.setUp(self) + self.endpoint = "http://oss-ap-south-1.aliyuncs.com" + + self.fetch_object_name = 'test-async-fetch-task.txt' + self.bucket.put_object(self.fetch_object_name, '123') + + meta = self.bucket.head_object(self.fetch_object_name) + self.fetch_content_md5 = meta.headers.get('Content-MD5') + self.fetch_url = self.bucket.sign_url('GET', self.fetch_object_name, 60*60) + + def test_async_fetch_task(self): + auth = oss2.Auth(OSS_ID, OSS_SECRET) + bucket_name = OSS_BUCKET + "-test-async-fetch-task" + bucket = oss2.Bucket(auth, self.endpoint, bucket_name) + bucket.create_bucket() + + object_name = self.fetch_object_name+'-destination' + task_config = AsyncFetchTaskConfiguration(self.fetch_url, object_name=object_name, content_md5=self.fetch_content_md5, ignore_same_key=False) + + result = bucket.put_async_fetch_task(task_config) + task_id = result.task_id + time.sleep(5) + + result = bucket.get_async_fetch_task(task_id) + self.assertEqual(task_id, result.task_id) + self.assertEqual(ASYNC_FETCH_TASK_STATE_SUCCESS, result.task_state) + self.assertEqual('', result.error_msg) + task_config = result.task_config + self.assertEqual(self.fetch_url, task_config.url) + self.assertEqual(self.fetch_content_md5, task_config.content_md5) + self.assertEqual(object_name, task_config.object_name) + self.assertFalse(task_config.ignore_same_key) + self.assertEqual('', task_config.host) + self.assertEqual('', task_config.callback) + + bucket.delete_object(object_name) + bucket.delete_bucket() + + def test_async_fetch_task_with_few_argument(self): + auth = oss2.Auth(OSS_ID, OSS_SECRET) + bucket_name = OSS_BUCKET + "-test-async-fetch-task" + bucket = oss2.Bucket(auth, self.endpoint, bucket_name) + bucket.create_bucket() + + object_name = self.fetch_object_name+'-destination' + task_config = AsyncFetchTaskConfiguration(self.fetch_url, object_name) + + result = bucket.put_async_fetch_task(task_config) + task_id = result.task_id + time.sleep(5) + + result = bucket.get_async_fetch_task(task_id) + self.assertEqual(task_id, result.task_id) + self.assertEqual(ASYNC_FETCH_TASK_STATE_SUCCESS, result.task_state) + self.assertEqual('', result.error_msg) + task_config = result.task_config + self.assertEqual(self.fetch_url, task_config.url) + self.assertEqual('', task_config.content_md5) + self.assertEqual(object_name, task_config.object_name) + self.assertTrue(task_config.ignore_same_key) + self.assertEqual('', task_config.host) + self.assertEqual('', task_config.callback) + + bucket.delete_object(object_name) + bucket.delete_bucket() + + def test_fetch_success_callback_failed_state(self): + auth = oss2.Auth(OSS_ID, OSS_SECRET) + bucket_name = OSS_BUCKET + "-test-async-fetch-task-callback" + bucket = oss2.Bucket(auth, self.endpoint, bucket_name) + bucket.create_bucket() + + object_name = self.fetch_object_name+'-destination' + callback = '{"callbackUrl":"www.abc.com/callback","callbackBody":"${etag}"}' + base64_callback = oss2.utils.b64encode_as_string(to_bytes(callback)) + task_config = AsyncFetchTaskConfiguration(self.fetch_url, object_name=object_name, callback=base64_callback) + + result = bucket.put_async_fetch_task(task_config) + task_id = result.task_id + time.sleep(5) + + result = bucket.get_async_fetch_task(task_id) + + self.assertEqual(task_id, result.task_id) + self.assertEqual(ASYNC_FETCH_TASK_STATE_FETCH_SUCCESS_CALLBACK_FAILED, result.task_state) + self.assertNotEqual('', result.error_msg) + task_config = result.task_config + self.assertEqual(self.fetch_url, task_config.url) + self.assertEqual('', task_config.content_md5) + self.assertEqual(object_name, task_config.object_name) + self.assertTrue(task_config.ignore_same_key) + self.assertEqual('', task_config.host) + self.assertEqual(base64_callback, task_config.callback) + + bucket.delete_object(object_name) + bucket.delete_bucket() + + + def test_failed_state(self): + auth = oss2.Auth(OSS_ID, OSS_SECRET) + bucket_name = OSS_BUCKET + "-test-async-fetch-task-callback" + bucket = oss2.Bucket(auth, self.endpoint, bucket_name) + bucket.create_bucket() + + object_name = self.fetch_object_name+'-destination' + + task_config = AsyncFetchTaskConfiguration('http://invalidUrl.com', object_name=object_name) + + result = bucket.put_async_fetch_task(task_config) + task_id = result.task_id + time.sleep(5) + + result = bucket.get_async_fetch_task(task_id) + + self.assertEqual(task_id, result.task_id) + self.assertEqual(ASYNC_FETCH_TASK_STATE_FAILED, result.task_state) + self.assertNotEqual('', result.error_msg) + + bucket.delete_bucket() + + def test_ignore_same_key(self): + auth = oss2.Auth(OSS_ID, OSS_SECRET) + bucket_name = OSS_BUCKET + "-test-async-fetch-task" + bucket = oss2.Bucket(auth, self.endpoint, bucket_name) + bucket.create_bucket() + + object_name = self.fetch_object_name+'-destination' + bucket.put_object(object_name, 'test-content') + + task_config = AsyncFetchTaskConfiguration(self.fetch_url, object_name=object_name, ignore_same_key=False) + result = bucket.put_async_fetch_task(task_config) + task_id = result.task_id + self.assertNotEqual('', task_id) + + task_config = AsyncFetchTaskConfiguration(self.fetch_url, object_name=object_name, ignore_same_key=True) + self.assertRaises(oss2.exceptions.ObjectAlreadyExists, bucket.put_async_fetch_task, task_config) + + task_config = AsyncFetchTaskConfiguration(self.fetch_url, object_name=object_name) + self.assertRaises(oss2.exceptions.ObjectAlreadyExists, bucket.put_async_fetch_task, task_config) + + bucket.delete_object(object_name) + + +if __name__ == '__main__': + unittest.main() From 5ca099642a0f292f68ceaeddd911a9fb1222591b Mon Sep 17 00:00:00 2001 From: liyanzhang505 Date: Fri, 6 Dec 2019 14:52:38 +0800 Subject: [PATCH 2/5] 1, Support sequential upload mode. 2, Support signed url with request payment. --- oss2/api.py | 15 +++++++--- oss2/auth.py | 3 +- oss2/resumable.py | 44 +++++++++++++++++++++++++--- tests/test_multipart.py | 35 +++++++++++++++++++++- tests/test_object_request_payment.py | 26 +++++++++++++++- tests/test_upload.py | 22 ++++++++++++++ 6 files changed, 134 insertions(+), 11 deletions(-) diff --git a/oss2/api.py b/oss2/api.py index 394b6696..9cd2b878 100644 --- a/oss2/api.py +++ b/oss2/api.py @@ -386,6 +386,7 @@ class Bucket(_Base): QOS_INFO = 'qosInfo' USER_QOS = 'qos' ASYNC_FETCH = 'asyncFetch' + SEQUENTIAL = 'sequential' def __init__(self, auth, endpoint, bucket_name, is_cname=False, @@ -1208,7 +1209,7 @@ def delete_object_versions(self, keylist_versions, headers=None): logger.debug("Delete object versions done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status)) return self._parse_result(resp, xml_utils.parse_batch_delete_objects, BatchDeleteObjectsResult) - def init_multipart_upload(self, key, headers=None): + def init_multipart_upload(self, key, headers=None, params=None): """初始化分片上传。 返回值中的 `upload_id` 以及Bucket名和Object名三元组唯一对应了此次分片上传事件。 @@ -1222,9 +1223,15 @@ def init_multipart_upload(self, key, headers=None): """ headers = utils.set_content_type(http.CaseInsensitiveDict(headers), key) - logger.debug("Start to init multipart upload, bucket: {0}, keys: {1}, headers: {2}".format( - self.bucket_name, to_string(key), headers)) - resp = self.__do_object('POST', key, params={'uploads': ''}, headers=headers) + if params is None: + tmp_params = dict() + else: + tmp_params = params.copy() + + tmp_params['uploads'] = '' + logger.debug("Start to init multipart upload, bucket: {0}, keys: {1}, headers: {2}, params: {3}".format( + self.bucket_name, to_string(key), headers, tmp_params)) + resp = self.__do_object('POST', key, params=tmp_params, headers=headers) logger.debug("Init multipart upload done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status)) return self._parse_result(resp, xml_utils.parse_init_multipart_upload, InitMultipartUploadResult) diff --git a/oss2/auth.py b/oss2/auth.py index 2b852998..584e62bf 100644 --- a/oss2/auth.py +++ b/oss2/auth.py @@ -73,7 +73,8 @@ class Auth(AuthBase): 'restore', 'qos', 'referer', 'stat', 'bucketInfo', 'append', 'position', 'security-token', 'live', 'comp', 'status', 'vod', 'startTime', 'endTime', 'x-oss-process', 'symlink', 'callback', 'callback-var', 'tagging', 'encryption', 'versions', - 'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit', 'qosInfo', 'asyncFetch'] + 'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit', 'qosInfo', 'asyncFetch', + 'x-oss-request-payer', 'sequential'] ) def _sign_request(self, req, bucket_name, key): diff --git a/oss2/resumable.py b/oss2/resumable.py index 3acb4112..f2180f19 100644 --- a/oss2/resumable.py +++ b/oss2/resumable.py @@ -39,7 +39,8 @@ def resumable_upload(bucket, key, filename, multipart_threshold=None, part_size=None, progress_callback=None, - num_threads=None): + num_threads=None, + params=None): """断点上传本地文件。 实现中采用分片上传方式上传本地文件,缺省的并发数是 `oss2.defaults.multipart_num_threads` ,并且在 @@ -67,6 +68,11 @@ def resumable_upload(bucket, key, filename, :param part_size: 指定分片上传的每个分片的大小。如不指定,则自动计算。 :param progress_callback: 上传进度回调函数。参见 :ref:`progress_callback` 。 :param num_threads: 并发上传的线程数,如不指定则使用 `oss2.defaults.multipart_num_threads` 。 + + :param params: HTTP请求参数 + # 只有'sequential'这个参数才会被传递到外部函数init_multipart_upload中。 + # 其他参数视为无效参数不会往外部函数传递。 + :type params: dict """ logger.debug("Start to resumable upload, bucket: {0}, key: {1}, filename: {2}, headers: {3}, " "multipart_threshold: {4}, part_size: {5}, num_threads: {6}".format(bucket.bucket_name, to_string(key), @@ -81,7 +87,8 @@ def resumable_upload(bucket, key, filename, part_size=part_size, headers=headers, progress_callback=progress_callback, - num_threads=num_threads) + num_threads=num_threads, + params=params) result = uploader.upload() else: with open(to_unicode(filename), 'rb') as f: @@ -238,6 +245,31 @@ def _populate_valid_headers(headers=None, valid_keys=None): return valid_headers +def _populate_valid_params(params=None, valid_keys=None): + """构建只包含有效keys的params + + :param params: 需要过滤的params + :type params: dict + + :param valid_keys: 有效的关键key列表 + :type valid_keys: list + + :return: 只包含有效keys的params + """ + if params is None or valid_keys is None: + return None + + valid_params = dict() + + for key in valid_keys: + if params.get(key) is not None: + valid_params[key] = params[key] + + if len(valid_params) == 0: + valid_params = None + + return valid_params + class _ResumableOperation(object): def __init__(self, bucket, key, filename, size, store, progress_callback=None, versionid=None): @@ -485,7 +517,8 @@ def __init__(self, bucket, key, filename, size, headers=None, part_size=None, progress_callback=None, - num_threads=None): + num_threads=None, + params=None): super(_ResumableUploader, self).__init__(bucket, key, filename, size, store or ResumableStore(), progress_callback=progress_callback) @@ -500,6 +533,8 @@ def __init__(self, bucket, key, filename, size, self.__upload_id = None + self.__params = params + # protect below fields self.__lock = threading.Lock() self.__record = None @@ -583,7 +618,8 @@ def __load_record(self): part_size = determine_part_size(self.size, self.__part_size) logger.debug("Upload File size: {0}, User-specify part_size: {1}, Calculated part_size: {2}".format( self.size, self.__part_size, part_size)) - upload_id = self.bucket.init_multipart_upload(self.key, headers=self.__headers).upload_id + params = _populate_valid_params(self.__params, [Bucket.SEQUENTIAL]) + upload_id = self.bucket.init_multipart_upload(self.key, headers=self.__headers, params=params).upload_id record = {'upload_id': upload_id, 'mtime': self.__mtime, 'size': self.size, 'parts': [], 'abspath': self._abspath, 'bucket': self.bucket.bucket_name, 'key': self.key, 'part_size': part_size} diff --git a/tests/test_multipart.py b/tests/test_multipart.py index 51203398..2a67fc6b 100644 --- a/tests/test_multipart.py +++ b/tests/test_multipart.py @@ -206,6 +206,39 @@ def test_multipart_with_object_tagging(self): self.assertEqual('中文', result.tag_set.tagging_rule[' + ']) result = self.bucket.delete_object_tagging(key) - + + def test_multipart_sequential(self): + endpoint = "http://oss-cn-shanghai.aliyuncs.com" + auth = oss2.Auth(OSS_ID, OSS_SECRET) + bucket_name = OSS_BUCKET + "-test-multipart-sequential" + bucket = oss2.Bucket(auth, endpoint, bucket_name) + bucket.create_bucket() + + key = self.random_key() + content = random_bytes(128 * 1024) + + parts = [] + upload_id = bucket.init_multipart_upload(key).upload_id + result = bucket.upload_part(key, upload_id, 1, content) + parts.append(oss2.models.PartInfo(1, result.etag, size=len(content), part_crc=result.crc)) + bucket.complete_multipart_upload(key, upload_id, parts) + + result = bucket.get_object(key) + self.assertIsNone(result.resp.headers.get('Content-MD5')) + + parts = [] + params = {'sequential':''} + upload_id = bucket.init_multipart_upload(key, params=params).upload_id + result = bucket.upload_part(key, upload_id, 1, content) + parts.append(oss2.models.PartInfo(1, result.etag, size=len(content), part_crc=result.crc)) + bucket.complete_multipart_upload(key, upload_id, parts) + + result = bucket.get_object(key) + self.assertIsNotNone(result.resp.headers.get('Content-MD5')) + + bucket.delete_object(key) + bucket.delete_bucket() + + if __name__ == '__main__': unittest.main() diff --git a/tests/test_object_request_payment.py b/tests/test_object_request_payment.py index 056393bd..74de0913 100644 --- a/tests/test_object_request_payment.py +++ b/tests/test_object_request_payment.py @@ -748,7 +748,31 @@ def test_part_iterator(self): up_iter = oss2.PartIterator(self.payer_bucket, key, upload_id, headers=headers) for up in up_iter: pass - + + def test_put_object_with_signed_url(self): + key = 'request-payment-test-put-object-signed-url' + file_name = self._prepare_temp_file_with_size(1024) + + params = dict() + params[OSS_REQUEST_PAYER] = "requester" + url = self.payer_bucket.sign_url('PUT', key, 60, params=params) + self.payer_bucket.put_object_with_url_from_file(url, file_name) + + def test_get_object_with_signed_url(self): + key = 'request-payment-test-get-object-signed-url' + content = b'a' * 1024 + file_name = key + '.txt' + + result = self.bucket.put_object(key, content); + self.assertEqual(result.status, 200) + + params = dict() + params[OSS_REQUEST_PAYER] = "requester" + url = self.payer_bucket.sign_url('GET', key, 60, params=params) + result = self.payer_bucket.get_object_with_url_to_file(url, file_name) + + os.remove(file_name) + self.bucket.delete_object(key) if __name__ == '__main__': unittest.main() \ No newline at end of file diff --git a/tests/test_upload.py b/tests/test_upload.py index cebcddb9..fcc03d01 100644 --- a/tests/test_upload.py +++ b/tests/test_upload.py @@ -310,6 +310,28 @@ def test_upload_large_with_tagging(self): self.assertEqual(0, result.tag_set.len()) self.bucket.delete_object(key) + def test_upload_sequenial(self): + endpoint = "http://oss-cn-shanghai.aliyuncs.com" + auth = oss2.Auth(OSS_ID, OSS_SECRET) + bucket_name = OSS_BUCKET + "-test-upload-sequential" + bucket = oss2.Bucket(auth, endpoint, bucket_name) + bucket.create_bucket() + + key = random_string(16) + content = random_bytes(5 * 100 * 1024) + pathname = self._prepare_temp_file(content) + + oss2.resumable_upload(bucket, key, pathname, multipart_threshold=200 * 1024, part_size=None) + result = bucket.get_object(key) + self.assertIsNone(result.resp.headers.get('Content-MD5')) + + params={'sequential' : ''} + oss2.resumable_upload(bucket, key, pathname, multipart_threshold=200 * 1024, part_size=None, params=params) + result = bucket.get_object(key) + self.assertIsNotNone(result.resp.headers.get('Content-MD5')) + + bucket.delete_object(key) + bucket.delete_bucket() if __name__ == '__main__': unittest.main() From 4a5c450ab3c2d567adf99921d80176036fb1a995 Mon Sep 17 00:00:00 2001 From: "guangjun.hgj" Date: Mon, 23 Dec 2019 21:34:53 +0800 Subject: [PATCH 3/5] support to post a encode string from header. --- oss2/auth.py | 72 ++++++++++++++++++++++++++++++++++++++++++-- tests/test_object.py | 17 ++++++++++- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/oss2/auth.py b/oss2/auth.py index 584e62bf..489f204f 100644 --- a/oss2/auth.py +++ b/oss2/auth.py @@ -5,7 +5,7 @@ import time from . import utils -from .compat import urlquote, to_bytes +from .compat import urlquote, to_bytes, is_py2 from .headers import * import logging @@ -96,7 +96,10 @@ def _sign_url(self, req, bucket_name, key, expires): return req.url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items()) def __make_signature(self, req, bucket_name, key): - string_to_sign = self.__get_string_to_sign(req, bucket_name, key) + if is_py2: + string_to_sign = self.__get_string_to_sign(req, bucket_name, key) + else: + string_to_sign = self.__get_bytes_to_sign(req, bucket_name, key) logger.debug('Make signature: string to be signed = {0}'.format(string_to_sign)) @@ -159,6 +162,33 @@ def __param_to_query(self, k, v): else: return k + def __get_bytes_to_sign(self, req, bucket_name, key): + resource_bytes = self.__get_resource_string(req, bucket_name, key).encode('utf-8') + headers_bytes = self.__get_headers_bytes(req) + + content_md5 = req.headers.get('content-md5', '').encode('utf-8') + content_type = req.headers.get('content-type', '').encode('utf-8') + date = req.headers.get('date', '').encode('utf-8') + return b'\n'.join([req.method.encode('utf-8'), + content_md5, + content_type, + date, + headers_bytes + resource_bytes]) + + def __get_headers_bytes(self, req): + headers = req.headers + canon_headers = [] + for k, v in headers.items(): + lower_key = k.lower() + if lower_key.startswith('x-oss-'): + canon_headers.append((lower_key, v)) + + canon_headers.sort(key=lambda x: x[0]) + + if canon_headers: + return b'\n'.join(to_bytes(k) + b':' + to_bytes(v) for k, v in canon_headers) + b'\n' + else: + return b'' class AnonymousAuth(object): """用于匿名访问。 @@ -300,7 +330,10 @@ def _sign_url(self, req, bucket_name, key, expires, in_additional_headers=None): return req.url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items()) def __make_signature(self, req, bucket_name, key, additional_headers): - string_to_sign = self.__get_string_to_sign(req, bucket_name, key, additional_headers) + if is_py2: + string_to_sign = self.__get_string_to_sign(req, bucket_name, key, additional_headers) + else: + string_to_sign = self.__get_bytes_to_sign(req, bucket_name, key, additional_headers) logger.debug('Make signature: string to be signed = {0}'.format(string_to_sign)) @@ -374,3 +407,36 @@ def __get_canonicalized_oss_headers(self, req, additional_headers): canon_headers.sort(key=lambda x: x[0]) return ''.join(v[0] + ':' + v[1] + '\n' for v in canon_headers) + + def __get_bytes_to_sign(self, req, bucket_name, key, additional_header_list): + verb = req.method.encode('utf-8') + content_md5 = req.headers.get('content-md5', '').encode('utf-8') + content_type = req.headers.get('content-type', '').encode('utf-8') + date = req.headers.get('date', '').encode('utf-8') + + canonicalized_oss_headers = self.__get_canonicalized_oss_headers_bytes(req, additional_header_list) + additional_headers = ';'.join(sorted(additional_header_list)).encode('utf-8') + canonicalized_resource = self.__get_resource_string(req, bucket_name, key).encode('utf-8') + + return verb + b'\n' +\ + content_md5 + b'\n' +\ + content_type + b'\n' +\ + date + b'\n' +\ + canonicalized_oss_headers +\ + additional_headers + b'\n' +\ + canonicalized_resource + + def __get_canonicalized_oss_headers_bytes(self, req, additional_headers): + """ + :param additional_headers: 小写的headers列表, 并且这些headers都不以'x-oss-'为前缀. + """ + canon_headers = [] + + for k, v in req.headers.items(): + lower_key = k.lower() + if lower_key.startswith('x-oss-') or lower_key in additional_headers: + canon_headers.append((lower_key, v)) + + canon_headers.sort(key=lambda x: x[0]) + + return b''.join(to_bytes(v[0]) + b':' + to_bytes(v[1]) + b'\n' for v in canon_headers) diff --git a/tests/test_object.py b/tests/test_object.py index 10df80ff..38dccbd8 100644 --- a/tests/test_object.py +++ b/tests/test_object.py @@ -12,7 +12,7 @@ from oss2.compat import is_py2, is_py33 from oss2.models import Tagging, TaggingRule from oss2.headers import OSS_OBJECT_TAGGING, OSS_OBJECT_TAGGING_COPY_DIRECTIVE -from oss2.compat import urlunquote, urlquote +from oss2.compat import urlunquote, urlquote, to_bytes, to_string from .common import * @@ -1455,6 +1455,21 @@ def test_put_symlink_with_tagging_with_wrong_num(self): self.assertEqual(head_result.headers['x-oss-meta-key1'], 'value1') self.assertEqual(head_result.headers['x-oss-meta-key2'], 'value2') + def test_put_object_with_unicode_header(self): + key = self.random_key() + value = '测试' + byte = to_bytes(value) + headers={'x-oss-meta-unicode': byte} + self.bucket.put_object(key, 'a novel', headers=headers) + result = self.bucket.head_object(key) + self.assertEqual(result.status, 200) + newstr = result.headers['x-oss-meta-unicode'] + if is_py2: + b_str = newstr + else: + b_str = newstr.encode('iso-8859-1') + self.assertEqual(to_string(b_str), value) + class TestSign(TestObject): """ From 06660dddaffe9c82ab00881f9a57eeb0f8e3141a Mon Sep 17 00:00:00 2001 From: "guangjun.hgj" Date: Wed, 25 Dec 2019 14:49:21 +0800 Subject: [PATCH 4/5] Compatible with old version bucketInfo API --- oss2/xml_utils.py | 12 ++++++++++-- unittests/test_xml_utils.py | 38 +++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/oss2/xml_utils.py b/oss2/xml_utils.py index 85d8ef44..70620f44 100644 --- a/oss2/xml_utils.py +++ b/oss2/xml_utils.py @@ -296,12 +296,14 @@ def parse_get_bucket_info(result, body): result.location = _find_tag(root, 'Bucket/Location') result.owner = Owner(_find_tag(root, 'Bucket/Owner/DisplayName'), _find_tag(root, 'Bucket/Owner/ID')) result.acl = AccessControlList(_find_tag(root, 'Bucket/AccessControlList/Grant')) - result.data_redundancy_type = _find_tag(root, 'Bucket/DataRedundancyType') result.comment = _find_tag(root, 'Bucket/Comment') server_side_encryption = root.find("Bucket/ServerSideEncryptionRule") - result.bucket_encryption_rule = _parse_bucket_encryption_info(server_side_encryption) + if server_side_encryption is None: + result.bucket_encryption_rule = None + else: + result.bucket_encryption_rule = _parse_bucket_encryption_info(server_side_encryption) bucket_versioning = root.find('Bucket/Versioning') @@ -310,6 +312,12 @@ def parse_get_bucket_info(result, body): else: result.versioning_status = to_string(bucket_versioning.text) + data_redundancy_type = root.find('Bucket/DataRedundancyType') + if data_redundancy_type is None or data_redundancy_type.text is None: + result.data_redundancy_type = None + else: + result.data_redundancy_type = to_string(data_redundancy_type.text) + return result def _parse_bucket_encryption_info(node): diff --git a/unittests/test_xml_utils.py b/unittests/test_xml_utils.py index 4825cc9e..18947bf5 100644 --- a/unittests/test_xml_utils.py +++ b/unittests/test_xml_utils.py @@ -3,6 +3,9 @@ import unittest import xml.etree.ElementTree as ElementTree from oss2.xml_utils import _find_tag, _find_bool +from oss2.xml_utils import parse_get_bucket_info +from .common import MockResponse +import oss2 class TestXmlUtils(unittest.TestCase): @@ -35,6 +38,41 @@ def test_find_bool(self): self.assertRaises(RuntimeError, _find_bool, root, 'none_exist_tag') + def test_parse_get_bucket_info(self): + body = ''' + + + 2013-07-31T10:56:21.000Z + oss-cn-hangzhou.aliyuncs.com + oss-cn-hangzhou-internal.aliyuncs.com + oss-cn-hangzhou + oss-example + IA + + username + 27183473914**** + + + private + + test + + + ''' + headers = oss2.CaseInsensitiveDict({ + 'Server': 'AliyunOSS', + 'Date': 'Fri, 11 Dec 2015 11:40:30 GMT', + 'Content-Length': len(body), + 'Connection': 'keep-alive', + 'x-oss-request-id': '566AB62EB06147681C283D73', + 'ETag': '7AE1A589ED6B161CAD94ACDB98206DA6' + }) + resp = MockResponse(200, headers, body) + + result = oss2.models.GetBucketInfoResult(resp) + parse_get_bucket_info(result, body) + self.assertEqual(result.location, 'oss-cn-hangzhou') + if __name__ == '__main__': unittest.main() \ No newline at end of file From 5d739ff8ab8fd2f257b34185e6b3cb1058817661 Mon Sep 17 00:00:00 2001 From: "guangjun.hgj" Date: Thu, 26 Dec 2019 15:08:12 +0800 Subject: [PATCH 5/5] add python3.6 & ptyon3.7 --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 6cfe286a..ead58209 100644 --- a/setup.py +++ b/setup.py @@ -46,6 +46,8 @@ 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.3', 'Programming Language :: Python :: 3.4', - 'Programming Language :: Python :: 3.5' + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7' ] )