Skip to content

Commit

Permalink
add full support for KV style and test case
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Dec 4, 2018
1 parent 8299399 commit b0e4d0b
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 17 deletions.
17 changes: 17 additions & 0 deletions aliyun/log/etl_core/etl_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from functools import wraps


def cached(fn):
    """Memoize *fn* keyed on the string form of its positional and keyword args.

    The cache lives on the wrapped function object itself (``fn.__CACHE__``)
    and is unbounded; intended for small, fixed key spaces such as compiled
    regex patterns.
    """
    @wraps(fn)
    def _wrapped(*args, **kwargs):
        sig = str(args) + str(kwargs)
        # Create the cache dict only if it does not exist yet.  The previous
        # code reset it on every miss, so at most one entry ever survived.
        if not hasattr(fn, "__CACHE__"):
            fn.__CACHE__ = {}
        if sig not in fn.__CACHE__:
            fn.__CACHE__[sig] = fn(*args, **kwargs)
        return fn.__CACHE__[sig]

    return _wrapped
4 changes: 3 additions & 1 deletion aliyun/log/etl_core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
'RENAME_FIELDS_.*': rename_fields,
'ALIAS_.*': rename_fields,
'DISPATCH_EVENT_.*': dispatch_event,
'TRANSFORM_EVENT_.*': transform_event
'TRANSFORM_EVENT_.*': transform_event,
'KV_FIELDS_.*': extract_kv_fields
}

__all__ = ['KEEP_EVENT_',
'DROP_EVENT_',
'KEEP_FIELDS_',
'DROP_FIELDS_',
'RENAME_FIELDS_',
'KV_FIELDS_',
'ALIAS_',
'DISPATCH_EVENT_',
'TRANSFORM_EVENT_']
Expand Down
1 change: 1 addition & 0 deletions aliyun/log/etl_core/trans_comp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .trans_regex import *
from .trans_lookup import *
from .trans_csv import *
from .trans_kv import *

__all__ = ['LOOKUP', 'REGEX', 'CSV', 'TSV', 'JSON', 'KV', 'V']

Expand Down
98 changes: 98 additions & 0 deletions aliyun/log/etl_core/trans_comp/trans_kv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from .trans_base import trans_comp_base
import six
import re
import logging
import json
from ..etl_util import cached
from collections import Iterable

__all__ = ['trans_comp_kv']

logger = logging.getLogger(__name__)


def trans_comp_kv(*args, **kwargs):
    """Factory/dispatcher for the KV transform.

    Called with an event dict (directly, or as the ``event`` keyword) it runs
    a default-configured transformer immediately; otherwise the arguments are
    treated as transformer configuration and a configured instance is returned.
    """
    called_with_event = (args and isinstance(args[0], dict)) \
        or isinstance(kwargs.get('event'), dict)

    if called_with_event:
        # event case: transform right away with default settings
        return kv_transformer()(*args, **kwargs)

    return kv_transformer(*args, **kwargs)


class kv_transformer(trans_comp_base):
    """Transform component that extracts key=value pairs from string fields.

    The separator and the quote character are configurable (the separator may
    itself be a regex fragment, e.g. ``'(?::|=)'``), and every extracted key
    can be wrapped with a prefix/suffix before being merged into the event.
    """

    DEFAULT_SEP = '='
    DEFAULT_QUOTE = '"'

    @staticmethod
    @cached
    def _get_kv_ptn(sep, quote):
        """Build (and cache) the compiled extraction regex for (sep, quote).

        Two alternatives are joined:
          p1 - bare values:   key SEP value   (word/dot/dash plus CJK chars)
          p2 - quoted values: key SEP "value" (value may contain spaces and
               newlines; whitespace just inside the quotes is trimmed)
        ``(?!{0})`` prevents a match from starting on the separator itself.
        """
        p1 = u'(?!{0})([\u4e00-\u9fa5\u0800-\u4e00\\w\\.\\-]+)\\s*{0}\\s*([\u4e00-\u9fa5\u0800-\u4e00\\w\\.\\-]+)'
        # '\\s' (was '\s'): identical regex text, but avoids the invalid
        # escape-sequence deprecation warning on Python 3.6+.
        p2 = u'(?!{0})([\u4e00-\u9fa5\u0800-\u4e00\\w\\.\\-]+)\\s*{0}\\s*{1}\\s*([^{1}]+?)\\s*{1}'
        ps = u'|'.join([p1, p2]).format(sep, quote)

        logger.info(u"trans_comp_kv: get ptn: {0}".format(ps))
        return re.compile(ps)

    @staticmethod
    def _n(v):
        """Normalize *v* to the native ``str`` type of this Python version.

        None -> "", dict/list -> JSON text (best effort), unicode is encoded
        on Python 2 and bytes are decoded on Python 3, so extracted keys and
        values match the native string type of the rest of the event.
        """
        if v is None:
            return ""

        if isinstance(v, (dict, list)):
            try:
                v = json.dumps(v)
            except Exception:
                pass
        elif six.PY2 and isinstance(v, six.text_type):
            v = v.encode('utf8', "ignore")
        elif six.PY3 and isinstance(v, six.binary_type):
            v = v.decode('utf8', "ignore")

        return str(v)

    def __init__(self, prefix=None, suffix=None, sep=None, quote=None):
        """
        :param prefix: string prepended to every extracted key (default "")
        :param suffix: string appended to every extracted key (default "")
        :param sep: key/value separator, may be a regex fragment (default '=')
        :param quote: quote char for values with spaces (default '"')
        """
        self.prefix = "" if prefix is None else prefix
        self.suffix = "" if suffix is None else suffix

        sep = self.DEFAULT_SEP if sep is None else sep
        quote = self.DEFAULT_QUOTE if quote is None else quote
        self.ptn = self._get_kv_ptn(sep, quote)

    def extract_kv(self, value):
        """Return a dict of ``prefix + key + suffix -> value`` pairs found in *value*."""
        if isinstance(value, six.binary_type):
            value = value.decode('utf8', 'ignore')

        r = self.ptn.findall(value)

        new_event = {}
        # Each findall tuple carries the groups of both alternatives:
        # (k1, v1) is filled for bare values, (k2, v2) for quoted ones.
        for k1, v1, k2, v2 in r:
            if k1:
                new_event["{0}{1}{2}".format(self.prefix, self._n(k1), self.suffix)] = self._n(v1)
            elif k2:
                new_event["{0}{1}{2}".format(self.prefix, self._n(k2), self.suffix)] = self._n(v2)

        return new_event

    def __call__(self, event, inpt):
        """Extract KV pairs from the named field(s) and merge them into *event*.

        :param event: the event dict (mutated in place and returned)
        :param inpt: one field name, or an iterable of field names, to scan
        """
        # simple dict mode: a single field name becomes a one-element list
        if isinstance(inpt, (six.binary_type, six.text_type)):
            inpt = [inpt]

        if isinstance(inpt, Iterable):
            for i in inpt:
                if not isinstance(i, (six.binary_type, six.text_type)):
                    # fixed copy-paste: these messages previously said "trans_comp_lookup"
                    logger.error('trans_comp_kv: type of input field "{0}" is unknown'.format(i))
                    continue

                if i not in event:
                    logger.info('trans_comp_kv: event "{0}" does not contain field "{1}"'.format(event, i))
                    continue

                # get input value, extract, and merge the result back
                new_event = self.extract_kv(event[i])
                event.update(new_event)
        else:
            logger.error("trans_comp_kv: unknown type of input field {0}".format(inpt))

        return event
8 changes: 1 addition & 7 deletions aliyun/log/etl_core/trans_comp/trans_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

logger = logging.getLogger(__name__)

__all__ = ['trans_comp_lookup', 'trans_comp_json', 'trans_comp_kv']
__all__ = ['trans_comp_lookup', 'trans_comp_json']


class LookupError(SettingError):
Expand Down Expand Up @@ -205,9 +205,3 @@ def __call__(self, event, inpt):
return event


class trans_comp_kv(trans_comp_base):
def __init__(self, config):
self.config = config

def __call__(self, event, inpt):
return event
1 change: 0 additions & 1 deletion aliyun/log/etl_core/trans_comp/trans_util.py

This file was deleted.

21 changes: 19 additions & 2 deletions aliyun/log/etl_core/transform/transform_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

from .transform_base import transform_base
from ..exceptions import SettingError
from ..trans_comp import KV

__all__ = ["simple_drop", "simple_keep", 'drop_fields', "keep_fields", "rename_fields",
'DROP', 'KEEP', 'ALIAS', 'RENAME', 'DROP_F', 'KEEP_F']
__all__ = ["simple_drop", "simple_keep", 'drop_fields', "keep_fields", "rename_fields", 'extract_kv_fields',
'DROP', 'KEEP', 'ALIAS', 'RENAME', 'DROP_F', 'KEEP_F', 'KV_F']


class simple_drop(transform_base):
Expand Down Expand Up @@ -59,9 +60,25 @@ def __call__(self, event):
return dict((self.new_name(k), v) for k, v in six.iteritems(event))


class extract_kv_fields(transform_base):
    """Transform that runs KV extraction over every field whose name matches
    the configured regex pattern(s)."""

    def __init__(self, config):
        # config is either a single pattern string or a list of pattern
        # strings; field names are tested with re.match (anchored at start).
        if isinstance(config, (six.text_type, six.binary_type)):
            self.check = re.compile(config).match
        elif isinstance(config, list):  # string list
            matchers = [re.compile(pattern).match for pattern in config]
            self.check = lambda key: any(m(key) for m in matchers)
        else:
            raise SettingError(None, "extract_kv setting {0} is not supported".format(config))

    def __call__(self, event):
        # Collect the matching field names first, then hand the whole
        # batch to KV in a single call.
        targets = [name for name in six.iterkeys(event) if self.check(name)]
        return KV(event, targets)


DROP = simple_drop()
KEEP = simple_keep()

ALIAS, RENAME = rename_fields, rename_fields
DROP_F = drop_fields
KEEP_F = keep_fields
KV_F = extract_kv_fields
2 changes: 1 addition & 1 deletion aliyun/log/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.6.36'
__version__ = '0.6.37'
USER_AGENT = 'log-python-sdk-v-' + __version__
LOGGING_HANDLER_USER_AGENT = 'logging-handler, ' + USER_AGENT
ES_MIGRATION_USER_AGENT = 'es-migration, ' + USER_AGENT
Expand Down
3 changes: 2 additions & 1 deletion tests/etl_test/data4.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
{"csv_field": "v1, v2, \"v3,v3\"", "dsv_field": "|d1#d1|#d2#d3"}
{"data": "123", "f1":"a", "f2":"b"}
{"data": "123", "f1":"A", "f2":"C"}
{"data": "123", "f1":"x", "f2":"y"}
{"data": "123", "f1":"x", "f2":"y"}
{"kv_data1": "k1=v1 k2=\"v2, v2\"", "kv_data2": "k4=v4 k5=v5"}
4 changes: 3 additions & 1 deletion tests/etl_test/data4_test1.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@
( ANY, ("f1",LOOKUP({'a': "a_new", '*': "unknown"}, "f1_new")) )
]

DROP_FIELDS_origin = ["csv_field", 'dsv_field', 'data', 'f1', 'f2']
KV_FIELDS_data = r'kv_data\d+'

DROP_FIELDS_origin = ["csv_field", 'dsv_field', 'data', 'f1', 'f2', r'kv_data\d+']
3 changes: 2 additions & 1 deletion tests/etl_test/data4_test1_result.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
{"f_v3": "v3,v3", "f_v2": "v2", "f_v1": "v1", "f_d1": "d1#d1", "f_d3": "d3", "f_d2": "d2"}
{"d2": "2", "d1": "1"}
{"d2": "6", "d1": "5"}
{"d2": "8", "d1": "7"}
{"d2": "8", "d1": "7"}
{"k2": "v2, v2", "k1": "v1", "k5": "v5", "k4": "v4"}
65 changes: 63 additions & 2 deletions tests/etl_test/test_case.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#encoding: utf8

from aliyun.log.etl_core import *
from aliyun.log.etl_core.config_parser import ConfigParser
Expand Down Expand Up @@ -245,7 +246,7 @@ def verify_case(config, data, result):
if ret is None: # removed line
removed_lines += 1
if ret:
#print(json.dumps(ret))
# print(json.dumps(ret))
r = json.loads(results[i - removed_lines])
assert ret == r, Exception(i, line, ret, (i-removed_lines), r)

Expand Down Expand Up @@ -441,13 +442,73 @@ def test_lookup_mapping():
assert t( (["c1", "c2"], LOOKUP(csv_path, ["d1", "d2"]) ) )({'data': '123', 'c1': 'c', 'c2': 'v'}) == {'data': '123', 'c1': 'c', 'c2': 'v', 'd1': '0', 'd2': '0'}


def test_kv():
    """Exercise the KV transform: pattern matching, unicode handling, the
    sep/quote/prefix/suffix options, multiple input fields, and KV_F.

    NOTE(review): several asserts expect utf8-encoded byte keys/values
    (e.g. u'姓名'.encode('utf8')) — this matches the Python 2 native-str
    normalization; confirm these expectations still hold under Python 3.
    """
    # verify the KV extract pattern match
    d1 = {"data": "i=c1, k1=v1,k2=v2 k3=v3"}
    assert t( ("data", KV) )(d1) == {'i': 'c1', 'k2': 'v2', 'k1': 'v1', 'k3': 'v3', 'data': 'i=c1, k1=v1,k2=v2 k3=v3'}

    # quoted values: spaces kept inside, whitespace at quote edges trimmed
    d2 = {"data": 'i=c2, k1=" v 1 ", k2="v 2" k3="~!@#=`;.>"'}
    assert t(("data", KV))(d2) == {'i': 'c2', 'k2': 'v 2', 'k1': 'v 1', 'k3': '~!@#=`;.>', 'data': 'i=c2, k1=" v 1 ", k2="v 2" k3="~!@#=`;.>"'}

    # multi-bytes check
    d3 = {"data": u'i=c3, k1=你好 k2=他们'.encode('utf8')}
    assert t(("data", KV))(d3) == {'i': 'c3', 'k2': u'他们'.encode('utf8'), 'k1': u'你好'.encode('utf8'), "data": u'i=c3, k1=你好 k2=他们'.encode('utf8')}

    # CJK characters allowed in keys as well as values
    d4 = {"data": u'i=c4, 姓名=小明 年龄=中文 '.encode('utf8')}
    assert t(("data", KV))(d4) == {'i': 'c4', u'姓名'.encode('utf8'): u'小明'.encode('utf8'), u'年龄'.encode('utf8'): u'中文'.encode('utf8'), "data": u'i=c4, 姓名=小明 年龄=中文 '.encode('utf8')}

    d5 = {"data": u'i=c5, 姓名="小明" 年龄="中文" '.encode('utf8')}
    assert t(("data", KV))(d5) == {'i': 'c5', u'姓名'.encode('utf8'): u'小明'.encode('utf8'), u'年龄'.encode('utf8'): u'中文'.encode('utf8'), "data": u'i=c5, 姓名="小明" 年龄="中文" '.encode('utf8')}

    # same inputs passed as unicode strings instead of bytes
    d6 = {"data": u'i=c6, 姓名=小明 年龄=中文'}
    assert t(("data", KV))(d6) == {'i': 'c6', u'姓名'.encode('utf8'): u'小明'.encode('utf8'), u'年龄'.encode('utf8'): u'中文'.encode('utf8'), "data": u'i=c6, 姓名=小明 年龄=中文'}

    d7 = {"data": u'i=c7, 姓名="小明" 年龄=中文 '}
    assert t(("data", KV))(d7) == {'i': 'c7', u'姓名'.encode('utf8'): u'小明'.encode('utf8'), u'年龄'.encode('utf8'): u'中文'.encode('utf8'), "data": u'i=c7, 姓名="小明" 年龄=中文 '}

    # new line in value (quoted values may span lines; internal whitespace kept)
    d8 = {"data": """i=c8, k1="hello
 world" k2="good
 morning"
 """}
    assert t(("data", KV))(d8) == {'i': 'c8', 'k2': 'good\n morning', 'k1': 'hello\n world', 'data': 'i=c8, k1="hello\n world" k2="good\n morning"\n '}

    ################
    ## Options

    # sep-regex: the separator option accepts a regex fragment
    d9 = {"data": "i=c9 k1:v1, k2=v2"}
    assert t(("data", KV(sep='(?::|=)')))(d9) == {'k2': 'v2', 'k1': 'v1', 'i': 'c9', 'data': 'i=c9 k1:v1, k2=v2'}

    # quote: custom quote char so '=' inside the value is not split
    d10 = {"data": "i=c10 a='k1=k2;k2=k3'"}
    assert t(("data", KV(quote="'")))(d10) == {'i': 'c10', 'a': 'k1=k2;k2=k3', 'data': "i=c10 a='k1=k2;k2=k3'"}

    # prefix/suffix applied to every extracted key
    d11 = {"data": "i=c11, k1=v1,k2=v2 k3=v3"}
    assert t( ("data", KV(prefix="d_", suffix="_e")) )(d11) == {'d_i_e': 'c11', 'd_k3_e': 'v3', 'd_k2_e': 'v2', 'data': 'i=c11, k1=v1,k2=v2 k3=v3', 'd_k1_e': 'v1'}

    # multiple inputs: only the listed fields are scanned (data3 untouched)
    d12 = {"data1": "i=c12, k1=v1", "data2": "k2=v2 k3=v3", "data3": "k4=v4"}
    assert t( (["data1", "data2"], KV) )(d12) == {'k3': 'v3', 'k2': 'v2', 'k1': 'v1', 'i': 'c12', 'data1': 'i=c12, k1=v1', 'data2': 'k2=v2 k3=v3', "data3": "k4=v4"}

    #############
    # KV_F: field selection by name list or by regex

    d13 = {"data1": "i=c13, k1=v1", "data2": "k2=v2 k3=v3", "data3": "k4=v4"}
    assert KV_F(["data1", "data2"])(d13) == {'k3': 'v3', 'k2': 'v2', 'k1': 'v1', 'i': 'c13', 'data1': 'i=c13, k1=v1', 'data3': 'k4=v4', 'data2': 'k2=v2 k3=v3'}

    d14 = {"data1": "i=c14, k1=v1", "data2": "k2=v2 k3=v3", "data3": "k4=v4"}
    assert KV_F(r'data2')(d14) == {'k3': 'v3', 'k2': 'v2', 'data1': 'i=c14, k1=v1', 'data3': 'k4=v4', 'data2': 'k2=v2 k3=v3'}


# Ad-hoc runner: the suite executes on import as a plain script.
# NOTE(review): consider wrapping these calls in `if __name__ == '__main__':`
# so importing this module (e.g. under a test collector) has no side effects.
test_condition()
test_regex()
test_csv()
test_lookup_dict()
test_lookup_load_csv()
test_lookup_mapping()

test_kv()
test_dispatch_transform()
test_meta()
test_parse()
Expand Down

0 comments on commit b0e4d0b

Please sign in to comment.