Skip to content

Commit

Permalink
Merge pull request #114 from qixu001/oss-select
Browse files Browse the repository at this point in the history
Oss select APIs for preview
  • Loading branch information
qixu001 committed May 23, 2018
2 parents fc4d6fb + bf3c9cc commit 27cde7f
Show file tree
Hide file tree
Showing 10 changed files with 4,681 additions and 4 deletions.
54 changes: 54 additions & 0 deletions examples/select_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-

import os
import oss2


def select_call_back(consumed_bytes, total_bytes = None):
print('Consumed Bytes:' + str(consumed_bytes) + '\n')
# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。
# 通过环境变量获取,或者把诸如“<你的AccessKeyId>”替换成真实的AccessKeyId等。
#
# 以杭州区域为例,Endpoint可以是:
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com
# 分别以HTTP、HTTPS协议访问。
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')
# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, '请设置参数:' + param


# 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
#objects = bucket.list_objects()
key = 'python_select.csv'
content = 'Tom Hanks,USA,45\r\n'*1024
filename = 'python_select.csv'

# 上传文件
bucket.put_object(key, content)

csv_meta_params = {'CsvHeaderInfo': 'None',
'RecordDelimiter': '\r\n'}

select_csv_params = {'CsvHeaderInfo': 'None',
'RecordDelimiter': '\r\n',
'LineRange': (500, 1000)}

csv_header = bucket.create_select_object_meta(key, csv_meta_params)
print(csv_header.csv_rows)
print(csv_header.csv_splits)

result = bucket.select_object(key, "select * from ossobject where _3 > 44 limit 100000", select_call_back, select_csv_params)
content_got = b''
for chunk in result:
content_got += chunk
print(content_got)
result = bucket.select_object_to_file(key, filename,
"select * from ossobject where _3 > 44 limit 100000", select_call_back, select_csv_params)

bucket.delete_object(key)
136 changes: 134 additions & 2 deletions oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
- :class:`RequestError <oss2.exceptions.RequestError>` :底层requests库抛出的异常,如DNS解析错误,超时等;
当然,`Bucket.put_object_from_file` 和 `Bucket.get_object_to_file` 这类函数还会抛出文件相关的异常。
.. _byte_range:
指定下载范围
Expand All @@ -55,6 +54,32 @@
后续的调用使用返回值中的 `next_marker` 、 `next_key_marker` 等。每次调用后检查返回值中的 `is_truncated` ,其值为 `False` 说明
已经到了最后一页。
.. _line_range:
指定查询CSV文件范围
------------
诸如 :func:`select_object <Bucket.select_object>` 以及 :func:`select_object_to_file <Bucket.select_object_to_file>` 这样的函数的select_csv_params参数,可以接受
`LineRange` 参数,表明读取CSV数据的范围。该参数是一个二元tuple:(start, last):
- LineRange 为 (0, 99) 表示读取前100行
- LineRange 为 (None, 99) 表示读取最后99行
- LineRange 为 (100, None) 表示读取第101行到文件结尾的部分(包含第101行)
.. _split_range:
指定查询CSV文件范围
------------
split可以认为是切分好的大小大致相等的csv行簇。每个Split大小大致相等,这样以便更好的做到负载均衡。
诸如 :func:`select_object <Bucket.select_object>` 以及 :func:`select_object_to_file <Bucket.select_object_to_file>` 这样的函数的select_csv_params参数,可以接受
`SplitRange` 参数,表明读取CSV数据的范围。该参数是一个二元tuple:(start, last):
- SplitRange 为 (0, 9) 表示读取前10个Split
- SplitRange 为 (None, 9) 表示读取最后9个split
- SplitRange 为 (10, None) 表示读取第11个split到文件结尾的部分(包含第11个Split)
分页查询
-------
和get_csv_object_meta配合使用,有两种方法:
- 方法1:先获取文件总的行数(get_csv_object_meta返回),然后把文件以line_range分成若干部分并行查询
- 方法2:先获取文件总的Split数(get_csv_object_meta返回), 然后把文件分成若干个请求,每个请求含有大致相等的Split
.. _progress_callback:
Expand Down Expand Up @@ -108,6 +133,28 @@ def progress_callback(bytes_consumed, total_bytes):
>>> d = oss2.iso8601_to_date('2015-12-05T00:00:00.000Z') # 得到 datetime.date(2015, 12, 5)
>>> date_str = oss2.date_to_iso8601(d) # 得到 '2015-12-05T00:00:00.000Z'
>>> oss2.iso8601_to_unixtime(date_str) # 得到 1449273600
.. _select_params:
指定OSS Select的CSV文件格式,支持如下Keys:
>>> CsvHeaderInfo: None|Use|Ignore #None表示没有CSV Schema头,Use表示启用CSV Schema头,可以在Select语句中使用Name,Ignore表示有CSV Schema头,但忽略它(Select语句中不可以使用Name)
默认值是None
>>> CommentCharacter: Comment字符,默认值是#,不支持多个字符
>>> RecordDelimiter: 行分隔符,默认是\n,最多支持两个字符分隔符(比如:\r\n)
>>> FieldDelimiter: 列分隔符,默认是逗号(,), 不支持多个字符
>>> QuoteCharacter: 列Quote字符,默认是双引号("),不支持多个字符。注意转义符合Quote字符相同。
>>> LineRange: 指定查询CSV文件的行范围,参见 `line_range`。
>>> SplitRange: 指定查询CSV文件的Split范围,参见 `split_range`.
注意LineRange和SplitRange两种不能同时指定。若同时指定LineRange会被忽略。
.. _select_meta_params:
create_select_object_meta参数集合,支持如下Keys:
- RecordDelimiter: CSV换行符,最多支持两个字符
- FieldDelimiter: CSV列分隔符,最多支持一个字符
- QuoteCharacter: CSV转移Quote符,最多支持一个字符
- OverwriteIfExists: true|false. true表示重新获得csv meta,并覆盖原有的meta。一般情况下不需要使用
"""

from . import xml_utils
Expand Down Expand Up @@ -468,6 +515,35 @@ def get_object(self, key,
resp = self.__do_object('GET', key, headers=headers, params=params)
return GetObjectResult(resp, progress_callback, self.enable_crc)

def select_object(self, key, sql,
progress_callback=None,
select_params=None
):
"""Select一个CSV文件内容.
用法 ::
>>> result = bucket.select_object('access.log', 'select * from ossobject where _4 > 40')
>>> print(result.read())
'hello world'
:param key: 文件名
:param sql: sql statement
:param select_params: select参数集合。参见 :ref:`select_csv_params`
:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return: file-like object
:raises: 如果文件不存在,则抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>` ;还可能抛出其他异常
"""
headers = http.CaseInsensitiveDict()
body = xml_utils.to_select_object(sql, select_params)
params = {'x-oss-process': 'csv/select'}

self.timeout = 3600
resp = self.__do_object('POST', key, data=body, headers=headers, params=params)
return SelectObjectResult(resp, progress_callback, False)

def get_object_to_file(self, key, filename,
byte_range=None,
headers=None,
Expand Down Expand Up @@ -503,6 +579,29 @@ def get_object_to_file(self, key, filename,

return result

def select_object_to_file(self, key, filename, sql,
progress_callback=None,
select_params=None
):
"""Select Content from OSS file to a local file
:param key: OSS key name
:param filename: local file name。The parent directory should exist
:param progress_callback: progress callback。参考 :ref:`progress_callback`
:param select_params: select参数集合。参见 :ref:`select_csv_params`
:return: If file does not exist, throw :class:`NoSuchKey <oss2.exceptions.NoSuchKey>`
"""
with open(to_unicode(filename), 'wb') as f:
result = self.select_object(key, sql, progress_callback=progress_callback,
select_params=select_params)

for chunk in result:
f.write(chunk)

return result

def head_object(self, key, headers=None):
"""获取文件元信息。
Expand All @@ -525,6 +624,40 @@ def head_object(self, key, headers=None):
resp = self.__do_object('HEAD', key, headers=headers)
return HeadObjectResult(resp)

def create_select_object_meta(self, key, select_meta_params=None):
"""获取或创建CSV文件元信息。如果元信息存在,返回之;不然则创建后返回之
HTTP响应的头部包含了文件元信息,可以通过 `RequestResult` 的 `headers` 成员获得。
用法 ::
>>> select_meta_params = { 'FieldDelimiter': ',',
'RecordDelimiter': '\r\n',
'QuoteCharacter': '"',
'OverwriteIfExists' : 'false'}
>>> result = bucket.create_select_object_meta('csv.txt', csv_params)
>>> print(result.content_type)
text/plain
:param key: object name
:param select_meta_params: the parameter dictionary. For the supported keys, refer to :ref:`csv_meta_params`
:return: :class:`GetSelectObjectMetaResult <oss2.models.HeadObjectResult>`.
Beside the csv_rows, csv_splits field, it also include x-oss-select-csv-rows, x-oss-select-csv-splits and x-oss-select-csv-columns headers.
csv_rows are the total lines of the csv file.
csv_splits are the total splits of the csv file. One split a bunch of rows and each split has very similar size.
Header x-oss-select-csv-rows and x-oss-select-csv-splits are the raw data for csv_rows and csv_splits.
x-oss-select-csv-columns header specifies the first line's column count.
:raises: If Bucket or object does not exist, throw:class:`NotFound <oss2.exceptions.NotFound>`
"""
headers = http.CaseInsensitiveDict()

body = xml_utils.to_get_select_object_meta(select_meta_params)
params = {'x-oss-process': 'csv/meta'}

self.timeout = 3600
resp = self.__do_object('POST', key, data = body, headers=headers, params=params)
return GetSelectObjectMetaResult(resp)

def get_object_meta(self, key):
"""获取文件基本元信息,包括该Object的ETag、Size(文件大小)、LastModified,并不返回其内容。
Expand Down Expand Up @@ -1153,7 +1286,6 @@ def _make_range_string(range):

return 'bytes=' + _range(start, last)


def _range(start, last):
def to_str(pos):
if pos is None:
Expand Down
11 changes: 10 additions & 1 deletion oss2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ class BucketNotEmpty(Conflict):
status = 409
code = 'BucketNotEmpty'


class PositionNotEqualToLength(Conflict):
status = 409
code = 'PositionNotEqualToLength'
Expand Down Expand Up @@ -215,6 +214,16 @@ class AccessDenied(ServerError):
status = 403
code = 'AccessDenied'

class SelectOperationFailed(ServerError):
code = 'SelectOperationFailed'
def __init__(self, status, message):
self.status = status
self.message = message

def __str__(self):
error = {'status': self.status,
'details': self.message}
return str(error)

def make_exception(resp):
status = resp.status
Expand Down
17 changes: 16 additions & 1 deletion oss2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .utils import http_to_unixtime, make_progress_adapter, make_crc_adapter
from .exceptions import ClientError
from .compat import urlunquote
from .select_response import SelectResponseAdapter

class PartInfo(object):
"""表示分片信息的文件。
Expand Down Expand Up @@ -75,6 +76,11 @@ def __init__(self, resp):
#: HTTP ETag
self.etag = _get_etag(self.headers)

class GetSelectObjectMetaResult(HeadObjectResult):
def __init__(self, resp):
super(GetSelectObjectMetaResult, self).__init__(resp)
self.csv_rows = int(self.headers['x-oss-select-csv-rows'])
self.csv_splits = int(self.headers['x-oss-select-csv-splits'])

class GetObjectMetaResult(RequestResult):
def __init__(self, resp):
Expand Down Expand Up @@ -102,7 +108,7 @@ class GetObjectResult(HeadObjectResult):
def __init__(self, resp, progress_callback=None, crc_enabled=False):
super(GetObjectResult, self).__init__(resp)
self.__crc_enabled = crc_enabled

if progress_callback:
self.stream = make_progress_adapter(self.resp, progress_callback, self.content_length)
else:
Expand All @@ -129,6 +135,15 @@ def client_crc(self):
def server_crc(self):
return self.__crc

class SelectObjectResult(HeadObjectResult):
def __init__(self, resp, progress_callback=None, crc_enabled=False):
super(SelectObjectResult, self).__init__(resp)
self.__crc_enabled = crc_enabled
self.select_resp = SelectResponseAdapter(resp, progress_callback, None)
self.stream = self.select_resp

def __iter__(self):
return iter(self.stream)

class PutObjectResult(RequestResult):
def __init__(self, resp):
Expand Down

0 comments on commit 27cde7f

Please sign in to comment.