Skip to content

Commit

Permalink
add no_escape to pull_log_dump
Browse files Browse the repository at this point in the history
Handle GBK encoding in the KEY case.
  • Loading branch information
wjo1212 committed Jan 9, 2019
1 parent be70e94 commit 9e578ef
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
8 changes: 6 additions & 2 deletions aliyun/log/logclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ def pull_log(self, project_name, logstore_name, shard_id, from_time, to_time, ba
begin_cursor = res.get_next_cursor()

def pull_log_dump(self, project_name, logstore_name, from_time, to_time, file_path, batch_size=500,
compress=True, encodings=None, shard_list=None):
compress=True, encodings=None, shard_list=None, no_escape=None):
""" dump all logs seperatedly line into file_path, file_path, the time parameters are log received time on server side.
:type project_name: string
Expand Down Expand Up @@ -901,6 +901,9 @@ def pull_log_dump(self, project_name, logstore_name, from_time, to_time, file_pa
:type shard_list: string
:param shard_list: shard number list. could be comma seperated list or range: 1,20,31-40
:type no_escape: bool
:param no_escape: whether to skip escaping non-ASCII characters; default is to escape. Set it to True if you don't want escaping.
:return: LogResponse {"total_count": 30, "files": {'file_path_1': 10, "file_path_2": 20} })
:raise: LogException
Expand All @@ -909,7 +912,8 @@ def pull_log_dump(self, project_name, logstore_name, from_time, to_time, file_pa
file_path += "{}"

return pull_log_dump(self, project_name, logstore_name, from_time, to_time, file_path,
batch_size=batch_size, compress=compress, encodings=encodings, shard_list=shard_list)
batch_size=batch_size, compress=compress, encodings=encodings,
shard_list=shard_list, no_escape=no_escape)

def create_logstore(self, project_name, logstore_name,
ttl=30,
Expand Down
13 changes: 7 additions & 6 deletions aliyun/log/logclient_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,30 +334,31 @@ def default(self, obj):

def dump_worker(client, project_name, logstore_name, from_time, to_time,
shard_id, file_path,
batch_size=1000, compress=True, encodings=None):
batch_size=1000, compress=True, encodings=None, no_escape=None):
res = client.pull_log(project_name, logstore_name, shard_id, from_time, to_time, batch_size=batch_size,
compress=compress)
encodings = encodings or ('utf8', 'latin1', 'gbk')
ensure_ansi = not no_escape

count = 0
for data in res:
for log in data.get_flatten_logs_json():
for log in data.get_flatten_logs_json(decode_bytes=data._is_bytes_type):
with open(file_path, "a+") as f:
count += 1
try:
if six.PY2:
last_ex = None
for encoding in encodings:
try:
f.write(json.dumps(log, encoding=encoding))
f.write(json.dumps(log, encoding=encoding, ensure_ascii=ensure_ansi))
f.write("\n")
break
except UnicodeDecodeError as ex:
last_ex = ex
else:
raise last_ex
else:
f.write(json.dumps(log, cls=get_encoder_cls(encodings)))
f.write(json.dumps(log, cls=get_encoder_cls(encodings), ensure_ascii=ensure_ansi))
f.write("\n")
except Exception as ex:
print("shard: {0} Fail to dump log: {1}".format(shard_id, b64e(repr(log))))
Expand All @@ -367,7 +368,7 @@ def dump_worker(client, project_name, logstore_name, from_time, to_time,


def pull_log_dump(client, project_name, logstore_name, from_time, to_time, file_path, batch_size=500, compress=True,
encodings=None, shard_list=None):
encodings=None, shard_list=None, no_escape=None):
cpu_count = multiprocessing.cpu_count() * 2

shards = client.list_shards(project_name, logstore_name).get_shards_info()
Expand All @@ -380,7 +381,7 @@ def pull_log_dump(client, project_name, logstore_name, from_time, to_time, file_
with ProcessPoolExecutor(max_workers=worker_size) as pool:
futures = [pool.submit(dump_worker, client, project_name, logstore_name, from_time, to_time,
shard_id=shard, file_path=file_path.format(shard),
batch_size=batch_size, compress=compress, encodings=encodings)
batch_size=batch_size, compress=compress, encodings=encodings, no_escape=no_escape)
for shard in target_shards]

for future in as_completed(futures):
Expand Down
2 changes: 1 addition & 1 deletion aliyun/log/pulllog_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def loggroups_to_flattern_list(loggroup_list, time_as_str=None, decode_bytes=Non
u'__source__': logGroup.Source}
item.update(tags)
for content in log.Contents:
item[content.Key] = PullLogResponse._b2u(content.Value) if decode_bytes else content.Value
item[PullLogResponse._b2u(content.Key) if decode_bytes else content.Key] = PullLogResponse._b2u(content.Value) if decode_bytes else content.Value
flatten_logs_json.append(item)
return flatten_logs_json

Expand Down

0 comments on commit 9e578ef

Please sign in to comment.