Skip to content

Commit

Permalink
Merge pull request #216 from liyanzhang505/dev-aysnc-fetch
Browse files Browse the repository at this point in the history
Support async fetch task.
  • Loading branch information
huiguangjun committed Dec 23, 2019
2 parents e40a252 + 747dc2e commit 12a82b5
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 15 deletions.
58 changes: 58 additions & 0 deletions examples/async_fetch_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import os
import oss2
import base64
import time
from oss2.compat import to_bytes
from oss2.models import AsyncFetchTaskConfiguration

# This example shows how to use the API that creates an asynchronous task
# fetching a file into a bucket, and how to query that task afterwards.

# First initialise AccessKeyId, AccessKeySecret, Endpoint and related settings.
# They are read from environment variables; alternatively, replace each
# '<...>' placeholder default below with the real value.
#
# Taking the Hangzhou region as an example, the endpoint can be:
#   http://oss-cn-hangzhou.aliyuncs.com
#   https://oss-cn-hangzhou.aliyuncs.com
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')

# Make sure none of the settings above still holds a '<...>' placeholder.
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
    assert '<' not in param, '请设置参数:' + param

# Create the Bucket object; every object-related call goes through it.
auth = oss2.Auth(access_key_id, access_key_secret)
bucket = oss2.Bucket(auth, endpoint, bucket_name)

object_name = "test-async-object"
url = "<yourSrcObjectUrl>"
callback = '{"callbackUrl":"www.abc.com/callback","callbackBody":"${etag}"}'
# The callback is passed to the task configuration in base64-encoded form.
base64_callback = oss2.utils.b64encode_as_string(to_bytes(callback))

# host, callback, content_md5 and ignore_same_key are optional parameters.
task_config = AsyncFetchTaskConfiguration(
    url,
    object_name,
    callback=base64_callback,
    ignore_same_key=False)

# Create the task that asynchronously fetches the file into the bucket.
created_task = bucket.put_async_fetch_task(task_config)
task_id = created_task.task_id
print('task_id:', created_task.task_id)

# Wait a fixed five seconds before querying the task.
time.sleep(5)

# Fetch the information of the task identified by task_id.
fetched_task = bucket.get_async_fetch_task(task_id)

# Print the task information that was returned.
print('=====get result======')
print('task_id:', fetched_task.task_id)
print('state:', fetched_task.task_state)
print('error_msg:', fetched_task.error_msg)

fetched_config = fetched_task.task_config
print('task info:')
print('url:', fetched_config.url)
print('object_name:', fetched_config.object_name)
print('host:', fetched_config.host)
print('content_md5:', fetched_config.content_md5)
print('callback:', fetched_config.callback)
print('ignoreSameKey:', fetched_config.ignore_same_key)
62 changes: 49 additions & 13 deletions oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ class Bucket(_Base):
REQUESTPAYMENT = 'requestPayment'
QOS_INFO = 'qosInfo'
USER_QOS = 'qos'
ASYNC_FETCH = 'asyncFetch'
SEQUENTIAL = 'sequential'

def __init__(self, auth, endpoint, bucket_name,
Expand Down Expand Up @@ -2070,42 +2071,41 @@ def put_bucket_versioning(self, config, headers=None):
:return: :class:`RequestResult <oss2.models.RequestResult>`
"""
logger.debug("Start to put object versioning, bucket: {0}".format(
self.bucket_name))
logger.debug("Start to put object versioning, bucket: {0}".format(self.bucket_name))
data = self.__convert_data(BucketVersioningConfig, xml_utils.to_put_bucket_versioning, config)

headers = http.CaseInsensitiveDict(headers)
headers['Content-MD5'] = utils.content_md5(data)

resp = self.__do_bucket('PUT', data=data, params={Bucket.VERSIONING: ''}, headers=headers)
logger.debug("Put bucket versiong done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return RequestResult(resp)

def get_bucket_versioning(self):
    """Fetch the versioning configuration of this bucket.

    Issues a GET request with the versioning sub-resource and parses the
    response with ``xml_utils.parse_get_bucket_versioning``.

    :return: :class:`GetBucketVersioningResult<oss2.models.GetBucketVersioningResult>`
    """
    # The start message is logged once; the block previously emitted it twice.
    logger.debug("Start to get bucket versioning, bucket: {0}".format(self.bucket_name))
    resp = self.__do_bucket('GET', params={Bucket.VERSIONING: ''})
    logger.debug("Get bucket versiong done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

    return self._parse_result(resp, xml_utils.parse_get_bucket_versioning, GetBucketVersioningResult)

def put_bucket_policy(self, policy):
    """Set the authorization policy of this bucket.

    See the official documentation for the rules on writing a policy.

    :param str policy: the authorization policy; it is sent as the request
        body and its Content-MD5 header is computed from it.
    :return: :class:`RequestResult <oss2.models.RequestResult>`
    """
    # A single docstring and a single start message: the block previously
    # carried a second, dead docstring literal and logged the start twice.
    logger.debug("Start to put bucket policy, bucket: {0}, policy: {1}".format(self.bucket_name, policy))
    resp = self.__do_bucket('PUT', data=policy, params={Bucket.POLICY: ''}, headers={'Content-MD5': utils.content_md5(policy)})
    logger.debug("Put bucket policy done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

    return RequestResult(resp)

def get_bucket_policy(self):
"""
"""获取bucket授权策略
:return: :class:`GetBucketPolicyResult <oss2.models.GetBucketPolicyResult>`
"""

Expand All @@ -2115,7 +2115,7 @@ def get_bucket_policy(self):
return GetBucketPolicyResult(resp)

def delete_bucket_policy(self):
"""
"""删除bucket授权策略
:return: :class:`RequestResult <oss2.models.RequestResult>`
"""
logger.debug("Start to delete bucket policy, bucket: {0}".format(self.bucket_name))
Expand Down Expand Up @@ -2158,6 +2158,7 @@ def put_bucket_qos_info(self, bucket_qos_info):
headers = http.CaseInsensitiveDict()
headers['Content-MD5'] = utils.content_md5(data)
resp = self.__do_bucket('PUT', data=data, params={Bucket.QOS_INFO: ''}, headers=headers)
logger.debug("Get bucket qos info done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return RequestResult(resp)

Expand Down Expand Up @@ -2188,18 +2189,53 @@ def set_bucket_storage_capacity(self, user_qos):
:param user_qos :class:`BucketUserQos <oss2.models.BucketUserQos>`
"""
logger.debug("Start to set bucket storage capacity: {0}".format(self.bucket_name))
data = xml_utils.to_put_bucket_user_qos(user_qos)
resp = self.__do_bucket('PUT', data=data, params={Bucket.USER_QOS: ''})
logger.debug("Set bucket storage capacity done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

return RequestResult(resp)

def get_bucket_storage_capacity(self):
    """Query the storage capacity information of this bucket.

    :return: :class:`GetBucketUserQosResult <oss2.models.GetBucketUserQosResult>`
    """
    start_message = "Start to get bucket storage capacity, bucket:{0}"
    logger.debug(start_message.format(self.bucket_name))

    # The capacity is exposed through the user-QoS sub-resource.
    query = {Bucket.USER_QOS: ''}
    resp = self._Bucket__do_bucket('GET', params=query)

    done_message = "Get bucket storage capacity done, req_id: {0}, status_code: {1}"
    logger.debug(done_message.format(resp.request_id, resp.status))

    return self._parse_result(resp, xml_utils.parse_get_bucket_user_qos, GetBucketUserQosResult)

def put_async_fetch_task(self, task_config):
    """Create a task that asynchronously fetches a file into this bucket.

    :param task_config: the task configuration
    :type task_config: class:`AsyncFetchTaskConfiguration <oss2.models.AsyncFetchTaskConfiguration>`
    :return: :class:`PutAsyncFetchTaskResult <oss2.models.PutAsyncFetchTaskResult>`
    """
    logger.debug("Start to put async fetch task, bucket:{0}".format(self.bucket_name))

    # Serialise the configuration to XML and send it with a Content-MD5
    # header computed from that body.
    body = xml_utils.to_put_async_fetch_task(task_config)
    request_headers = http.CaseInsensitiveDict()
    request_headers['Content-MD5'] = utils.content_md5(body)
    query = {Bucket.ASYNC_FETCH: ''}

    resp = self._Bucket__do_bucket('POST', data=body, params=query, headers=request_headers)
    logger.debug("Put async fetch task done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

    return self._parse_result(resp, xml_utils.parse_put_async_fetch_task_result, PutAsyncFetchTaskResult)

def get_async_fetch_task(self, task_id):
    """Fetch the information of an asynchronous fetch-to-bucket task.

    :param str task_id: id of the task to query; it is sent in the
        ``OSS_TASK_ID`` request header.
    :return: :class:`GetAsyncFetchTaskResult <oss2.models.GetAsyncFetchTaskResult>`
    """
    logger.debug("Start to get async fetch task, bucket:{0}, task_id:{1}".format(self.bucket_name, task_id))
    resp = self._Bucket__do_bucket('GET', headers={OSS_TASK_ID: task_id}, params={Bucket.ASYNC_FETCH: ''})
    # This is the GET call, so the completion message says "Get"; it used to
    # be a copy of the "Put async fetch task done" message.
    logger.debug("Get async fetch task done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))

    return self._parse_result(resp, xml_utils.parse_get_async_fetch_task_result, GetAsyncFetchTaskResult)

def _get_bucket_config(self, config):
"""获得Bucket某项配置,具体哪种配置由 `config` 指定。该接口直接返回 `RequestResult` 对象。
通过read()接口可以获得XML字符串。不建议使用。
Expand Down
2 changes: 1 addition & 1 deletion oss2/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class Auth(AuthBase):
'live', 'comp', 'status', 'vod', 'startTime', 'endTime', 'x-oss-process',
'symlink', 'callback', 'callback-var', 'tagging', 'encryption', 'versions',
'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit', 'qosInfo',
'x-oss-request-payer', 'sequential']
'x-oss-request-payer', 'sequential', 'asyncFetch']
)

def _sign_request(self, req, bucket_name, key):
Expand Down
3 changes: 3 additions & 0 deletions oss2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ class SignatureDoesNotMatch(ServerError):
status = 403
code = 'SignatureDoesNotMatch'

class ObjectAlreadyExists(ServerError):
    """Server error with HTTP status 400 and error code ``ObjectAlreadyExists``."""

    status = 400
    code = 'ObjectAlreadyExists'

def make_exception(resp):
status = resp.status
Expand Down
2 changes: 2 additions & 0 deletions oss2/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

OSS_TRAFFIC_LIMIT = 'x-oss-traffic-limit'

# Name of the request header that carries an async fetch task id.
OSS_TASK_ID = 'x-oss-task-id'

class RequestHeader(dict):
def __init__(self, *arg, **kw):
super(RequestHeader, self).__init__(*arg, **kw)
Expand Down
78 changes: 78 additions & 0 deletions oss2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1514,3 +1514,81 @@ class GetBucketUserQosResult(RequestResult, BucketUserQos):
def __init__(self, resp):
RequestResult.__init__(self, resp)
BucketUserQos.__init__(self)


# State values of an async fetch task.
ASYNC_FETCH_TASK_STATE_RUNNING = 'Running'
ASYNC_FETCH_TASK_STATE_RETRY = 'Retry'
ASYNC_FETCH_TASK_STATE_FETCH_SUCCESS_CALLBACK_FAILED = 'FetchSuccessCallbackFailed'
ASYNC_FETCH_TASK_STATE_FAILED = 'Failed'
ASYNC_FETCH_TASK_STATE_SUCCESS = 'Success'

class AsyncFetchTaskConfiguration(object):
    """Configuration of a task that asynchronously fetches a file into a bucket.

    :param url: URL of the source file.
    :type url: str

    :param object_name: name of the file (object).
    :type object_name: str

    :param host: host of the server holding the file; if not given it is
        filled in by parsing ``url``.
    :type host: str

    :param content_md5: MD5 used to verify the source file.
    :type content_md5: str

    :param callback: application server to call back after a successful
        fetch; no callback is made if not given. The format is the same as
        the ``callback`` request header of OSS upload callbacks; see the
        official documentation for details.
    :type callback: str

    :param ignore_same_key: True means that if the file already exists the
        task is ignored and the API call reports an error; False means an
        existing object is overwritten. The original documentation describes
        True as the default, while this constructor itself defaults to None.
    :type ignore_same_key: bool
    """

    def __init__(self, url, object_name, host=None, content_md5=None,
                 callback=None, ignore_same_key=None):
        # Required settings.
        self.url, self.object_name = url, object_name
        # Optional settings; None means "not specified".
        self.host, self.content_md5 = host, content_md5
        self.callback, self.ignore_same_key = callback, ignore_same_key

class PutAsyncFetchTaskResult(RequestResult):
    """Result of creating an asynchronous fetch-to-bucket task.

    :param resp: the response, handed on to ``RequestResult``.
    :param task_id: id of the task
    :type task_id: str
    """

    def __init__(self, resp, task_id=None):
        RequestResult.__init__(self, resp)

        self.task_id = task_id

class GetAsyncFetchTaskResult(RequestResult):
    """Result of querying an asynchronous fetch-to-bucket task.

    :param task_id: id of the task
    :type task_id: str

    :param task_state: one of oss2.models.ASYNC_FETCH_TASK_STATE_RUNNING,
        oss2.models.ASYNC_FETCH_TASK_STATE_RETRY,
        oss2.models.ASYNC_FETCH_TASK_STATE_FETCH_SUCCESS_CALLBACK_FAILED,
        oss2.models.ASYNC_FETCH_TASK_STATE_FAILED,
        oss2.models.ASYNC_FETCH_TASK_STATE_SUCCESS.
    :type task_state: str

    :param error_msg: error message
    :type error_msg: str

    :param task_config: configuration of the task
    :type task_config: class:`AsyncFetchTaskConfiguration <oss2.models.AsyncFetchTaskConfiguration>`
    """

    def __init__(self, resp, task_id=None, task_state=None,
                 error_msg=None, task_config=None):
        RequestResult.__init__(self, resp)

        # Identity and status of the task.
        self.task_id = task_id
        self.task_state = task_state
        self.error_msg = error_msg

        # The configuration the task was created with.
        self.task_config = task_config
48 changes: 47 additions & 1 deletion oss2/xml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
REDIRECT_TYPE_INTERNAL,
REDIRECT_TYPE_ALICDN,
NoncurrentVersionStorageTransition,
NoncurrentVersionExpiration)
NoncurrentVersionExpiration,
AsyncFetchTaskConfiguration)

from .select_params import (SelectJsonTypes, SelectParameters)

Expand Down Expand Up @@ -1288,3 +1289,48 @@ def to_put_bucket_user_qos(user_qos):
_add_text_child(root, 'StorageCapacity', str(user_qos.storage_capacity))

return _node_to_string(root)


def to_put_async_fetch_task(task_config):
    """Serialise an async fetch task configuration to its XML request body.

    ``Url`` and ``Object`` are always written; the remaining elements are
    written only when the corresponding attribute is not None.

    :param task_config: object exposing url, object_name, host, content_md5,
        callback and ignore_same_key attributes.
    :return: the result of ``_node_to_string`` on the root element.
    """
    root = ElementTree.Element('AsyncFetchTaskConfiguration')

    # Mandatory elements.
    for tag, text in (('Url', task_config.url),
                      ('Object', task_config.object_name)):
        _add_text_child(root, tag, text)

    # Optional text elements, emitted in this fixed order.
    optional_fields = (
        ('Host', task_config.host),
        ('ContentMD5', task_config.content_md5),
        ('Callback', task_config.callback),
    )
    for tag, text in optional_fields:
        if text is not None:
            _add_text_child(root, tag, text)

    # The flag is written as the lower-cased str() of the value.
    ignore_same_key = task_config.ignore_same_key
    if ignore_same_key is not None:
        _add_text_child(root, 'IgnoreSameKey', str(ignore_same_key).lower())

    return _node_to_string(root)

def parse_put_async_fetch_task_result(result, body):
    """Store the ``TaskId`` found in the XML *body* on *result* and return it.

    :param result: object whose ``task_id`` attribute is set.
    :param body: XML text of the response.
    :return: the same *result* object.
    """
    document = ElementTree.fromstring(body)
    result.task_id = _find_tag(document, 'TaskId')
    return result

def _parse_async_fetch_task_configuration(task_info_node):
    """Build an AsyncFetchTaskConfiguration from a task-info XML node.

    :param task_info_node: node whose Url, Object, Host, ContentMD5, Callback
        and IgnoreSameKey children are read.
    :return: the populated :class:`AsyncFetchTaskConfiguration`.
    """
    # Keyword arguments are evaluated left to right, so the children are
    # looked up in the same order as the constructor's parameters.
    return AsyncFetchTaskConfiguration(
        url=_find_tag(task_info_node, 'Url'),
        object_name=_find_tag(task_info_node, 'Object'),
        host=_find_tag(task_info_node, 'Host'),
        content_md5=_find_tag(task_info_node, 'ContentMD5'),
        callback=_find_tag(task_info_node, 'Callback'),
        ignore_same_key=_find_bool(task_info_node, 'IgnoreSameKey'))

def parse_get_async_fetch_task_result(result, body):
    """Fill *result* from the XML *body* of a get-async-fetch-task response.

    Sets ``task_id``, ``task_state`` and ``error_msg`` from the TaskId, State
    and ErrorMsg elements, and ``task_config`` from the TaskInfo element.

    :param result: object to populate.
    :param body: XML text of the response.
    :return: the same *result* object.
    """
    document = ElementTree.fromstring(body)

    # Scalar fields: (attribute on result, XML tag), assigned in this order.
    for attribute, tag in (('task_id', 'TaskId'),
                           ('task_state', 'State'),
                           ('error_msg', 'ErrorMsg')):
        setattr(result, attribute, _find_tag(document, tag))

    task_info = document.find('TaskInfo')
    result.task_config = _parse_async_fetch_task_configuration(task_info)

    return result

0 comments on commit 12a82b5

Please sign in to comment.