Skip to content

Commit

Permalink
Merge 5d739ff into 946e64f
Browse files Browse the repository at this point in the history
  • Loading branch information
huiguangjun committed Dec 26, 2019
2 parents 946e64f + 5d739ff commit a3996ad
Show file tree
Hide file tree
Showing 15 changed files with 666 additions and 32 deletions.
58 changes: 58 additions & 0 deletions examples/async_fetch_task.py
@@ -0,0 +1,58 @@
import os
import oss2
import base64
import time
from oss2.compat import to_bytes
from oss2.models import AsyncFetchTaskConfiguration

# 以下代码展示了创建异步获取文件到bucket任务到API的用法

# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。
# 通过环境变量获取,或者把诸如“<你的AccessKeyId>”替换成真实的AccessKeyId等。
#
# 以杭州区域为例,Endpoint可以是:
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')

# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, '请设置参数:' + param

# 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)

object_name = "test-async-object"
url = "<yourSrcObjectUrl>"
callback = '{"callbackUrl":"www.abc.com/callback","callbackBody":"${etag}"}'
base64_callback = oss2.utils.b64encode_as_string(to_bytes(callback))

# 可以选填host, callback, content_md5, ignore_same_key等参数
task_config = AsyncFetchTaskConfiguration(url, object_name, callback=base64_callback, ignore_same_key=False)

# 创建异步获取文件到bucket的任务
result = bucket.put_async_fetch_task(task_config)
task_id = result.task_id
print('task_id:', result.task_id)

time.sleep(5)

# 获取指定的异步任务信息
result = bucket.get_async_fetch_task(task_id)

# 打印获取到的异步任务信息
print('=====get result======')
print('task_id:', result.task_id)
print('state:', result.task_state)
print('error_msg:', result.error_msg)
task_config = result.task_config
print('task info:')
print('url:', task_config.url)
print('object_name:', task_config.object_name)
print('host:', task_config.host)
print('content_md5:', task_config.content_md5)
print('callback:', task_config.callback)
print('ignoreSameKey:', task_config.ignore_same_key)
77 changes: 60 additions & 17 deletions oss2/api.py
Expand Up @@ -385,6 +385,8 @@ class Bucket(_Base):
REQUESTPAYMENT = 'requestPayment'
QOS_INFO = 'qosInfo'
USER_QOS = 'qos'
ASYNC_FETCH = 'asyncFetch'
SEQUENTIAL = 'sequential'

def __init__(self, auth, endpoint, bucket_name,
is_cname=False,
Expand Down Expand Up @@ -1207,7 +1209,7 @@ def delete_object_versions(self, keylist_versions, headers=None):
logger.debug("Delete object versions done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_batch_delete_objects, BatchDeleteObjectsResult)

def init_multipart_upload(self, key, headers=None):
def init_multipart_upload(self, key, headers=None, params=None):
"""初始化分片上传。
返回值中的 `upload_id` 以及Bucket名和Object名三元组唯一对应了此次分片上传事件。
Expand All @@ -1221,9 +1223,15 @@ def init_multipart_upload(self, key, headers=None):
"""
headers = utils.set_content_type(http.CaseInsensitiveDict(headers), key)

logger.debug("Start to init multipart upload, bucket: {0}, keys: {1}, headers: {2}".format(
self.bucket_name, to_string(key), headers))
resp = self.__do_object('POST', key, params={'uploads': ''}, headers=headers)
if params is None:
tmp_params = dict()
else:
tmp_params = params.copy()

tmp_params['uploads'] = ''
logger.debug("Start to init multipart upload, bucket: {0}, keys: {1}, headers: {2}, params: {3}".format(
self.bucket_name, to_string(key), headers, tmp_params))
resp = self.__do_object('POST', key, params=tmp_params, headers=headers)
logger.debug("Init multipart upload done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_init_multipart_upload, InitMultipartUploadResult)

Expand Down Expand Up @@ -2063,42 +2071,41 @@ def put_bucket_versioning(self, config, headers=None):
:return: :class:`RequestResult <oss2.models.RequestResult>`
"""
logger.debug("Start to put object versioning, bucket: {0}".format(
self.bucket_name))
logger.debug("Start to put object versioning, bucket: {0}".format(self.bucket_name))
data = self.__convert_data(BucketVersioningConfig, xml_utils.to_put_bucket_versioning, config)

headers = http.CaseInsensitiveDict(headers)
headers['Content-MD5'] = utils.content_md5(data)

resp = self.__do_bucket('PUT', data=data, params={Bucket.VERSIONING: ''}, headers=headers)
logger.debug("Put bucket versiong done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return RequestResult(resp)

def get_bucket_versioning(self):
"""
:return: :class:`GetBucketVersioningResult<oss2.models.GetBucketVersioningResult>`
"""
logger.debug("Start to get bucket versioning, bucket: {0}".format(
self.bucket_name))

logger.debug("Start to get bucket versioning, bucket: {0}".format(self.bucket_name))
resp = self.__do_bucket('GET', params={Bucket.VERSIONING: ''})
logger.debug("Get bucket versiong done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return self._parse_result(resp, xml_utils.parse_get_bucket_versioning, GetBucketVersioningResult)

def put_bucket_policy(self, policy):
"""设置bucket policy, 具体policy书写规则请参考官方文档
:param str policy:
"""

logger.debug("Start to put bucket policy, bucket: {0}, policy: {1}".format(
self.bucket_name, policy))
"""设置bucket授权策略, 具体policy书写规则请参考官方文档
:param str policy: 授权策略
"""
logger.debug("Start to put bucket policy, bucket: {0}, policy: {1}".format(self.bucket_name, policy))
resp = self.__do_bucket('PUT', data=policy, params={Bucket.POLICY: ''}, headers={'Content-MD5': utils.content_md5(policy)})
logger.debug("Put bucket policy done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return RequestResult(resp)

def get_bucket_policy(self):
"""
"""获取bucket授权策略
:return: :class:`GetBucketPolicyResult <oss2.models.GetBucketPolicyResult>`
"""

Expand All @@ -2108,7 +2115,7 @@ def get_bucket_policy(self):
return GetBucketPolicyResult(resp)

def delete_bucket_policy(self):
"""
"""删除bucket授权策略
:return: :class:`RequestResult <oss2.models.RequestResult>`
"""
logger.debug("Start to delete bucket policy, bucket: {0}".format(self.bucket_name))
Expand Down Expand Up @@ -2151,6 +2158,7 @@ def put_bucket_qos_info(self, bucket_qos_info):
headers = http.CaseInsensitiveDict()
headers['Content-MD5'] = utils.content_md5(data)
resp = self.__do_bucket('PUT', data=data, params={Bucket.QOS_INFO: ''}, headers=headers)
logger.debug("Get bucket qos info done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return RequestResult(resp)

Expand Down Expand Up @@ -2181,18 +2189,53 @@ def set_bucket_storage_capacity(self, user_qos):
:param user_qos :class:`BucketUserQos <oss2.models.BucketUserQos>`
"""
logger.debug("Start to set bucket storage capacity: {0}".format(self.bucket_name))
data = xml_utils.to_put_bucket_user_qos(user_qos)
resp = self.__do_bucket('PUT', data=data, params={Bucket.USER_QOS: ''})
logger.debug("Set bucket storage capacity done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return RequestResult(resp)

def get_bucket_storage_capacity(self):
"""获取bucket的容量信息。
:return: :class:`GetBucketUserQosResult <oss2.models.GetBucketUserQosResult>`
"""
logger.debug("Start to get bucket storage capacity, bucket:{0}".format(self.bucket_name))
resp = self._Bucket__do_bucket('GET', params={Bucket.USER_QOS: ''})
logger.debug("Get bucket storage capacity done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return self._parse_result(resp, xml_utils.parse_get_bucket_user_qos, GetBucketUserQosResult)

def put_async_fetch_task(self, task_config):
"""创建一个异步获取文件到bucket的任务。
:param task_config: 任务配置
:type task_config: class:`AsyncFetchTaskConfiguration <oss2.models.AsyncFetchTaskConfiguration>`
:return: :class:`PutAsyncFetchTaskResult <oss2.models.PutAsyncFetchTaskResult>`
"""
logger.debug("Start to put async fetch task, bucket:{0}".format(self.bucket_name))
data = xml_utils.to_put_async_fetch_task(task_config)
headers = http.CaseInsensitiveDict()
headers['Content-MD5'] = utils.content_md5(data)
resp = self._Bucket__do_bucket('POST', data=data, params={Bucket.ASYNC_FETCH: ''}, headers=headers)
logger.debug("Put async fetch task done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return self._parse_result(resp, xml_utils.parse_put_async_fetch_task_result, PutAsyncFetchTaskResult)

def get_async_fetch_task(self, task_id):
"""获取一个异步获取文件到bucket的任务信息。
:param str task_id: 任务id
:return: :class:`GetAsyncFetchTaskResult <oss2.models.GetAsyncFetchTaskResult>`
"""
logger.debug("Start to get async fetch task, bucket:{0}, task_id:{1}".format(self.bucket_name, task_id))
resp = self._Bucket__do_bucket('GET', headers={OSS_TASK_ID: task_id}, params={Bucket.ASYNC_FETCH: ''})
logger.debug("Put async fetch task done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return self._parse_result(resp, xml_utils.parse_get_async_fetch_task_result, GetAsyncFetchTaskResult)

def _get_bucket_config(self, config):
"""获得Bucket某项配置,具体哪种配置由 `config` 指定。该接口直接返回 `RequestResult` 对象。
通过read()接口可以获得XML字符串。不建议使用。
Expand Down
75 changes: 71 additions & 4 deletions oss2/auth.py
Expand Up @@ -5,7 +5,7 @@
import time

from . import utils
from .compat import urlquote, to_bytes
from .compat import urlquote, to_bytes, is_py2
from .headers import *
import logging

Expand Down Expand Up @@ -73,7 +73,8 @@ class Auth(AuthBase):
'restore', 'qos', 'referer', 'stat', 'bucketInfo', 'append', 'position', 'security-token',
'live', 'comp', 'status', 'vod', 'startTime', 'endTime', 'x-oss-process',
'symlink', 'callback', 'callback-var', 'tagging', 'encryption', 'versions',
'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit', 'qosInfo']
'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit', 'qosInfo', 'asyncFetch',
'x-oss-request-payer', 'sequential']
)

def _sign_request(self, req, bucket_name, key):
Expand All @@ -95,7 +96,10 @@ def _sign_url(self, req, bucket_name, key, expires):
return req.url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items())

def __make_signature(self, req, bucket_name, key):
string_to_sign = self.__get_string_to_sign(req, bucket_name, key)
if is_py2:
string_to_sign = self.__get_string_to_sign(req, bucket_name, key)
else:
string_to_sign = self.__get_bytes_to_sign(req, bucket_name, key)

logger.debug('Make signature: string to be signed = {0}'.format(string_to_sign))

Expand Down Expand Up @@ -158,6 +162,33 @@ def __param_to_query(self, k, v):
else:
return k

def __get_bytes_to_sign(self, req, bucket_name, key):
resource_bytes = self.__get_resource_string(req, bucket_name, key).encode('utf-8')
headers_bytes = self.__get_headers_bytes(req)

content_md5 = req.headers.get('content-md5', '').encode('utf-8')
content_type = req.headers.get('content-type', '').encode('utf-8')
date = req.headers.get('date', '').encode('utf-8')
return b'\n'.join([req.method.encode('utf-8'),
content_md5,
content_type,
date,
headers_bytes + resource_bytes])

def __get_headers_bytes(self, req):
headers = req.headers
canon_headers = []
for k, v in headers.items():
lower_key = k.lower()
if lower_key.startswith('x-oss-'):
canon_headers.append((lower_key, v))

canon_headers.sort(key=lambda x: x[0])

if canon_headers:
return b'\n'.join(to_bytes(k) + b':' + to_bytes(v) for k, v in canon_headers) + b'\n'
else:
return b''

class AnonymousAuth(object):
"""用于匿名访问。
Expand Down Expand Up @@ -299,7 +330,10 @@ def _sign_url(self, req, bucket_name, key, expires, in_additional_headers=None):
return req.url + '?' + '&'.join(_param_to_quoted_query(k, v) for k, v in req.params.items())

def __make_signature(self, req, bucket_name, key, additional_headers):
string_to_sign = self.__get_string_to_sign(req, bucket_name, key, additional_headers)
if is_py2:
string_to_sign = self.__get_string_to_sign(req, bucket_name, key, additional_headers)
else:
string_to_sign = self.__get_bytes_to_sign(req, bucket_name, key, additional_headers)

logger.debug('Make signature: string to be signed = {0}'.format(string_to_sign))

Expand Down Expand Up @@ -373,3 +407,36 @@ def __get_canonicalized_oss_headers(self, req, additional_headers):
canon_headers.sort(key=lambda x: x[0])

return ''.join(v[0] + ':' + v[1] + '\n' for v in canon_headers)

def __get_bytes_to_sign(self, req, bucket_name, key, additional_header_list):
verb = req.method.encode('utf-8')
content_md5 = req.headers.get('content-md5', '').encode('utf-8')
content_type = req.headers.get('content-type', '').encode('utf-8')
date = req.headers.get('date', '').encode('utf-8')

canonicalized_oss_headers = self.__get_canonicalized_oss_headers_bytes(req, additional_header_list)
additional_headers = ';'.join(sorted(additional_header_list)).encode('utf-8')
canonicalized_resource = self.__get_resource_string(req, bucket_name, key).encode('utf-8')

return verb + b'\n' +\
content_md5 + b'\n' +\
content_type + b'\n' +\
date + b'\n' +\
canonicalized_oss_headers +\
additional_headers + b'\n' +\
canonicalized_resource

def __get_canonicalized_oss_headers_bytes(self, req, additional_headers):
"""
:param additional_headers: 小写的headers列表, 并且这些headers都不以'x-oss-'为前缀.
"""
canon_headers = []

for k, v in req.headers.items():
lower_key = k.lower()
if lower_key.startswith('x-oss-') or lower_key in additional_headers:
canon_headers.append((lower_key, v))

canon_headers.sort(key=lambda x: x[0])

return b''.join(to_bytes(v[0]) + b':' + to_bytes(v[1]) + b'\n' for v in canon_headers)
3 changes: 3 additions & 0 deletions oss2/exceptions.py
Expand Up @@ -269,6 +269,9 @@ class SignatureDoesNotMatch(ServerError):
status = 403
code = 'SignatureDoesNotMatch'

class ObjectAlreadyExists(ServerError):
status = 400
code = 'ObjectAlreadyExists'

def make_exception(resp):
status = resp.status
Expand Down
2 changes: 2 additions & 0 deletions oss2/headers.py
Expand Up @@ -37,6 +37,8 @@

OSS_TRAFFIC_LIMIT = 'x-oss-traffic-limit'

OSS_TASK_ID = 'x-oss-task-id'

class RequestHeader(dict):
def __init__(self, *arg, **kw):
super(RequestHeader, self).__init__(*arg, **kw)
Expand Down

0 comments on commit a3996ad

Please sign in to comment.