Merge pull request #39 from wjo1212/master
fix issue 38 and add new interface
wjo1212 committed Nov 30, 2017
2 parents 291b8b7 + ddd622e commit c76b44f
Showing 7 changed files with 155 additions and 44 deletions.
24 changes: 11 additions & 13 deletions README_CN.md
@@ -516,27 +516,25 @@ res.log_print()

### Get logstore data
Consume a specific logstore and fetch data through its index; a time range must be provided, and a query statement may also be passed in.
The example below queries a specific logstore's logs from the past hour.
The example below queries the first 100 logs of a specific logstore from the past hour.

```python
from time import time
from aliyun.log import GetLogsRequest
request_json = {
    "project": "project1",
    "logstore": "logstore1",
    "topic": "",
    "toTime": str(int(time())),
    "offset": "0",
    "query": "*",
    "line": "100",
    "fromTime": str(int(time()-3600)),
    "reverse": "false"
}
request = GetLogsRequest("project1", "logstore1", fromTime=int(time()-3600), toTime=int(time()), topic='', query="*", line=1000, offset=0, reverse=False)
request = GetLogsRequest("project1", "logstore1", fromTime=int(time()-3600), toTime=int(time()), topic='', query="*", line=100, offset=0, reverse=False)
res = client.get_logs(request)
res.log_print()
```

Logs can also be fetched through the `get_log` interface:
```python
from time import time
res = client.get_log("project1", "logstore1", from_time=int(time()-3600), to_time=int(time()), size=-1)
res.log_print()
```

Passing `size=-1` here fetches all matching logs.
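
A minimal sketch of inspecting the merged result, assuming the `GetLogsResponse` helpers `is_completed()`, `get_count()` and `get_logs()` referenced elsewhere in this change set:

```python
from time import time

# size=-1 pages through all matching logs internally (via query_more)
res = client.get_log("project1", "logstore1",
                     from_time=int(time() - 3600), to_time=int(time()),
                     topic='', query="*", size=-1)

print(res.is_completed())   # whether the query finished scanning
print(res.get_count())      # number of log lines after merging pages
for log in res.get_logs():  # iterate the merged logs
    log.log_print()
```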


### Get the log distribution (histogram)
Use `get_histograms` to get, through the index, the distribution of logs within a specific time range.
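
A minimal sketch of such a call, assuming `GetHistogramsRequest` is exported from `aliyun.log` alongside `GetLogsRequest` and that the response supports `log_print()`:

```python
from time import time
from aliyun.log import GetHistogramsRequest

# distribution of logs from the past hour
request = GetHistogramsRequest("project1", "logstore1",
                               fromTime=int(time() - 3600), toTime=int(time()),
                               topic='', query="*")
res = client.get_histograms(request)
res.log_print()
```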
2 changes: 1 addition & 1 deletion aliyun/log/getlogsrequest.py
@@ -39,7 +39,7 @@ class GetLogsRequest(LogRequest):
    """

    def __init__(self, project=None, logstore=None, fromTime=None, toTime=None, topic=None,
                 query=None, line=None, offset=None, reverse=None):
                 query=None, line=100, offset=0, reverse=False):
        LogRequest.__init__(self, project)
        self.logstore = logstore
        self.fromTime = fromTime
12 changes: 12 additions & 0 deletions aliyun/log/getlogsresponse.py
@@ -66,3 +66,15 @@ def log_print(self):
        for log in self.logs:
            log.log_print()
            print('\n')

    def merge(self, response):
        if not isinstance(response, GetLogsResponse):
            raise ValueError("passed response is not a GetLogsResponse: " + str(type(response)))

        self.progress = response.progress
        self.logs.extend(response.get_logs())

        # update body
        self.body.extend(response.body)

        return self
110 changes: 80 additions & 30 deletions aliyun/log/logclient.py
@@ -24,7 +24,7 @@
from .listlogstoresresponse import ListLogstoresResponse
from .listtopicsresponse import ListTopicsResponse
from .log_logs_pb2 import LogGroup
from .logclient_operator import copy_project, list_more
from .logclient_operator import copy_project, list_more, query_more
from .logexception import LogException
from .logstore_config_response import *
from .logtail_config_response import *
@@ -43,6 +43,7 @@

CONNECTION_TIME_OUT = 20
MAX_LIST_PAGING_SIZE = 500
MAX_GET_LOG_PAGING_SIZE = 100

"""
LogClient class is the main class in the SDK. It can be used to communicate with
@@ -204,7 +205,7 @@ def _send(self, method, project, body, resource, params, headers, respons_body_t
        return self._sendRequest(method, url, params, body, headers, respons_body_type)

    @staticmethod
    def get_unicode(key):
    def _get_unicode(key):
        if isinstance(key, six.binary_type):
            key = key.decode('utf-8')
        return key
@@ -248,8 +249,8 @@ def put_logs(self, request):
            contents = logItem.get_contents()
            for key, value in contents:
                content = log.Contents.add()
                content.Key = self.get_unicode(key)
                content.Value = self.get_unicode(value)
                content.Key = self._get_unicode(key)
                content.Value = self._get_unicode(value)
        if request.get_log_tags() is not None:
            tags = request.get_log_tags()
            for key, value in tags:
@@ -268,7 +269,7 @@ def put_logs(self, request):
        if is_compress:
            headers['x-log-compresstype'] = 'deflate'
            compress_data = zlib.compress(body)
            #compress_data = logservice_lz4.compress(body)
            # compress_data = logservice_lz4.compress(body)

        params = {}
        logstore = request.get_logstore()
@@ -357,6 +358,66 @@ def get_histograms(self, request):
        (resp, header) = self._send("GET", project, None, resource, params, headers)
        return GetHistogramsResponse(resp, header)

    def get_log(self, project, logstore, from_time, to_time, topic=None,
                query=None, reverse=False, offset=0, size=100):
        """ Get logs from log service. Will retry when the result is incomplete.
        Unsuccessful operation will cause a LogException.

        :type project: string
        :param project: project name

        :type logstore: string
        :param logstore: logstore name

        :type from_time: int
        :param from_time: the begin timestamp

        :type to_time: int
        :param to_time: the end timestamp

        :type topic: string
        :param topic: topic name of logs, could be None

        :type query: string
        :param query: user defined query, could be None

        :type reverse: bool
        :param reverse: if reverse is set to true, the query will return the latest logs first, default is false

        :type offset: int
        :param offset: line offset of returned logs

        :type size: int
        :param size: max line number of returned logs, -1 means get all

        :return: GetLogsResponse

        :raise: LogException
        """

        # need to use extended method to get more
        if int(size) == -1 or int(size) > MAX_GET_LOG_PAGING_SIZE:
            return query_more(self.get_log, int(offset), int(size), MAX_GET_LOG_PAGING_SIZE,
                              project, logstore, from_time, to_time, topic,
                              query, reverse)

        headers = {}
        params = {'from': from_time,
                  'to': to_time,
                  'type': 'log',
                  'line': size,
                  'offset': offset,
                  'reverse': 'true' if reverse else 'false'}

        if topic:
            params['topic'] = topic
        if query:
            params['query'] = query

        resource = "/logstores/" + logstore
        (resp, header) = self._send("GET", project, None, resource, params, headers)
        return GetLogsResponse(resp, header)

    def get_logs(self, request):
        """ Get logs from log service.
        Unsuccessful operation will cause a LogException.
@@ -368,28 +429,18 @@ def get_logs(self, request):
        :raise: LogException
        """
        headers = {}
        params = {}
        if request.get_topic() is not None:
            params['topic'] = request.get_topic()
        if request.get_from() is not None:
            params['from'] = request.get_from()
        if request.get_to() is not None:
            params['to'] = request.get_to()
        if request.get_query() is not None:
            params['query'] = request.get_query()
        params['type'] = 'log'
        if request.get_line() is not None:
            params['line'] = request.get_line()
        if request.get_offset() is not None:
            params['offset'] = request.get_offset()
        if request.get_reverse() is not None:
            params['reverse'] = 'true' if request.get_reverse() else 'false'
        logstore = request.get_logstore()
        project = request.get_project()
        resource = "/logstores/" + logstore
        (resp, header) = self._send("GET", project, None, resource, params, headers)
        return GetLogsResponse(resp, header)
        logstore = request.get_logstore()
        from_time = request.get_from()
        to_time = request.get_to()
        topic = request.get_topic()
        query = request.get_query()
        reverse = request.get_reverse()
        offset = request.get_offset()
        size = request.get_line()

        return self.get_log(project, logstore, from_time, to_time, topic,
                            query, reverse, offset, size)

    def get_project_logs(self, request):
        """ Get logs from log service.
@@ -407,7 +458,7 @@ def get_project_logs(self, request):
        if request.get_query() is not None:
            params['query'] = request.get_query()
        project = request.get_project()
        resource = "/logs"
        resource = "/logs"
        (resp, header) = self._send("GET", project, None, resource, params, headers)
        return GetLogsResponse(resp, header)
@@ -604,7 +655,7 @@ def pull_logs(self, project_name, logstore_name, shard_id, cursor, count=1000, e
            params['end_cursor'] = end_cursor
        (resp, header) = self._send("GET", project_name, None, resource, params, headers, "binary")

        compress_type = Util.h_v_td(header,'x-log-compresstype', '').lower()
        compress_type = Util.h_v_td(header, 'x-log-compresstype', '').lower()
        if compress_type == 'lz4':
            raw_size = int(Util.h_v_t(header, 'x-log-bodyrawsize'))
            raw_data = logservice_lz4.uncompress(raw_size, resp)
@@ -1702,7 +1753,7 @@ def create_consumer_group(self, project, logstore, consumer_group, timeout, in_o
        :param timeout: time-out

        :type in_order: bool
        :param in_order:
        :param in_order: if consume in order, default is False

        :return: CreateConsumerGroupResponse
        """
@@ -1970,4 +2021,3 @@ def list_project(self, offset=0, size=100):
        params['size'] = str(size)
        (resp, header) = self._send("GET", None, None, resource, params, headers)
        return ListProjectResponse(resp, header)

47 changes: 47 additions & 0 deletions aliyun/log/logclient_operator.py
@@ -1,6 +1,11 @@
from .logexception import LogException
from functools import wraps
import six
import time


DEFAULT_QUERY_RETRY_COUNT = 10
DEFAULT_QUERY_RETRY_INTERVAL = 0.2


def copy_project(from_client, to_client, from_project, to_project, copy_machine_group=False):
@@ -113,12 +118,54 @@ def list_more(fn, offset, size, batch_size, *args):
        total = ret.get_total()
        offset += count
        total_count_got += count
        batch_size = min(batch_size, expected_total_size - total_count_got)

        if count == 0 or offset >= total or total_count_got >= expected_total_size:
            break

    return response


def query_more(fn, offset, size, batch_size, *args):
    """query all data using the fn
    """
    if size < 0:
        expected_total_size = six.MAXSIZE
    else:
        expected_total_size = size
        batch_size = min(size, batch_size)

    response = None
    total_count_got = 0
    complete = False
    while True:
        for _c in range(DEFAULT_QUERY_RETRY_COUNT):
            ret = fn(*args, offset=offset, size=batch_size)
            if ret.is_completed():
                complete = True
                break

            time.sleep(DEFAULT_QUERY_RETRY_INTERVAL)

        if response is None:
            response = ret
        else:
            response.merge(ret)

        # if incomplete, exit
        if not complete:
            break

        count = ret.get_count()
        offset += count
        total_count_got += count
        batch_size = min(batch_size, expected_total_size - total_count_got)
        if count == 0 or total_count_got >= expected_total_size:
            break

    return response


def list_logstore_all(client, project):
    """
    list all project
1 change: 1 addition & 0 deletions doc/source/api.rst
@@ -202,6 +202,7 @@ Logs
.. autosummary::
    put_logs
    pull_logs
    get_log
    get_logs
    get_histograms
    get_project_logs
3 changes: 3 additions & 0 deletions tests/sample.py
@@ -70,6 +70,9 @@ def sample_get_logs(client, project, logstore):
    response = client.get_logs(request)
    response.log_print()

    res = client.get_log(project, logstore, From, To, topic)
    res.log_print()


# @log_enter_exit
def sample_get_histograms(client, project, logstore):
