Skip to content

Commit

Permalink
support i18n for ETL for missing cases
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Dec 26, 2018
1 parent 886b327 commit b649845
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 6 deletions.
15 changes: 13 additions & 2 deletions aliyun/log/etl_core/etl_util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from functools import wraps
import re
import copy

import six

def cached(fn):
@wraps(fn)
Expand All @@ -20,6 +20,11 @@ def _wrapped(*args, **kwargs):


def re_full_match(pattern, string, *args, **kwargs):
if six.PY2 and isinstance(pattern, six.binary_type):
pattern = pattern.decode('utf8', 'ignore')
if six.PY2 and isinstance(string, six.binary_type):
string = string.decode('utf8', 'ignore')

if hasattr(re, 'fullmatch'):
return re.fullmatch(pattern, string, *args, **kwargs)
m = re.match(pattern, string, *args, **kwargs)
Expand All @@ -38,11 +43,17 @@ def _real_fn(e):

@cached
def get_re_full_match(pattern, flags=0):
if six.PY2 and isinstance(pattern, six.binary_type):
pattern = pattern.decode('utf8', 'ignore')

p = re.compile(pattern, flags=flags)
if hasattr(p, 'fullmatch'):
if hasattr(p, 'fullmatch'): # normally it's Py3
return p.fullmatch

def ptn_full_match(string, *args, **kwargs):
if six.PY2 and isinstance(string, six.binary_type):
string = string.decode('utf8', 'ignore')

m = p.match(string, *args, **kwargs)
if m and m.span()[1] == len(string):
return m
Expand Down
2 changes: 1 addition & 1 deletion aliyun/log/logclient_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ def transform_worker(from_client, from_project, from_logstore, shard_id, from_ti

count = removed = processed = failed = 0
for s in iter_data:
events = s.get_flatten_logs_json(time_as_str=True)
events = s.get_flatten_logs_json_auto()

c, r, p, f = _transform_events_to_logstore(runner, events, to_client, to_project, to_logstore)
count += c
Expand Down
29 changes: 26 additions & 3 deletions aliyun/log/pulllog_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from .log_logs_pb2 import LogGroupList
from .log_logs_raw_pb2 import LogGroupListRaw
import six


class PullLogResponse(LogResponse):
Expand All @@ -25,9 +26,11 @@ class PullLogResponse(LogResponse):

def __init__(self, resp, header):
LogResponse.__init__(self, header, resp)
self._is_bytes_type = None
self.next_cursor = Util.convert_unicode_to_str(Util.h_v_t(header, "x-log-cursor"))
self.log_count = int(Util.h_v_t(header, "x-log-count"))
self.loggroup_list = LogGroupList()

self._parse_loggroup_list(resp)
self.loggroup_list_json = None
self.flatten_logs_json = None
Expand Down Expand Up @@ -86,6 +89,7 @@ def _parse_loggroup_list(self, data):
p = LogGroupListRaw()
p.ParseFromString(data)
self.loggroup_list = p
self._is_bytes_type = True
return
except Exception as ex2:
ex_second = ex2
Expand Down Expand Up @@ -113,19 +117,31 @@ def _transfer_to_json(self):
'tags': tags}
self.loggroup_list_json.append(log_items)

# Codecs tried, in order, when decoding raw byte payloads to text.
# NOTE: 'gbk8' is not a registered Python codec (codecs.lookup raises
# LookupError for it); the intended Chinese-text fallback codec is 'gbk'.
DEFAULT_DECODE_LIST = ('utf8', 'gbk')
@staticmethod
def _b2u(content):
    """Best-effort decode of a bytes value to unicode text.

    Tries each codec in ``DEFAULT_DECODE_LIST`` in order and returns the
    first successful strict decode; if none fits, falls back to a lossy
    utf8 decode. Non-bytes input is returned unchanged.

    BUG FIXED: the original called ``content.decode(d, 'ignore')`` inside
    the loop — with the 'ignore' error handler a decode never raises
    UnicodeDecodeError, so the first codec always "succeeded" (lossily)
    and the remaining fallback codecs were dead code. It also only caught
    UnicodeDecodeError, letting a LookupError from an unknown codec name
    escape. Strict decoding plus catching LookupError restores the
    intended multi-codec fallback.

    :param content: bytes payload (passed through if already text/None)
    :return: decoded unicode string, or ``content`` unchanged if it is
        not bytes
    """
    if not isinstance(content, bytes):
        # already text (or None) — nothing to decode
        return content

    for codec in PullLogResponse.DEFAULT_DECODE_LIST:
        try:
            # strict decode so a wrong codec actually fails and we try
            # the next one (LookupError guards against bad codec names)
            return content.decode(codec)
        except (UnicodeDecodeError, LookupError):
            continue

    # no codec fit cleanly: lossy utf8 decode preserves the original
    # behavior of always handing callers a text value
    return content.decode('utf8', 'ignore')

@staticmethod
def loggroups_to_flattern_list(loggroup_list, time_as_str=None, decode_bytes=None):
    """Flatten a protobuf LogGroupList into a list of plain dicts.

    Each log becomes one dict carrying ``__time__``, ``__topic__`` and
    ``__source__``, all group tags under ``__tag__:<key>`` keys, plus the
    log's own key/value contents.

    :param loggroup_list: parsed LogGroupList protobuf message
    :param time_as_str: when truthy, ``__time__`` is rendered as text
        instead of the raw numeric value
    :param decode_bytes: when truthy, content values are run through
        ``PullLogResponse._b2u`` to coerce bytes into unicode
    :return: list of flat dicts, one per log entry
    """
    flattened = []
    for group in loggroup_list.LogGroups:
        # tags are shared by every log within this group
        group_tags = {u"__tag__:{0}".format(t.Key): t.Value
                      for t in group.LogTags}

        for entry in group.Logs:
            if time_as_str:
                time_value = six.text_type(entry.Time)
            else:
                time_value = entry.Time

            record = {
                u'__time__': time_value,
                u'__topic__': group.Topic,
                u'__source__': group.Source,
            }
            record.update(group_tags)

            for kv in entry.Contents:
                value = kv.Value
                if decode_bytes:
                    value = PullLogResponse._b2u(value)
                record[kv.Key] = value

            flattened.append(record)

    return flattened

Expand All @@ -135,3 +151,10 @@ def get_flatten_logs_json(self, time_as_str=None, decode_bytes=None):
decode_bytes=decode_bytes)

return self.flatten_logs_json

def get_flatten_logs_json_auto(self):
    """Return the flattened-logs JSON list, computed once and cached.

    Unlike ``get_flatten_logs_json`` this picks sensible options
    automatically: timestamps as strings, and byte decoding enabled only
    when the response was parsed from the raw (bytes) protobuf variant
    (``self._is_bytes_type``).

    :return: cached list of flat log dicts (see
        ``loggroups_to_flattern_list``)
    """
    cached = self.flatten_logs_json
    if cached is not None:
        return cached

    flattened = self.loggroups_to_flattern_list(
        self.loggroup_list,
        time_as_str=True,
        decode_bytes=self._is_bytes_type,
    )
    self.flatten_logs_json = flattened
    return flattened

0 comments on commit b649845

Please sign in to comment.