/
api.py
2978 lines (2284 loc) · 146 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""
文件上传方法中的data参数
------------------------
诸如 :func:`put_object <Bucket.put_object>` 这样的上传接口都会有 `data` 参数用于接收用户数据。`data` 可以是下述类型
- unicode类型(对于Python3则是str类型):内部会自动转换为UTF-8的bytes
- bytes类型:不做任何转换
- file-like object:对于可以seek和tell的file object,从当前位置读取直到结束。其他类型,请确保当前位置是文件开始。
- 可迭代类型:对于无法探知长度的数据,要求一定是可迭代的。此时会通过Chunked Encoding传输。
Bucket配置修改方法中的input参数
-----------------------------
诸如 :func:`put_bucket_cors <Bucket.put_bucket_cors>` 这样的Bucket配置修改接口都会有 `input` 参数接收用户提供的配置数据。
`input` 可以是下述类型
- Bucket配置信息相关的类,如 `BucketCors`
- unicode类型(对于Python3则是str类型)
- 经过utf-8编码的bytes类型
- file-like object
- 可迭代类型,会通过Chunked Encoding传输
也就是说 `input` 参数可以比 `data` 参数多接受第一种类型的输入。
返回值
------
:class:`Service` 和 :class:`Bucket` 类的大多数方法都是返回 :class:`RequestResult <oss2.models.RequestResult>`
及其子类。`RequestResult` 包含了HTTP响应的状态码、头部以及OSS Request ID,而它的子类则包含用户真正想要的结果。例如,
`ListBucketsResult.buckets` 就是返回的Bucket信息列表;`GetObjectResult` 则是一个file-like object,可以调用 `read()` 来获取响应的
HTTP包体。
:class:`CryptoBucket`:
加密接口
-------
CryptoBucket仅提供上传下载加密数据的接口,诸如`get_object` 、 `put_object` ,返回值与Bucket相应接口一致。
异常
----
一般来说Python SDK可能会抛出三种类型的异常,这些异常都继承于 :class:`OssError <oss2.exceptions.OssError>` :
- :class:`ClientError <oss2.exceptions.ClientError>` :由于用户参数错误而引发的异常;
- :class:`ServerError <oss2.exceptions.ServerError>` 及其子类:OSS服务器返回非成功的状态码,如4xx或5xx;
- :class:`RequestError <oss2.exceptions.RequestError>` :底层requests库抛出的异常,如DNS解析错误,超时等;
当然,`Bucket.put_object_from_file` 和 `Bucket.get_object_to_file` 这类函数还会抛出文件相关的异常。
.. _byte_range:
指定下载范围
------------
诸如 :func:`get_object <Bucket.get_object>` 以及 :func:`upload_part_copy <Bucket.upload_part_copy>` 这样的函数,可以接受
`byte_range` 参数,表明读取数据的范围。该参数是一个二元tuple:(start, last)。这些接口会把它转换为Range头部的值,如:
- byte_range 为 (0, 99) 转换为 'bytes=0-99',表示读取前100个字节
- byte_range 为 (None, 99) 转换为 'bytes=-99',表示读取最后99个字节
- byte_range 为 (100, None) 转换为 'bytes=100-',表示读取第101个字节到文件结尾的部分(包含第101个字节)
分页罗列
-------
罗列各种资源的接口,如 :func:`list_buckets <Service.list_buckets>` 、 :func:`list_objects <Bucket.list_objects>` 都支持
分页查询。通过设定分页标记(如:`marker` 、 `key_marker` )的方式可以指定查询某一页。首次调用将分页标记设为空(缺省值,可以不设),
后续的调用使用返回值中的 `next_marker` 、 `next_key_marker` 等。每次调用后检查返回值中的 `is_truncated` ,其值为 `False` 说明
已经到了最后一页。
.. _line_range:
指定查询CSV文件范围
------------
诸如 :func:`select_object <Bucket.select_object>` 以及 :func:`select_object_to_file <Bucket.select_object_to_file>` 这样的函数的select_csv_params参数,可以接受
`LineRange` 参数,表明读取CSV数据的范围。该参数是一个二元tuple:(start, last):
- LineRange 为 (0, 99) 表示读取前100行
- LineRange 为 (None, 99) 表示读取最后99行
- LineRange 为 (100, None) 表示读取第101行到文件结尾的部分(包含第101行)
.. _split_range:
指定查询CSV文件范围
------------
split可以认为是切分好的大小大致相等的csv行簇。每个Split大小大致相等,这样以便更好的做到负载均衡。
诸如 :func:`select_object <Bucket.select_object>` 以及 :func:`select_object_to_file <Bucket.select_object_to_file>` 这样的函数的select_csv_params参数,可以接受
`SplitRange` 参数,表明读取CSV数据的范围。该参数是一个二元tuple:(start, last):
- SplitRange 为 (0, 9) 表示读取前10个Split
- SplitRange 为 (None, 9) 表示读取最后9个split
- SplitRange 为 (10, None) 表示读取第11个split到文件结尾的部分(包含第11个Split)
分页查询
-------
和create_csv_object_meta配合使用,有两种方法:
- 方法1:先获取文件总的行数(create_csv_object_meta返回),然后把文件以line_range分成若干部分并行查询
- 方法2:先获取文件总的Split数(create_csv_object_meta返回), 然后把文件分成若干个请求,每个请求含有大致相等的Split
.. _progress_callback:
上传下载进度
-----------
上传下载接口,诸如 `get_object` 、 `put_object` 、`resumable_upload`,都支持进度回调函数,可以用它实现进度条等功能。
`progress_callback` 的函数原型如下 ::
def progress_callback(bytes_consumed, total_bytes):
'''进度回调函数。
:param int bytes_consumed: 已经消费的字节数。对于上传,就是已经上传的量;对于下载,就是已经下载的量。
:param int total_bytes: 总长度。
'''
其中 `total_bytes` 对于上传和下载有不同的含义:
- 上传:当输入是bytes或可以seek/tell的文件对象,那么它的值就是总的字节数;否则,其值为None
- 下载:当返回的HTTP相应中有Content-Length头部,那么它的值就是Content-Length的值;否则,其值为None
.. _unix_time:
Unix Time
---------
OSS Python SDK会把从服务器获得时间戳都转换为自1970年1月1日UTC零点以来的秒数,即Unix Time。
参见 `Unix Time <https://en.wikipedia.org/wiki/Unix_time>`_
OSS中常用的时间格式有
- HTTP Date格式,形如 `Sat, 05 Dec 2015 11:04:39 GMT` 这样的GMT时间。
用在If-Modified-Since、Last-Modified这些HTTP请求、响应头里。
- ISO8601格式,形如 `2015-12-05T00:00:00.000Z`。
用在生命周期管理配置、列举Bucket结果中的创建时间、列举文件结果中的最后修改时间等处。
`http_date` 函数把Unix Time转换为HTTP Date;而 `http_to_unixtime` 则做相反的转换。如 ::
>>> import oss2, time
>>> unix_time = int(time.time()) # 当前UNIX Time,设其值为 1449313829
>>> date_str = oss2.http_date(unix_time) # 得到 'Sat, 05 Dec 2015 11:10:29 GMT'
>>> oss2.http_to_unixtime(date_str) # 得到 1449313829
.. note::
生成HTTP协议所需的日期(即HTTP Date)时,请使用 `http_date` , 不要使用 `strftime` 这样的函数。因为后者是和locale相关的。
比如,`strftime` 结果中可能会出现中文,而这样的格式,OSS服务器是不能识别的。
`iso8601_to_unixtime` 把ISO8601格式转换为Unix Time;`date_to_iso8601` 和 `iso8601_to_date` 则在ISO8601格式的字符串和
datetime.date之间相互转换。如 ::
>>> import oss2
>>> d = oss2.iso8601_to_date('2015-12-05T00:00:00.000Z') # 得到 datetime.date(2015, 12, 5)
>>> date_str = oss2.date_to_iso8601(d) # 得到 '2015-12-05T00:00:00.000Z'
>>> oss2.iso8601_to_unixtime(date_str) # 得到 1449273600
.. _select_params:
指定OSS Select的文件格式。
对于Csv文件,支持如下Keys:
>>> CsvHeaderInfo: None|Use|Ignore #None表示没有CSV Schema头,Use表示启用CSV Schema头,可以在Select语句中使用Name,Ignore表示有CSV Schema头,但忽略它(Select语句中不可以使用Name)
默认值是None
>>> CommentCharacter: Comment字符,默认值是#,不支持多个字符
>>> RecordDelimiter: 行分隔符,默认是\n,最多支持两个字符分隔符(比如:\r\n)
>>> FieldDelimiter: 列分隔符,默认是逗号(,), 不支持多个字符
>>> QuoteCharacter: 列Quote字符,默认是双引号("),不支持多个字符。注意转义符合Quote字符相同。
>>> LineRange: 指定查询CSV文件的行范围,参见 `line_range`。
>>> SplitRange: 指定查询CSV文件的Split范围,参见 `split_range`.
注意LineRange和SplitRange两种不能同时指定。若同时指定LineRange会被忽略。
>>> CompressionType: 文件的压缩格式,默认值是None, 支持GZIP。
>>> OutputRawData: 指定是响应Body返回Raw数据,默认值是False.
>>> SkipPartialDataRecord: 当CSV行数据不完整时(select语句中出现的列在该行为空),是否跳过该行。默认是False。
>>> OutputHeader:是否输出CSV Header,默认是False.
>>> EnablePayloadCrc:是否启用对Payload的CRC校验,默认是False. 该选项不能和OutputRawData:True混用。
>>> MaxSkippedRecordsAllowed: 允许跳过的最大行数。默认值是0表示一旦有一行跳过就报错。当下列两种情况下该行CSV被跳过:1)当SkipPartialDataRecord为True时且该行不完整时 2)当该行的数据类型和SQL不匹配时
对于Json 文件, 支持如下Keys:
>>> Json_Type: DOCUMENT | LINES . DOCUMENT就是指一般的Json文件,LINES是指每一行是一个合法的JSON对象,文件由多行Json对象组成,整个文件本身不是合法的Json对象。
>>> LineRange: 指定查询JSON LINE文件的行范围,参见 `line_range`。注意该参数仅支持LINES类型
>>> SplitRange: 指定查询JSON LINE文件的Split范围,参见 `split_range`.注意该参数仅支持LINES类型
>>> CompressionType: 文件的压缩格式,默认值是None, 支持GZIP。
>>> OutputRawData: 指定是响应Body返回Raw数据,默认值是False.
>>> SkipPartialDataRecord: 当一条JSON记录数据不完整时(select语句中出现的Key在该对象为空),是否跳过该Json记录。默认是False。
>>> EnablePayloadCrc:是否启用对Payload的CRC校验,默认是False. 该选项不能和OutputRawData:True混用。
>>> MaxSkippedRecordsAllowed: 允许跳过的最大Json记录数。默认值是0表示一旦有一条Json记录跳过就报错。当下列两种情况下该JSON被跳过:1)当SkipPartialDataRecord为True时且该条Json记录不完整时 2)当该记录的数据类型和SQL不匹配时
>>> ParseJsonNumberAsString: 将Json文件中的数字解析成字符串。使用场景是当Json文件中的浮点数精度较高时,系统默认的浮点数精度无法达到要求,当解析成字符串时将完整保留原始数据精度,在Sql中使用Cast可以将字符串无精度损失地转成decimal.
>>> AllowQuotedRecordDelimiter: 允许CSV中的列包含转义过的换行符。默认为true。当值为False时,select API可以用Range:bytes来设置选取目标对象内容的范围
‘
.. _select_meta_params:
create_select_object_meta参数集合,支持如下Keys:
- RecordDelimiter: CSV换行符,最多支持两个字符
- FieldDelimiter: CSV列分隔符,最多支持一个字符
- QuoteCharacter: CSV转移Quote符,最多支持一个字符
- OverwriteIfExists: true|false. true表示重新获得csv meta,并覆盖原有的meta。一般情况下不需要使用
"""
import logging
from . import xml_utils
from . import http
from . import utils
from . import exceptions
from . import defaults
from . import models
from . import select_params
from .models import *
from .compat import urlquote, urlparse, to_unicode, to_string
from .headers import *
from .select_params import *
import time
import shutil
logger = logging.getLogger(__name__)
class _Base(object):
def __init__(self, auth, endpoint, is_cname, session, connect_timeout,
app_name='', enable_crc=True, proxies=None, region=None, cloudbox_id= None, is_path_style=False, is_verify_object_strict=True):
self.auth = auth
self.endpoint = _normalize_endpoint(endpoint.strip())
if utils.is_valid_endpoint(self.endpoint) is not True:
raise ClientError('The endpoint you has specified is not valid, endpoint: {0}'.format(endpoint))
self.session = session or http.Session()
self.timeout = defaults.get(connect_timeout, defaults.connect_timeout)
self.app_name = app_name
self.enable_crc = enable_crc
self.proxies = proxies
self.region = region
self.product = 'oss'
self.cloudbox_id = cloudbox_id
if self.cloudbox_id is not None:
self.product = 'oss-cloudbox'
self._make_url = _UrlMaker(self.endpoint, is_cname, is_path_style)
self.is_verify_object_strict = is_verify_object_strict
if hasattr(self.auth, 'auth_version') and self.auth.auth_version() != 'v1' :
self.is_verify_object_strict = False
def _do(self, method, bucket_name, key, **kwargs):
key = to_string(key)
req = http.Request(method, self._make_url(bucket_name, key),
app_name=self.app_name,
proxies=self.proxies,
region=self.region,
product=self.product,
cloudbox_id=self.cloudbox_id,
**kwargs)
self.auth._sign_request(req, bucket_name, key)
resp = self.session.do_request(req, timeout=self.timeout)
if resp.status // 100 != 2:
e = exceptions.make_exception(resp)
logger.info("Exception: {0}".format(e))
raise e
# Note that connections are only released back to the pool for reuse once all body data has been read;
# be sure to either set stream to False or read the content property of the Response object.
# For more details, please refer to http://docs.python-requests.org/en/master/user/advanced/#keep-alive.
content_length = models._hget(resp.headers, 'content-length', int)
if content_length is not None and content_length == 0:
resp.read()
return resp
def _do_url(self, method, sign_url, **kwargs):
req = http.Request(method, sign_url, app_name=self.app_name, proxies=self.proxies, **kwargs)
resp = self.session.do_request(req, timeout=self.timeout)
if resp.status // 100 != 2:
e = exceptions.make_exception(resp)
logger.info("Exception: {0}".format(e))
raise e
# Note that connections are only released back to the pool for reuse once all body data has been read;
# be sure to either set stream to False or read the content property of the Response object.
# For more details, please refer to http://docs.python-requests.org/en/master/user/advanced/#keep-alive.
content_length = models._hget(resp.headers, 'content-length', int)
if content_length is not None and content_length == 0:
resp.read()
return resp
@staticmethod
def _parse_result(resp, parse_func, klass):
result = klass(resp)
parse_func(result, resp.read())
return result
class Service(_Base):
"""用于Service操作的类,如罗列用户所有的Bucket。
用法 ::
>>> import oss2
>>> auth = oss2.Auth('your-access-key-id', 'your-access-key-secret')
>>> service = oss2.Service(auth, 'oss-cn-hangzhou.aliyuncs.com')
>>> service.list_buckets()
<oss2.models.ListBucketsResult object at 0x0299FAB0>
:param auth: 包含了用户认证信息的Auth对象
:type auth: oss2.Auth
:param str endpoint: 访问域名,如杭州区域的域名为oss-cn-hangzhou.aliyuncs.com
:param session: 会话。如果是None表示新开会话,非None则复用传入的会话
:type session: oss2.Session
:param float connect_timeout: 连接超时时间,以秒为单位。
:param str app_name: 应用名。该参数不为空,则在User Agent中加入其值。
注意到,最终这个字符串是要作为HTTP Header的值传输的,所以必须要遵循HTTP标准。
"""
QOS_INFO = 'qosInfo'
REGIONS = 'regions'
WRITE_GET_OBJECT_RESPONSE = 'x-oss-write-get-object-response'
def __init__(self, auth, endpoint,
session=None,
connect_timeout=None,
app_name='',
proxies=None,
region=None,
cloudbox_id=None,
is_path_style=False):
logger.debug("Init oss service, endpoint: {0}, connect_timeout: {1}, app_name: {2}, proxies: {3}".format(
endpoint, connect_timeout, app_name, proxies))
super(Service, self).__init__(auth, endpoint, False, session, connect_timeout,
app_name=app_name, proxies=proxies,
region=region, cloudbox_id=cloudbox_id, is_path_style=is_path_style)
def list_buckets(self, prefix='', marker='', max_keys=100, params=None, headers=None):
"""根据前缀罗列用户的Bucket。
:param str prefix: 只罗列Bucket名为该前缀的Bucket,空串表示罗列所有的Bucket
:param str marker: 分页标志。首次调用传空串,后续使用返回值中的next_marker
:param int max_keys: 每次调用最多返回的Bucket数目
:param dict params: list操作参数,传入'tag-key','tag-value'对结果进行过滤
:param headers: 用户指定的HTTP头部。可以指定Content-Type、Content-MD5、x-oss-meta-开头的头部等。可以是dict,建议是oss2.CaseInsensitiveDict
:return: 罗列的结果
:rtype: oss2.models.ListBucketsResult
"""
logger.debug("Start to list buckets, prefix: {0}, marker: {1}, max-keys: {2}".format(prefix, marker, max_keys))
listParam = {}
listParam['prefix'] = prefix
listParam['marker'] = marker
listParam['max-keys'] = str(max_keys)
headers = http.CaseInsensitiveDict(headers)
if params is not None:
listParam.update(params)
resp = self._do('GET', '', '', params=listParam, headers=headers)
logger.debug("List buckets done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_list_buckets, ListBucketsResult)
def get_user_qos_info(self):
"""获取User的QoSInfo
:return: :class:`GetUserQosInfoResult <oss2.models.GetUserQosInfoResult>`
"""
logger.debug("Start to get user qos info.")
resp = self._do('GET', '', '', params={Service.QOS_INFO: ''})
logger.debug("get use qos, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_get_qos_info, GetUserQosInfoResult)
def describe_regions(self, regions=''):
"""查询所有支持地域或者指定地域对应的Endpoint信息,包括外网Endpoint、内网Endpoint和传输加速Endpoint。
:param str regions : 地域。
:return: :class:`DescribeRegionsResult <oss2.models.DescribeRegionsResult>`
"""
logger.debug("Start to describe regions")
resp = self._do('GET', '', '', params={Service.REGIONS: regions})
logger.debug("Describe regions done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_describe_regions, DescribeRegionsResult)
def write_get_object_response(self, route, token, fwd_status, data, headers=None):
"""write get object response.
:param route: fc return route
:param token: fc return token
:param fwd_status: fwd_status
:param data: 待上传的内容。
:type data: bytes,str或file-like object
:param headers: 用户指定的HTTP头部。可以指定Content-Type、Content-MD5、x-oss-meta-开头的头部等
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:return: :class:`RequestResult <oss2.models.RequestResult>`
"""
logger.debug("Start to write get object response, headers: {0}".format(headers))
headers = http.CaseInsensitiveDict(headers)
if route:
headers['x-oss-request-route'] = route
if token:
headers['x-oss-request-token'] = token
if fwd_status:
headers['x-oss-fwd-status'] = fwd_status
resp = self._do('POST', '', '', params={Service.WRITE_GET_OBJECT_RESPONSE: ''}, headers=headers, data=data)
logger.debug("write get object response done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return RequestResult(resp)
class Bucket(_Base):
"""用于Bucket和Object操作的类,诸如创建、删除Bucket,上传、下载Object等。
用法(假设Bucket属于杭州区域) ::
>>> import oss2
>>> auth = oss2.Auth('your-access-key-id', 'your-access-key-secret')
>>> bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'your-bucket')
>>> bucket.put_object('readme.txt', 'content of the object')
<oss2.models.PutObjectResult object at 0x029B9930>
:param auth: 包含了用户认证信息的Auth对象
:type auth: oss2.Auth
:param str endpoint: 访问域名或者CNAME
:param str bucket_name: Bucket名
:param bool is_cname: 如果endpoint是CNAME则设为True;反之,则为False。
:param session: 会话。如果是None表示新开会话,非None则复用传入的会话
:type session: oss2.Session
:param float connect_timeout: 连接超时时间,以秒为单位。
:param str app_name: 应用名。该参数不为空,则在User Agent中加入其值。
注意到,最终这个字符串是要作为HTTP Header的值传输的,所以必须要遵循HTTP标准。
:param bool is_verify_object_strict: 严格验证对象名称的标志。默认为True。
"""
ACL = 'acl'
CORS = 'cors'
LIFECYCLE = 'lifecycle'
LOCATION = 'location'
LOGGING = 'logging'
REFERER = 'referer'
WEBSITE = 'website'
LIVE = 'live'
COMP = 'comp'
STATUS = 'status'
VOD = 'vod'
SYMLINK = 'symlink'
STAT = 'stat'
BUCKET_INFO = 'bucketInfo'
PROCESS = 'x-oss-process'
TAGGING = 'tagging'
ENCRYPTION = 'encryption'
VERSIONS = 'versions'
VERSIONING = 'versioning'
VERSIONID = 'versionId'
RESTORE = 'restore'
OBJECTMETA = 'objectMeta'
POLICY = 'policy'
REQUESTPAYMENT = 'requestPayment'
QOS_INFO = 'qosInfo'
USER_QOS = 'qos'
ASYNC_FETCH = 'asyncFetch'
SEQUENTIAL = 'sequential'
INVENTORY = "inventory"
INVENTORY_CONFIG_ID = "inventoryId"
CONTINUATION_TOKEN = "continuation-token"
WORM = "worm"
WORM_ID = "wormId"
WORM_EXTEND = "wormExtend"
REPLICATION = "replication"
REPLICATION_LOCATION = 'replicationLocation'
REPLICATION_PROGRESS = 'replicationProgress'
TRANSFER_ACCELERATION = 'transferAcceleration'
CNAME = 'cname'
META_QUERY = 'metaQuery'
ACCESS_MONITOR = 'accessmonitor'
RESOURCE_GROUP = 'resourceGroup'
STYLE = 'style'
STYLE_NAME = 'styleName'
ASYNC_PROCESS = 'x-oss-async-process'
CALLBACK = 'callback'
def __init__(self, auth, endpoint, bucket_name,
is_cname=False,
session=None,
connect_timeout=None,
app_name='',
enable_crc=True,
proxies=None,
region=None,
cloudbox_id=None,
is_path_style=False,
is_verify_object_strict=True):
logger.debug("Init Bucket: {0}, endpoint: {1}, isCname: {2}, connect_timeout: {3}, app_name: {4}, enabled_crc: {5}, region: {6}"
", proxies: {6}".format(bucket_name, endpoint, is_cname, connect_timeout, app_name, enable_crc, proxies, region))
super(Bucket, self).__init__(auth, endpoint, is_cname, session, connect_timeout,
app_name=app_name, enable_crc=enable_crc, proxies=proxies,
region=region, cloudbox_id=cloudbox_id, is_path_style=is_path_style, is_verify_object_strict=is_verify_object_strict)
self.bucket_name = bucket_name.strip()
if utils.is_valid_bucket_name(self.bucket_name) is not True:
raise ClientError("The bucket_name is invalid, please check it.")
def sign_url(self, method, key, expires, headers=None, params=None, slash_safe=False):
"""生成签名URL。
常见的用法是生成加签的URL以供授信用户下载,如为log.jpg生成一个5分钟后过期的下载链接::
>>> bucket.sign_url('GET', 'log.jpg', 5 * 60)
r'http://your-bucket.oss-cn-hangzhou.aliyuncs.com/logo.jpg?OSSAccessKeyId=YourAccessKeyId\&Expires=1447178011&Signature=UJfeJgvcypWq6Q%2Bm3IJcSHbvSak%3D'
:param method: HTTP方法,如'GET'、'PUT'、'DELETE'等
:type method: str
:param key: 文件名
:param expires: 过期时间(单位:秒),链接在当前时间再过expires秒后过期
:param headers: 需要签名的HTTP头部,如名称以x-oss-meta-开头的头部(作为用户自定义元数据)、
Content-Type头部等。对于下载,不需要填。
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param params: 需要签名的HTTP查询参数
:param slash_safe: 是否开启key名称中的‘/’转义保护,如果不开启'/'将会转义成%2F
:type slash_safe: bool
:return: 签名URL。
"""
if key is None or len(key.strip()) <= 0:
raise ClientError("The key is invalid, please check it.")
key = to_string(key)
if self.is_verify_object_strict and key.startswith('?'):
raise ClientError("The key cannot start with `?`, please check it.")
logger.debug(
"Start to sign_url, method: {0}, bucket: {1}, key: {2}, expires: {3}, headers: {4}, params: {5}, slash_safe: {6}".format(
method, self.bucket_name, to_string(key), expires, headers, params, slash_safe))
req = http.Request(method, self._make_url(self.bucket_name, key, slash_safe),
headers=headers,
params=params,
region=self.region,
product=self.product,
cloudbox_id=self.cloudbox_id)
return self.auth._sign_url(req, self.bucket_name, key, expires)
def sign_rtmp_url(self, channel_name, playlist_name, expires):
"""生成RTMP推流的签名URL。
常见的用法是生成加签的URL以供授信用户向OSS推RTMP流。
:param channel_name: 直播频道的名称
:param expires: 过期时间(单位:秒),链接在当前时间再过expires秒后过期
:param playlist_name: 播放列表名称,注意与创建live channel时一致
:param params: 需要签名的HTTP查询参数
:return: 签名URL。
"""
logger.debug("Sign RTMP url, bucket: {0}, channel_name: {1}, playlist_name: {2}, expires: {3}".format(
self.bucket_name, channel_name, playlist_name, expires))
url = self._make_url(self.bucket_name, 'live').replace('http://', 'rtmp://').replace(
'https://', 'rtmp://') + '/' + channel_name
params = {}
if playlist_name is not None and playlist_name != "":
params['playlistName'] = playlist_name
return self.auth._sign_rtmp_url(url, self.bucket_name, channel_name, expires, params)
def list_objects(self, prefix='', delimiter='', marker='', max_keys=100, headers=None):
"""根据前缀罗列Bucket里的文件。
:param str prefix: 只罗列文件名为该前缀的文件
:param str delimiter: 分隔符。可以用来模拟目录
:param str marker: 分页标志。首次调用传空串,后续使用返回值的next_marker
:param int max_keys: 最多返回文件的个数,文件和目录的和不能超过该值
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:return: :class:`ListObjectsResult <oss2.models.ListObjectsResult>`
"""
headers = http.CaseInsensitiveDict(headers)
logger.debug(
"Start to List objects, bucket: {0}, prefix: {1}, delimiter: {2}, marker: {3}, max-keys: {4}".format(
self.bucket_name, to_string(prefix), delimiter, to_string(marker), max_keys))
resp = self.__do_bucket('GET',
params={'prefix': prefix,
'delimiter': delimiter,
'marker': marker,
'max-keys': str(max_keys),
'encoding-type': 'url'},
headers=headers)
logger.debug("List objects done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_list_objects, ListObjectsResult)
def list_objects_v2(self, prefix='', delimiter='', continuation_token='', start_after='', fetch_owner=False, encoding_type='url', max_keys=100, headers=None):
"""根据前缀罗列Bucket里的文件。
:param str prefix: 只罗列文件名为该前缀的文件
:param str delimiter: 分隔符。可以用来模拟目录
:param str continuation_token: 分页标志。首次调用传空串,后续使用返回值的next_continuation_token
:param str start_after: 起始文件名称,OSS会返回按照字典序排列start_after之后的文件。
:param bool fetch_owner: 是否获取文件的owner信息,默认不返回。
:param int max_keys: 最多返回文件的个数,文件和目录的和不能超过该值
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:return: :class:`ListObjectsV2Result <oss2.models.ListObjectsV2Result>`
"""
headers = http.CaseInsensitiveDict(headers)
logger.debug(
"Start to List objects, bucket: {0}, prefix: {1}, delimiter: {2}, continuation_token: {3}, "
"start-after: {4}, fetch-owner: {5}, encoding_type: {6}, max-keys: {7}".format(
self.bucket_name, to_string(prefix), delimiter, continuation_token, start_after, fetch_owner, encoding_type, max_keys))
resp = self.__do_bucket('GET',
params={'list-type': '2',
'prefix': prefix,
'delimiter': delimiter,
'continuation-token': continuation_token,
'start-after': start_after,
'fetch-owner': str(fetch_owner).lower(),
'max-keys': str(max_keys),
'encoding-type': encoding_type},
headers=headers)
logger.debug("List objects V2 done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_list_objects_v2, ListObjectsV2Result)
def put_object(self, key, data,
headers=None,
progress_callback=None):
"""上传一个普通文件。
用法 ::
>>> bucket.put_object('readme.txt', 'content of readme.txt')
>>> with open(u'local_file.txt', 'rb') as f:
>>> bucket.put_object('remote_file.txt', f)
:param key: 上传到OSS的文件名
:param data: 待上传的内容。
:type data: bytes,str或file-like object
:param headers: 用户指定的HTTP头部。可以指定Content-Type、Content-MD5、x-oss-meta-开头的头部等
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param progress_callback: 用户指定的进度回调函数。可以用来实现进度条等功能。参考 :ref:`progress_callback` 。
:return: :class:`PutObjectResult <oss2.models.PutObjectResult>`
"""
headers = utils.set_content_type(http.CaseInsensitiveDict(headers), key)
if progress_callback:
data = utils.make_progress_adapter(data, progress_callback)
if self.enable_crc:
data = utils.make_crc_adapter(data)
logger.debug("Start to put object, bucket: {0}, key: {1}, headers: {2}".format(self.bucket_name, to_string(key),
headers))
resp = self.__do_object('PUT', key, data=data, headers=headers)
logger.debug("Put object done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
result = PutObjectResult(resp)
if self.enable_crc and result.crc is not None:
utils.check_crc('put object', data.crc, result.crc, result.request_id)
return result
def put_object_from_file(self, key, filename,
headers=None,
progress_callback=None):
"""上传一个本地文件到OSS的普通文件。
:param str key: 上传到OSS的文件名
:param str filename: 本地文件名,需要有可读权限
:param headers: 用户指定的HTTP头部。可以指定Content-Type、Content-MD5、x-oss-meta-开头的头部等
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return: :class:`PutObjectResult <oss2.models.PutObjectResult>`
"""
headers = utils.set_content_type(http.CaseInsensitiveDict(headers), filename)
logger.debug("Put object from file, bucket: {0}, key: {1}, file path: {2}".format(
self.bucket_name, to_string(key), filename))
with open(to_unicode(filename), 'rb') as f:
return self.put_object(key, f, headers=headers, progress_callback=progress_callback)
def put_object_with_url(self, sign_url, data, headers=None, progress_callback=None):
""" 使用加签的url上传对象
:param sign_url: 加签的url
:param data: 待上传的数据
:param headers: 用户指定的HTTP头部。可以指定Content-Type、Content-MD5、x-oss-meta-开头的头部等,必须和签名时保持一致
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return:
"""
headers = http.CaseInsensitiveDict(headers)
if progress_callback:
data = utils.make_progress_adapter(data, progress_callback)
if self.enable_crc:
data = utils.make_crc_adapter(data)
logger.debug("Start to put object with signed url, bucket: {0}, sign_url: {1}, headers: {2}".format(
self.bucket_name, sign_url, headers))
resp = self._do_url('PUT', sign_url, data=data, headers=headers)
logger.debug("Put object with url done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
result = PutObjectResult(resp)
if self.enable_crc and result.crc is not None:
utils.check_crc('put object', data.crc, result.crc, result.request_id)
return result
def put_object_with_url_from_file(self, sign_url, filename,
headers=None,
progress_callback=None):
""" 使用加签的url上传本地文件到oss
:param sign_url: 加签的url
:param filename: 本地文件路径
:param headers: 用户指定的HTTP头部。可以指定Content-Type、Content-MD5、x-oss-meta-开头的头部等,必须和签名时保持一致
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return:
"""
logger.debug("Put object from file with signed url, bucket: {0}, sign_url: {1}, file path: {2}".format(
self.bucket_name, sign_url, filename))
with open(to_unicode(filename), 'rb') as f:
return self.put_object_with_url(sign_url, f, headers=headers, progress_callback=progress_callback)
def append_object(self, key, position, data,
headers=None,
progress_callback=None,
init_crc=None):
"""追加上传一个文件。
:param str key: 新的文件名,或已经存在的可追加文件名
:param int position: 追加上传一个新的文件, `position` 设为0;追加一个已经存在的可追加文件, `position` 设为文件的当前长度。
`position` 可以从上次追加的结果 `AppendObjectResult.next_position` 中获得。
:param data: 用户数据
:type data: str、bytes、file-like object或可迭代对象
:param headers: 用户指定的HTTP头部。可以指定Content-Type、Content-MD5、x-oss-开头的头部等
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return: :class:`AppendObjectResult <oss2.models.AppendObjectResult>`
:raises: 如果 `position` 和当前文件长度不一致,抛出 :class:`PositionNotEqualToLength <oss2.exceptions.PositionNotEqualToLength>` ;
如果当前文件不是可追加类型,抛出 :class:`ObjectNotAppendable <oss2.exceptions.ObjectNotAppendable>` ;
还会抛出其他一些异常
"""
headers = utils.set_content_type(http.CaseInsensitiveDict(headers), key)
if progress_callback:
data = utils.make_progress_adapter(data, progress_callback)
if self.enable_crc and init_crc is not None:
data = utils.make_crc_adapter(data, init_crc)
logger.debug("Start to append object, bucket: {0}, key: {1}, headers: {2}, position: {3}".format(
self.bucket_name, to_string(key), headers, position))
resp = self.__do_object('POST', key,
data=data,
headers=headers,
params={'append': '', 'position': str(position)})
logger.debug("Append object done, req_id: {0}, statu_code: {1}".format(resp.request_id, resp.status))
result = AppendObjectResult(resp)
if self.enable_crc and result.crc is not None and init_crc is not None:
utils.check_crc('append object', data.crc, result.crc, result.request_id)
return result
def get_object(self, key,
byte_range=None,
headers=None,
progress_callback=None,
process=None,
params=None):
"""下载一个文件。
用法 ::
>>> result = bucket.get_object('readme.txt')
>>> print(result.read())
'hello world'
:param key: 文件名
:param byte_range: 指定下载范围。参见 :ref:`byte_range`
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:param process: oss文件处理,如图像服务等。指定后process,返回的内容为处理后的文件。
:param params: http 请求的查询字符串参数
:type params: dict
:return: file-like object
:raises: 如果文件不存在,则抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>` ;还可能抛出其他异常
"""
headers = http.CaseInsensitiveDict(headers)
range_string = _make_range_string(byte_range)
if range_string:
headers['range'] = range_string
params = {} if params is None else params
if process:
params.update({Bucket.PROCESS: process})
logger.debug("Start to get object, bucket: {0}, key: {1}, range: {2}, headers: {3}, params: {4}".format(
self.bucket_name, to_string(key), range_string, headers, params))
resp = self.__do_object('GET', key, headers=headers, params=params)
logger.debug("Get object done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return GetObjectResult(resp, progress_callback, self.enable_crc)
def select_object(self, key, sql,
progress_callback=None,
select_params=None,
byte_range=None,
headers=None
):
"""Select一个文件内容,支持(Csv,Json Doc,Json Lines及其GZIP压缩文件).
用法 ::
对于Csv:
>>> result = bucket.select_object('access.log', 'select * from ossobject where _4 > 40')
>>> print(result.read())
'hello world'
对于Json Doc: { contacts:[{"firstName":"abc", "lastName":"def"},{"firstName":"abc1", "lastName":"def1"}]}
>>> result = bucket.select_object('sample.json', 'select s.firstName, s.lastName from ossobject.contacts[*] s', select_params = {"Json_Type":"DOCUMENT"})
对于Json Lines: {"firstName":"abc", "lastName":"def"},{"firstName":"abc1", "lastName":"def1"}
>>> result = bucket.select_object('sample.json', 'select s.firstName, s.lastName from ossobject s', select_params = {"Json_Type":"LINES"})
:param key: 文件名
:param sql: sql statement
:param select_params: select参数集合,对于Json文件必须制定Json_Type类型。参见 :ref:`select_params`
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:param byte_range: select content of specific range。可以设置Bytes header指定select csv时的文件起始offset和长度。
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:return: file-like object
:raises: 如果文件不存在,则抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>` ;还可能抛出其他异常
"""
range_select = False
headers = http.CaseInsensitiveDict(headers)
range_string = _make_range_string(byte_range)
if range_string:
headers['range'] = range_string
range_select = True
if (range_select == True and
(select_params is None or
(SelectParameters.AllowQuotedRecordDelimiter not in select_params or str(select_params[SelectParameters.AllowQuotedRecordDelimiter]).lower() != 'false'))):
raise ClientError('"AllowQuotedRecordDelimiter" must be specified in select_params as False when "Range" is specified in header.')
body = xml_utils.to_select_object(sql, select_params)
params = {'x-oss-process': 'csv/select'}
if select_params is not None and SelectParameters.Json_Type in select_params:
params['x-oss-process'] = 'json/select'
self.timeout = 3600
resp = self.__do_object('POST', key, data=body, headers=headers, params=params)
crc_enabled = False
if select_params is not None and SelectParameters.EnablePayloadCrc in select_params:
if str(select_params[SelectParameters.EnablePayloadCrc]).lower() == "true":
crc_enabled = True
return SelectObjectResult(resp, progress_callback, crc_enabled)
def get_object_to_file(self, key, filename,
byte_range=None,
headers=None,
progress_callback=None,
process=None,
params=None):
"""下载一个文件到本地文件。
:param key: 文件名
:param filename: 本地文件名。要求父目录已经存在,且有写权限。
:param byte_range: 指定下载范围。参见 :ref:`byte_range`
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:param process: oss文件处理,如图像服务等。指定后process,返回的内容为处理后的文件。
:param params: http 请求的查询字符串参数
:type params: dict
:return: 如果文件不存在,则抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>` ;还可能抛出其他异常
"""
logger.debug("Start to get object to file, bucket: {0}, key: {1}, file path: {2}".format(
self.bucket_name, to_string(key), filename))
with open(to_unicode(filename), 'wb') as f:
result = self.get_object(key, byte_range=byte_range, headers=headers, progress_callback=progress_callback,
process=process, params=params)
if result.content_length is None:
shutil.copyfileobj(result, f)
else:
utils.copyfileobj_and_verify(result, f, result.content_length, request_id=result.request_id)
if self.enable_crc and byte_range is None:
if (headers is None) or ('Accept-Encoding' not in headers) or (headers['Accept-Encoding'] != 'gzip'):
utils.check_crc('get', result.client_crc, result.server_crc, result.request_id)
return result
def get_object_with_url(self, sign_url,
byte_range=None,
headers=None,
progress_callback=None):
"""使用加签的url下载文件
:param sign_url: 加签的url
:param byte_range: 指定下载范围。参见 :ref:`byte_range`
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict,必须和签名时保持一致
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return: file-like object
:raises: 如果文件不存在,则抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>` ;还可能抛出其他异常
"""
headers = http.CaseInsensitiveDict(headers)
range_string = _make_range_string(byte_range)
if range_string:
headers['range'] = range_string
logger.debug("Start to get object with url, bucket: {0}, sign_url: {1}, range: {2}, headers: {3}".format(
self.bucket_name, sign_url, range_string, headers))
resp = self._do_url('GET', sign_url, headers=headers)
return GetObjectResult(resp, progress_callback, self.enable_crc)
def get_object_with_url_to_file(self, sign_url,
filename,
byte_range=None,
headers=None,
progress_callback=None):
"""使用加签的url下载文件
:param sign_url: 加签的url
:param filename: 本地文件名。要求父目录已经存在,且有写权限。
:param byte_range: 指定下载范围。参见 :ref:`byte_range`
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict,,必须和签名时保持一致
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return: file-like object
:raises: 如果文件不存在,则抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>` ;还可能抛出其他异常
"""
logger.debug(
"Start to get object with url, bucket: {0}, sign_url: {1}, file path: {2}, range: {3}, headers: {4}"
.format(self.bucket_name, sign_url, filename, byte_range, headers))
with open(to_unicode(filename), 'wb') as f:
result = self.get_object_with_url(sign_url, byte_range=byte_range, headers=headers,
progress_callback=progress_callback)
if result.content_length is None:
shutil.copyfileobj(result, f)
else:
utils.copyfileobj_and_verify(result, f, result.content_length, request_id=result.request_id)
return result
def select_object_to_file(self, key, filename, sql,
progress_callback=None,
select_params=None,
headers=None
):
"""Select一个文件的内容到本地文件
:param key: OSS文件名
:param filename: 本地文件名。其父亲目录已经存在且有写权限。
:param progress_callback: 调用进度的callback。参考 :ref:`progress_callback`
:param select_params: select参数集合。参见 :ref:`select_params`
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
:return: 如果文件不存在, 抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>`
"""
with open(to_unicode(filename), 'wb') as f: