Skip to content

Commit

Permalink
support shard_list for pull_log_dump
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Jan 9, 2019
1 parent 1264e57 commit 57d31b3
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
7 changes: 5 additions & 2 deletions aliyun/log/logclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ def pull_log(self, project_name, logstore_name, shard_id, from_time, to_time, ba
begin_cursor = res.get_next_cursor()

def pull_log_dump(self, project_name, logstore_name, from_time, to_time, file_path, batch_size=500,
compress=True, encodings=None):
compress=True, encodings=None, shard_list=None):
""" dump all logs seperatedly line into file_path, file_path, the time parameters are log received time on server side.
:type project_name: string
Expand All @@ -898,6 +898,9 @@ def pull_log_dump(self, project_name, logstore_name, from_time, to_time, file_pa
:type encodings: string list
:param encodings: encodings like ["utf8", "latin1"] etc. used to dump the logs in json format to file. Default is ["utf8",]
:type shard_list: string
:param shard_list: shard number list. Could be a comma-separated list or a range, e.g. 1,20,31-40
:return: LogResponse {"total_count": 30, "files": {'file_path_1': 10, "file_path_2": 20} })
:raise: LogException
Expand All @@ -906,7 +909,7 @@ def pull_log_dump(self, project_name, logstore_name, from_time, to_time, file_pa
file_path += "{}"

return pull_log_dump(self, project_name, logstore_name, from_time, to_time, file_path,
batch_size=batch_size, compress=compress, encodings=encodings)
batch_size=batch_size, compress=compress, encodings=encodings, shard_list=shard_list)

def create_logstore(self, project_name, logstore_name,
ttl=30,
Expand Down
11 changes: 7 additions & 4 deletions aliyun/log/logclient_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,18 +362,21 @@ def dump_worker(client, project_name, logstore_name, from_time, to_time,


def pull_log_dump(client, project_name, logstore_name, from_time, to_time, file_path, batch_size=500, compress=True,
encodings=None):
encodings=None, shard_list=None):
cpu_count = multiprocessing.cpu_count() * 2

shards = client.list_shards(project_name, logstore_name).get_shards_info()
worker_size = min(cpu_count, len(shards))
current_shards = [str(shard['shardID']) for shard in shards]
target_shards = _parse_shard_list(shard_list, current_shards)
worker_size = min(cpu_count, len(target_shards))

result = dict()
total_count = 0
with ProcessPoolExecutor(max_workers=worker_size) as pool:
futures = [pool.submit(dump_worker, client, project_name, logstore_name, from_time, to_time,
shard_id=shard['shardID'], file_path=file_path.format(shard['shardID']),
shard_id=shard, file_path=file_path.format(shard),
batch_size=batch_size, compress=compress, encodings=encodings)
for shard in shards]
for shard in target_shards]

for future in as_completed(futures):
file_path, count = future.result()
Expand Down

0 comments on commit 57d31b3

Please sign in to comment.