Skip to content

Commit

Permalink
1, Support sequential upload mode.
Browse files Browse the repository at this point in the history
2, Support signed url with request payment.
  • Loading branch information
liyanzhang505 authored and huiguangjun committed Dec 10, 2019
1 parent 946e64f commit e40a252
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 11 deletions.
15 changes: 11 additions & 4 deletions oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ class Bucket(_Base):
REQUESTPAYMENT = 'requestPayment'
QOS_INFO = 'qosInfo'
USER_QOS = 'qos'
SEQUENTIAL = 'sequential'

def __init__(self, auth, endpoint, bucket_name,
is_cname=False,
Expand Down Expand Up @@ -1207,7 +1208,7 @@ def delete_object_versions(self, keylist_versions, headers=None):
logger.debug("Delete object versions done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_batch_delete_objects, BatchDeleteObjectsResult)

def init_multipart_upload(self, key, headers=None):
def init_multipart_upload(self, key, headers=None, params=None):
"""初始化分片上传。
返回值中的 `upload_id` 以及Bucket名和Object名三元组唯一对应了此次分片上传事件。
Expand All @@ -1221,9 +1222,15 @@ def init_multipart_upload(self, key, headers=None):
"""
headers = utils.set_content_type(http.CaseInsensitiveDict(headers), key)

logger.debug("Start to init multipart upload, bucket: {0}, keys: {1}, headers: {2}".format(
self.bucket_name, to_string(key), headers))
resp = self.__do_object('POST', key, params={'uploads': ''}, headers=headers)
if params is None:
tmp_params = dict()
else:
tmp_params = params.copy()

tmp_params['uploads'] = ''
logger.debug("Start to init multipart upload, bucket: {0}, keys: {1}, headers: {2}, params: {3}".format(
self.bucket_name, to_string(key), headers, tmp_params))
resp = self.__do_object('POST', key, params=tmp_params, headers=headers)
logger.debug("Init multipart upload done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_init_multipart_upload, InitMultipartUploadResult)

Expand Down
3 changes: 2 additions & 1 deletion oss2/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class Auth(AuthBase):
'restore', 'qos', 'referer', 'stat', 'bucketInfo', 'append', 'position', 'security-token',
'live', 'comp', 'status', 'vod', 'startTime', 'endTime', 'x-oss-process',
'symlink', 'callback', 'callback-var', 'tagging', 'encryption', 'versions',
'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit', 'qosInfo']
'versioning', 'versionId', 'policy', 'requestPayment', 'x-oss-traffic-limit', 'qosInfo',
'x-oss-request-payer', 'sequential']
)

def _sign_request(self, req, bucket_name, key):
Expand Down
44 changes: 40 additions & 4 deletions oss2/resumable.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def resumable_upload(bucket, key, filename,
multipart_threshold=None,
part_size=None,
progress_callback=None,
num_threads=None):
num_threads=None,
params=None):
"""断点上传本地文件。
实现中采用分片上传方式上传本地文件,缺省的并发数是 `oss2.defaults.multipart_num_threads` ,并且在
Expand Down Expand Up @@ -67,6 +68,11 @@ def resumable_upload(bucket, key, filename,
:param part_size: 指定分片上传的每个分片的大小。如不指定,则自动计算。
:param progress_callback: 上传进度回调函数。参见 :ref:`progress_callback` 。
:param num_threads: 并发上传的线程数,如不指定则使用 `oss2.defaults.multipart_num_threads` 。
:param params: HTTP请求参数
# 只有'sequential'这个参数才会被传递到外部函数init_multipart_upload中。
# 其他参数视为无效参数不会往外部函数传递。
:type params: dict
"""
logger.debug("Start to resumable upload, bucket: {0}, key: {1}, filename: {2}, headers: {3}, "
"multipart_threshold: {4}, part_size: {5}, num_threads: {6}".format(bucket.bucket_name, to_string(key),
Expand All @@ -81,7 +87,8 @@ def resumable_upload(bucket, key, filename,
part_size=part_size,
headers=headers,
progress_callback=progress_callback,
num_threads=num_threads)
num_threads=num_threads,
params=params)
result = uploader.upload()
else:
with open(to_unicode(filename), 'rb') as f:
Expand Down Expand Up @@ -238,6 +245,31 @@ def _populate_valid_headers(headers=None, valid_keys=None):

return valid_headers

def _populate_valid_params(params=None, valid_keys=None):
"""构建只包含有效keys的params
:param params: 需要过滤的params
:type params: dict
:param valid_keys: 有效的关键key列表
:type valid_keys: list
:return: 只包含有效keys的params
"""
if params is None or valid_keys is None:
return None

valid_params = dict()

for key in valid_keys:
if params.get(key) is not None:
valid_params[key] = params[key]

if len(valid_params) == 0:
valid_params = None

return valid_params

class _ResumableOperation(object):
def __init__(self, bucket, key, filename, size, store,
progress_callback=None, versionid=None):
Expand Down Expand Up @@ -485,7 +517,8 @@ def __init__(self, bucket, key, filename, size,
headers=None,
part_size=None,
progress_callback=None,
num_threads=None):
num_threads=None,
params=None):
super(_ResumableUploader, self).__init__(bucket, key, filename, size,
store or ResumableStore(),
progress_callback=progress_callback)
Expand All @@ -500,6 +533,8 @@ def __init__(self, bucket, key, filename, size,

self.__upload_id = None

self.__params = params

# protect below fields
self.__lock = threading.Lock()
self.__record = None
Expand Down Expand Up @@ -583,7 +618,8 @@ def __load_record(self):
part_size = determine_part_size(self.size, self.__part_size)
logger.debug("Upload File size: {0}, User-specify part_size: {1}, Calculated part_size: {2}".format(
self.size, self.__part_size, part_size))
upload_id = self.bucket.init_multipart_upload(self.key, headers=self.__headers).upload_id
params = _populate_valid_params(self.__params, [Bucket.SEQUENTIAL])
upload_id = self.bucket.init_multipart_upload(self.key, headers=self.__headers, params=params).upload_id
record = {'upload_id': upload_id, 'mtime': self.__mtime, 'size': self.size, 'parts': [],
'abspath': self._abspath, 'bucket': self.bucket.bucket_name, 'key': self.key,
'part_size': part_size}
Expand Down
35 changes: 34 additions & 1 deletion tests/test_multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,39 @@ def test_multipart_with_object_tagging(self):
self.assertEqual('中文', result.tag_set.tagging_rule[' + '])

result = self.bucket.delete_object_tagging(key)


def test_multipart_sequential(self):
endpoint = "http://oss-cn-shanghai.aliyuncs.com"
auth = oss2.Auth(OSS_ID, OSS_SECRET)
bucket_name = OSS_BUCKET + "-test-multipart-sequential"
bucket = oss2.Bucket(auth, endpoint, bucket_name)
bucket.create_bucket()

key = self.random_key()
content = random_bytes(128 * 1024)

parts = []
upload_id = bucket.init_multipart_upload(key).upload_id
result = bucket.upload_part(key, upload_id, 1, content)
parts.append(oss2.models.PartInfo(1, result.etag, size=len(content), part_crc=result.crc))
bucket.complete_multipart_upload(key, upload_id, parts)

result = bucket.get_object(key)
self.assertIsNone(result.resp.headers.get('Content-MD5'))

parts = []
params = {'sequential':''}
upload_id = bucket.init_multipart_upload(key, params=params).upload_id
result = bucket.upload_part(key, upload_id, 1, content)
parts.append(oss2.models.PartInfo(1, result.etag, size=len(content), part_crc=result.crc))
bucket.complete_multipart_upload(key, upload_id, parts)

result = bucket.get_object(key)
self.assertIsNotNone(result.resp.headers.get('Content-MD5'))

bucket.delete_object(key)
bucket.delete_bucket()


if __name__ == '__main__':
unittest.main()
26 changes: 25 additions & 1 deletion tests/test_object_request_payment.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,31 @@ def test_part_iterator(self):
up_iter = oss2.PartIterator(self.payer_bucket, key, upload_id, headers=headers)
for up in up_iter:
pass


def test_put_object_with_signed_url(self):
key = 'request-payment-test-put-object-signed-url'
file_name = self._prepare_temp_file_with_size(1024)

params = dict()
params[OSS_REQUEST_PAYER] = "requester"
url = self.payer_bucket.sign_url('PUT', key, 60, params=params)
self.payer_bucket.put_object_with_url_from_file(url, file_name)

def test_get_object_with_signed_url(self):
key = 'request-payment-test-get-object-signed-url'
content = b'a' * 1024
file_name = key + '.txt'

result = self.bucket.put_object(key, content);
self.assertEqual(result.status, 200)

params = dict()
params[OSS_REQUEST_PAYER] = "requester"
url = self.payer_bucket.sign_url('GET', key, 60, params=params)
result = self.payer_bucket.get_object_with_url_to_file(url, file_name)

os.remove(file_name)
self.bucket.delete_object(key)

if __name__ == '__main__':
unittest.main()
22 changes: 22 additions & 0 deletions tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,28 @@ def test_upload_large_with_tagging(self):
self.assertEqual(0, result.tag_set.len())
self.bucket.delete_object(key)

def test_upload_sequenial(self):
endpoint = "http://oss-cn-shanghai.aliyuncs.com"
auth = oss2.Auth(OSS_ID, OSS_SECRET)
bucket_name = OSS_BUCKET + "-test-upload-sequential"
bucket = oss2.Bucket(auth, endpoint, bucket_name)
bucket.create_bucket()

key = random_string(16)
content = random_bytes(5 * 100 * 1024)
pathname = self._prepare_temp_file(content)

oss2.resumable_upload(bucket, key, pathname, multipart_threshold=200 * 1024, part_size=None)
result = bucket.get_object(key)
self.assertIsNone(result.resp.headers.get('Content-MD5'))

params={'sequential' : ''}
oss2.resumable_upload(bucket, key, pathname, multipart_threshold=200 * 1024, part_size=None, params=params)
result = bucket.get_object(key)
self.assertIsNotNone(result.resp.headers.get('Content-MD5'))

bucket.delete_object(key)
bucket.delete_bucket()

if __name__ == '__main__':
unittest.main()

0 comments on commit e40a252

Please sign in to comment.