Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oss select #114

Merged
merged 17 commits into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 54 additions & 0 deletions examples/select_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-

import os
import oss2


def select_call_back(consumed_bytes, total_bytes = None):
print('Consumed Bytes:' + str(consumed_bytes) + '\n')
# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。
# 通过环境变量获取,或者把诸如“<你的AccessKeyId>”替换成真实的AccessKeyId等。
#
# 以杭州区域为例,Endpoint可以是:
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com
# 分别以HTTP、HTTPS协议访问。
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')
# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, '请设置参数:' + param


# 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
#objects = bucket.list_objects()
key = 'python_select.csv'
content = 'Tom Hanks,USA,45\r\n'*1024
filename = 'python_select.csv'

# 上传文件
bucket.put_object(key, content)

csv_meta_params = {'CsvHeaderInfo': 'None',
'RecordDelimiter': '\r\n'}

select_csv_params = {'CsvHeaderInfo': 'None',
'RecordDelimiter': '\r\n',
'LineRange': (500, 1000)}

csv_header = bucket.create_select_object_meta(key, csv_meta_params)
print(csv_header.csv_rows)
print(csv_header.csv_splits)

result = bucket.select_object(key, "select * from ossobject where _3 > 44 limit 100000", select_call_back, select_csv_params)
content_got = b''
for chunk in result:
content_got += chunk
print(content_got)
result = bucket.select_object_to_file(key, filename,
"select * from ossobject where _3 > 44 limit 100000", select_call_back, select_csv_params)

bucket.delete_object(key)
136 changes: 134 additions & 2 deletions oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
- :class:`RequestError <oss2.exceptions.RequestError>` :底层requests库抛出的异常,如DNS解析错误,超时等;
当然,`Bucket.put_object_from_file` 和 `Bucket.get_object_to_file` 这类函数还会抛出文件相关的异常。


.. _byte_range:

指定下载范围
Expand All @@ -55,6 +54,32 @@
后续的调用使用返回值中的 `next_marker` 、 `next_key_marker` 等。每次调用后检查返回值中的 `is_truncated` ,其值为 `False` 说明
已经到了最后一页。

.. _line_range:

指定查询CSV文件范围
------------
诸如 :func:`select_object <Bucket.select_object>` 以及 :func:`select_object_to_file <Bucket.select_object_to_file>` 这样的函数的select_csv_params参数,可以接受
`LineRange` 参数,表明读取CSV数据的范围。该参数是一个二元tuple:(start, last):
- LineRange 为 (0, 99) 表示读取前100行
- LineRange 为 (None, 99) 表示读取最后99行
- LineRange 为 (100, None) 表示读取第101行到文件结尾的部分(包含第101行)

.. _split_range:

指定查询CSV文件范围
------------
split可以认为是切分好的大小大致相等的csv行簇。每个Split大小大致相等,这样以便更好的做到负载均衡。
诸如 :func:`select_object <Bucket.select_object>` 以及 :func:`select_object_to_file <Bucket.select_object_to_file>` 这样的函数的select_csv_params参数,可以接受
`SplitRange` 参数,表明读取CSV数据的范围。该参数是一个二元tuple:(start, last):
- SplitRange 为 (0, 9) 表示读取前10个Split
- SplitRange 为 (None, 9) 表示读取最后9个split
- SplitRange 为 (10, None) 表示读取第11个split到文件结尾的部分(包含第11个Split)

分页查询
-------
和get_csv_object_meta配合使用,有两种方法:
- 方法1:先获取文件总的行数(get_csv_object_meta返回),然后把文件以line_range分成若干部分并行查询
- 方法2:先获取文件总的Split数(get_csv_object_meta返回), 然后把文件分成若干个请求,每个请求含有大致相等的Split

.. _progress_callback:

Expand Down Expand Up @@ -108,6 +133,28 @@ def progress_callback(bytes_consumed, total_bytes):
>>> d = oss2.iso8601_to_date('2015-12-05T00:00:00.000Z') # 得到 datetime.date(2015, 12, 5)
>>> date_str = oss2.date_to_iso8601(d) # 得到 '2015-12-05T00:00:00.000Z'
>>> oss2.iso8601_to_unixtime(date_str) # 得到 1449273600

.. _select_params:

指定OSS Select的CSV文件格式,支持如下Keys:
>>> CsvHeaderInfo: None|Use|Ignore #None表示没有CSV Schema头,Use表示启用CSV Schema头,可以在Select语句中使用Name,Ignore表示有CSV Schema头,但忽略它(Select语句中不可以使用Name)
默认值是None
>>> CommentCharacter: Comment字符,默认值是#,不支持多个字符
>>> RecordDelimiter: 行分隔符,默认是\n,最多支持两个字符分隔符(比如:\r\n)
>>> FieldDelimiter: 列分隔符,默认是逗号(,), 不支持多个字符
>>> QuoteCharacter: 列Quote字符,默认是双引号("),不支持多个字符。注意转义符合Quote字符相同。
>>> LineRange: 指定查询CSV文件的行范围,参见 `line_range`。
>>> SplitRange: 指定查询CSV文件的Split范围,参见 `split_range`.
注意LineRange和SplitRange两种不能同时指定。若同时指定LineRange会被忽略。

.. _select_meta_params:

create_select_object_meta参数集合,支持如下Keys:
- RecordDelimiter: CSV换行符,最多支持两个字符
- FieldDelimiter: CSV列分隔符,最多支持一个字符
- QuoteCharacter: CSV转移Quote符,最多支持一个字符
- OverwriteIfExists: true|false. true表示重新获得csv meta,并覆盖原有的meta。一般情况下不需要使用

"""

from . import xml_utils
Expand Down Expand Up @@ -468,6 +515,35 @@ def get_object(self, key,
resp = self.__do_object('GET', key, headers=headers, params=params)
return GetObjectResult(resp, progress_callback, self.enable_crc)

def select_object(self, key, sql,
progress_callback=None,
select_params=None
):
"""Select一个CSV文件内容.

用法 ::

>>> result = bucket.select_object('access.log', 'select * from ossobject where _4 > 40')
>>> print(result.read())
'hello world'

:param key: 文件名
:param sql: sql statement
:param select_params: select参数集合。参见 :ref:`select_csv_params`

:param progress_callback: 用户指定的进度回调函数。参考 :ref:`progress_callback`
:return: file-like object

:raises: 如果文件不存在,则抛出 :class:`NoSuchKey <oss2.exceptions.NoSuchKey>` ;还可能抛出其他异常
"""
headers = http.CaseInsensitiveDict()
body = xml_utils.to_select_object(sql, select_params)
params = {'x-oss-process': 'csv/select'}

self.timeout = 3600
resp = self.__do_object('POST', key, data=body, headers=headers, params=params)
return SelectObjectResult(resp, progress_callback, False)

def get_object_to_file(self, key, filename,
byte_range=None,
headers=None,
Expand Down Expand Up @@ -503,6 +579,29 @@ def get_object_to_file(self, key, filename,

return result

def select_object_to_file(self, key, filename, sql,
progress_callback=None,
select_params=None
):
"""Select Content from OSS file to a local file

:param key: OSS key name
:param filename: local file name。The parent directory should exist

:param progress_callback: progress callback。参考 :ref:`progress_callback`
:param select_params: select参数集合。参见 :ref:`select_csv_params`

:return: If file does not exist, throw :class:`NoSuchKey <oss2.exceptions.NoSuchKey>`
"""
with open(to_unicode(filename), 'wb') as f:
result = self.select_object(key, sql, progress_callback=progress_callback,
select_params=select_params)

for chunk in result:
f.write(chunk)

return result

def head_object(self, key, headers=None):
"""获取文件元信息。

Expand All @@ -525,6 +624,40 @@ def head_object(self, key, headers=None):
resp = self.__do_object('HEAD', key, headers=headers)
return HeadObjectResult(resp)

def create_select_object_meta(self, key, select_meta_params=None):
"""获取或创建CSV文件元信息。如果元信息存在,返回之;不然则创建后返回之

HTTP响应的头部包含了文件元信息,可以通过 `RequestResult` 的 `headers` 成员获得。
用法 ::

>>> select_meta_params = { 'FieldDelimiter': ',',
'RecordDelimiter': '\r\n',
'QuoteCharacter': '"',
'OverwriteIfExists' : 'false'}
>>> result = bucket.create_select_object_meta('csv.txt', csv_params)
>>> print(result.content_type)
text/plain

:param key: object name
:param select_meta_params: the parameter dictionary. For the supported keys, refer to :ref:`csv_meta_params`
:return: :class:`GetSelectObjectMetaResult <oss2.models.HeadObjectResult>`.
Beside the csv_rows, csv_splits field, it also include x-oss-select-csv-rows, x-oss-select-csv-splits and x-oss-select-csv-columns headers.
csv_rows are the total lines of the csv file.
csv_splits are the total splits of the csv file. One split a bunch of rows and each split has very similar size.
Header x-oss-select-csv-rows and x-oss-select-csv-splits are the raw data for csv_rows and csv_splits.
x-oss-select-csv-columns header specifies the first line's column count.

:raises: If Bucket or object does not exist, throw:class:`NotFound <oss2.exceptions.NotFound>`
"""
headers = http.CaseInsensitiveDict()

body = xml_utils.to_get_select_object_meta(select_meta_params)
params = {'x-oss-process': 'csv/meta'}

self.timeout = 3600
resp = self.__do_object('POST', key, data = body, headers=headers, params=params)
return GetSelectObjectMetaResult(resp)

def get_object_meta(self, key):
"""获取文件基本元信息,包括该Object的ETag、Size(文件大小)、LastModified,并不返回其内容。

Expand Down Expand Up @@ -1153,7 +1286,6 @@ def _make_range_string(range):

return 'bytes=' + _range(start, last)


def _range(start, last):
def to_str(pos):
if pos is None:
Expand Down
11 changes: 10 additions & 1 deletion oss2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ class BucketNotEmpty(Conflict):
status = 409
code = 'BucketNotEmpty'


class PositionNotEqualToLength(Conflict):
status = 409
code = 'PositionNotEqualToLength'
Expand Down Expand Up @@ -215,6 +214,16 @@ class AccessDenied(ServerError):
status = 403
code = 'AccessDenied'

class SelectOperationFailed(ServerError):
code = 'SelectOperationFailed'
def __init__(self, status, message):
self.status = status
self.message = message

def __str__(self):
error = {'status': self.status,
'details': self.message}
return str(error)

def make_exception(resp):
status = resp.status
Expand Down
17 changes: 16 additions & 1 deletion oss2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .utils import http_to_unixtime, make_progress_adapter, make_crc_adapter
from .exceptions import ClientError
from .compat import urlunquote
from .select_response import SelectResponseAdapter

class PartInfo(object):
"""表示分片信息的文件。
Expand Down Expand Up @@ -75,6 +76,11 @@ def __init__(self, resp):
#: HTTP ETag
self.etag = _get_etag(self.headers)

class GetSelectObjectMetaResult(HeadObjectResult):
def __init__(self, resp):
super(GetSelectObjectMetaResult, self).__init__(resp)
self.csv_rows = int(self.headers['x-oss-select-csv-rows'])
self.csv_splits = int(self.headers['x-oss-select-csv-splits'])

class GetObjectMetaResult(RequestResult):
def __init__(self, resp):
Expand Down Expand Up @@ -102,7 +108,7 @@ class GetObjectResult(HeadObjectResult):
def __init__(self, resp, progress_callback=None, crc_enabled=False):
super(GetObjectResult, self).__init__(resp)
self.__crc_enabled = crc_enabled

if progress_callback:
self.stream = make_progress_adapter(self.resp, progress_callback, self.content_length)
else:
Expand All @@ -129,6 +135,15 @@ def client_crc(self):
def server_crc(self):
return self.__crc

class SelectObjectResult(HeadObjectResult):
def __init__(self, resp, progress_callback=None, crc_enabled=False):
super(SelectObjectResult, self).__init__(resp)
self.__crc_enabled = crc_enabled
self.select_resp = SelectResponseAdapter(resp, progress_callback, None)
self.stream = self.select_resp

def __iter__(self):
return iter(self.stream)

class PutObjectResult(RequestResult):
def __init__(self, resp):
Expand Down