Skip to content

Commit

Permalink
Merge pull request #32 from wjo1212/feature/no_paging_api
Browse files Browse the repository at this point in the history
Feature/no paging api
  • Loading branch information
wjo1212 committed Nov 23, 2017
2 parents d23cc6d + 6e318cc commit 460202c
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 15 deletions.
17 changes: 17 additions & 0 deletions aliyun/log/listlogstoresresponse.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,20 @@ def log_print(self):
print('count:', self.count)
print('total:', self.total)
print('logstores:', self.logstores)

def merge(self, response):
if not isinstance(response, ListLogstoresResponse):
raise ValueError("passed response is not a ListLogstoresResponse: " + str(type(response)))

self.count += response.get_count()
self.total = response.get_total() # use the latest total count
self.logstores.extend(response.get_logstores())

# update body
self.body = {
'count': self.count,
'total': self.total,
'logstores': self.logstores
}

return self
49 changes: 39 additions & 10 deletions aliyun/log/logclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from .listlogstoresresponse import ListLogstoresResponse
from .listtopicsresponse import ListTopicsResponse
from .log_logs_pb2 import LogGroup
from .logclient_operator import copy_project
from .logclient_operator import copy_project, list_more
from .logexception import LogException
from .logstore_config_response import *
from .logtail_config_response import *
Expand All @@ -42,6 +42,7 @@
import zlib

CONNECTION_TIME_OUT = 20
MAX_LIST_PAGING_SIZE = 500

"""
LogClient class is the main class in the SDK. It can be used to communicate with
Expand Down Expand Up @@ -580,7 +581,7 @@ def pull_logs(self, project_name, logstore_name, shard_id, cursor, count=1000, e
:param end_cursor: the end cursor position to get data
:type compress: boolean
:param compress: if use lz4 compress for transfer data
:param compress: if use zip compress for transfer data
:return: PullLogResponse
Expand Down Expand Up @@ -719,7 +720,7 @@ def update_logstore(self, project_name, logstore_name, ttl, shard_count):
return UpdateLogStoreResponse(header, resp)

def list_logstore(self, project_name, logstore_name_pattern=None, offset=0, size=100):
""" list the logstore in a project
""" list the logstore in a projectListLogStoreResponse
Unsuccessful opertaion will cause an LogException.
:type project_name: string
Expand All @@ -732,12 +733,18 @@ def list_logstore(self, project_name, logstore_name_pattern=None, offset=0, size
:param offset: the offset of all the matched names
:type size: int
:param size: the max return names count
:param size: the max return names count, -1 means all
:return: ListLogStoreResponse
:raise: LogException
"""

# need to use extended method to get more
if int(size) == -1 or int(size) > MAX_LIST_PAGING_SIZE:
return list_more(self.list_logstore, int(offset), int(size), MAX_LIST_PAGING_SIZE,
project_name, logstore_name_pattern)

headers = {}
params = {}
resource = "/logstores"
Expand Down Expand Up @@ -1038,12 +1045,16 @@ def list_logtail_config(self, project_name, offset=0, size=100):
:param offset: the offset of all config names
:type size: int
:param size: the max return names count
:param size: the max return names count, -1 means all
:return: ListLogtailConfigResponse
:raise: LogException
"""
# need to use extended method to get more
if int(size) == -1 or int(size) > MAX_LIST_PAGING_SIZE:
return list_more(self.list_logtail_config, int(offset), int(size), MAX_LIST_PAGING_SIZE, project_name)

headers = {}
params = {}
resource = "/configs"
Expand Down Expand Up @@ -1153,13 +1164,17 @@ def list_machine_group(self, project_name, offset=0, size=100):
:param offset: the offset of all group name
:type size: int
:param size: the max return names count
:param size: the max return names count, -1 means all
:return: ListMachineGroupResponse
:raise: LogException
"""

# need to use extended method to get more
if int(size) == -1 or int(size) > MAX_LIST_PAGING_SIZE:
return list_more(self.list_machine_group, int(offset), int(size), MAX_LIST_PAGING_SIZE, project_name)

headers = {}
params = {}
resource = "/machinegroups"
Expand All @@ -1182,13 +1197,17 @@ def list_machines(self, project_name, group_name, offset=0, size=100):
:param offset: the offset of all group name
:type size: int
:param size: the max return names count
:param size: the max return names count, -1 means all
:return: ListMachinesResponse
:raise: LogException
"""

# need to use extended method to get more
if int(size) == -1 or int(size) > MAX_LIST_PAGING_SIZE:
return list_more(self.list_machines, int(offset), int(size), MAX_LIST_PAGING_SIZE, project_name, group_name)

headers = {}
params = {}
resource = "/machinegroups/" + group_name + "/machines"
Expand Down Expand Up @@ -1551,15 +1570,20 @@ def get_shipper_tasks(self, project_name, logstore_name, shipper_name, start_tim
:param status_type: support one of ['', 'fail', 'success', 'running'] , if the status_type = '' , return all kinds of status type
:type offset: int
:param offset: the begin task offset
:param offset: the begin task offset, -1 means all
:type size: int
:param size: the needed tasks count
:return: ListShipperResponse
:return: GetShipperTasksResponse
:raise: LogException
"""
# need to use extended method to get more
if int(size) == -1 or int(size) > MAX_LIST_PAGING_SIZE:
return list_more(self.get_shipper_tasks, int(offset), int(size), MAX_LIST_PAGING_SIZE,
project_name, logstore_name, shipper_name, start_time, end_time, status_type)

headers = {}
params = {"from": str(int(start_time)),
"to": str(int(end_time)),
Expand Down Expand Up @@ -1925,12 +1949,17 @@ def list_project(self, offset=0, size=100):
:param offset: the offset of all the matched names
:type size: int
:param size: the max return names count
:param size: the max return names count, -1 means return all data
:return: ListProjectResponse
:raise: LogException
"""

# need to use extended method to get more
if int(size) == -1 or int(size) > MAX_LIST_PAGING_SIZE:
return list_more(self.list_project, int(offset), int(size), MAX_LIST_PAGING_SIZE)

headers = {}
params = {}
resource = "/"
Expand Down
92 changes: 89 additions & 3 deletions aliyun/log/logclient_operator.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from .logexception import LogException
from functools import wraps
import six


def copy_project(from_client, to_client, from_project, to_project):
"""
copy project, logstore, machine group and logtail config to target project,
expecting the target project doens't exist
:type from_client: logclient
:param from_project: logclient instance
:type from_client: LogClient
:param from_client: logclient instance
:type to_client: logclient
:type to_client: LogClient
:param to_client: logclient instance
:type from_project: string
Expand Down Expand Up @@ -83,3 +85,87 @@ def copy_project(from_client, to_client, from_project, to_project):
if count < size or offset >= total:
break


def list_more(fn, offset, size, batch_size, *args):
"""list all data using the fn
"""
if size < 0:
expected_total_size = six.MAXSIZE
else:
expected_total_size = size
batch_size = min(size, batch_size)

response = None
total_count_got = 0
while True:
ret = fn(*args, offset=offset, size=batch_size)
if response is None:
response = ret
else:
response.merge(ret)

count = ret.get_count()
total = ret.get_total()
offset += count
total_count_got += count
if count == 0 or offset >= total or total_count_got >= expected_total_size:
break

return response


def list_logstore_all(client, project):
"""
list all project
:type client: LogClient
:param client: logclient instance
:return:
"""

default_fetch_size = 100

# list logstore and copy them
offset, size = 0, default_fetch_size
response = None
while True:
ret = client.list_logstores(project, offset=offset, size=size)
if response is None:
response = ret
else:
response.merge(ret)

count = ret.get_count()
total = ret.get_total()
offset += count
if count < size or offset >= total:
break


def list_logtail_config_all(client, project):
"""
list all project
:type client: LogClient
:param client: logclient instance
:return:
"""

default_fetch_size = 100

# list logstore and copy them
offset, size = 0, default_fetch_size
response = None
while True:
ret = client.list_logtail_config(project, offset=offset, size=size)
if response is None:
response = ret
else:
response.merge(ret)

count = ret.get_count()
total = ret.get_total()
offset += count
if count < size or offset >= total:
break

27 changes: 27 additions & 0 deletions aliyun/log/logstore_config_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ def get_logstores(self):
"""
return self.logstores

def get_count(self):
return self.count

def get_logstores_count(self):
"""
Expand All @@ -135,6 +138,13 @@ def get_logstores_total(self):
"""
return self.total_count

def get_total(self):
"""
:return:
"""
return self.total_count

def log_print(self):
"""
Expand All @@ -145,3 +155,20 @@ def log_print(self):
print('logstores_count:', str(self.count))
print('logstores_total:', str(self.total_count))
print('logstores:', str(self.logstores))

def merge(self, response):
if not isinstance(response, ListLogStoreResponse):
raise ValueError("passed response is not a ListLogstoresResponse: " + str(type(response)))

self.count += response.get_count()
self.total = response.get_total() # use the latest total count
self.logstores.extend(response.get_logstores())

# update body
self.body = {
'count': self.count,
'total': self.total,
'logstores': self.logstores
}

return self
23 changes: 23 additions & 0 deletions aliyun/log/logtail_config_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,32 @@ def get_configs_count(self):
def get_configs_total(self):
return self.total_count

def get_count(self):
return self.count

def get_total(self):
return self.total_count

def log_print(self):
print('ListLogtailConfigResponse:')
print('headers:', self.get_all_headers())
print('configs_count:', str(self.count))
print('configs_total:', str(self.total_count))
print('configs:', str(self.logtail_configs))

def merge(self, response):
if not isinstance(response, ListLogtailConfigResponse):
raise ValueError("passed response is not a ListLogtailConfigResponse: " + str(type(response)))

self.count += response.get_configs_count()
self.total_count = response.get_configs_total() # use the latest total count
self.logtail_configs.extend(response.get_configs())

# update body
self.body = {
'count': self.count,
'total': self.total_count,
'configs': self.logtail_configs
}

return self

0 comments on commit 460202c

Please sign in to comment.