Skip to content

Commit

Permalink
add more options to jmes filter/expand functionality: include, exclude keys, depth, formatting, array_expansion etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Dec 16, 2018
1 parent b145090 commit 125858c
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 116 deletions.
2 changes: 1 addition & 1 deletion aliyun/log/etl_core/etl_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def re_full_match(pattern, string, *args, **kwargs):
if m and m.span()[1] == len(string):
return m


@cached
def get_re_full_match(pattern, flags=0):
p = re.compile(pattern, flags=flags)
if hasattr(p, 'fullmatch'):
Expand Down
131 changes: 112 additions & 19 deletions aliyun/log/etl_core/trans_comp/trans_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import jmespath
from jmespath.exceptions import ParseError
from ..exceptions import SettingError
import inspect
from ..etl_util import get_re_full_match

__all__ = ['trans_comp_json']

Expand All @@ -25,56 +27,147 @@ def trans_comp_json(*args, **kwargs):

class json_transformer(trans_comp_base):
DEFAULT_SEP = '.'

def __init__(self, expand=None, prefix=None, suffix=None, jmes=None, output=None, jmes_ignore_none=None):
DEFAULT_FMT = "simple"
DEFAULT_DEPTH = 100
DEFAULT_INCLUDE_KEYS = ''
DEFAULT_EXCLUDE_KEYS = ''

FMT_MAP = {
"simple": "{prefix}{current}{suffix}",
"full": "{parent_list_str}{sep}{prefix}{current}{suffix}", # "{sep.join(parent_path)}{sep}{prefix}{current}{suffix}",
"parent": "{parent_list[-1]}{sep}{prefix}{current}{suffix}",
"root": "{parent_list[0]}{sep}{prefix}{current}{suffix}"
# could also be custom formatting string using up to five placehodler: parent_list, current, sep, prefix, suffix
# could also be formatting function accepting the two parameters: parrent_list, current key, value
# Note: the functoin result k, v tuple, if returning None, meaning skip this k-v
}

def __init__(self, jmes=None, jmes_ignore_none=None, output=None,
expand=None, expand_array=False, depth=None, include_keys=None, exclude_keys=None,
fmt=None, sep=None, prefix=None, suffix=None):
"""
:param jmes: jmes filter to select or generate new field
:param jmes_ignore_none: if jmes filter is null, ignore it (default). Or else consider it as "". default is
:param output: put the value parsed from jmes filter to this field
:param expand: If jmes filter is configure, expand the result or not (Default is False in this case), If jmes is not configured, directly expand the field passed or not (Default is True in this case).
:param expand_array: if expand array or just render it. default is True. item in array will be with name index, e.g. [1,2,3] will be considered as {"0": 1, "1": 2, "2": 3}
:param depth: depth to scan, 1 means first layer, default is 100.
:param include_keys: keys to expand and include. regex string. using '|' for multiple ones. default is all.
:param exclude_keys: keys to skip, regex string. using '|' for multiple ones. default is nothing.
:param fmt: during expansion, how to format the key name, there're several types or customized as described in FMT_MAP
:param sep: sep used in formatting during expansion
:param prefix: prefix used in formatting during expansion
:param suffix: suffix used in formatting during expansion
"""
self.expand = expand
if expand is None:
self.expand = not jmes

self.expand_array = True if expand_array is None else expand_array
# self.level = level or 1
self.jmes = jmes or ""
self.prefix = "" if prefix is None else prefix
self.suffix = "" if suffix is None else suffix
self.output = output
self.sep = self.DEFAULT_SEP if sep is None else sep
self.output = output or ""
self.jmes_filter = None
self.jmes_ignore_none = True if jmes_ignore_none is None else jmes_ignore_none
if jmes:
try:
self.jmes_filter = jmespath.compile(jmes)
except jmespath.exceptions.ParseError as ex:
raise SettingError(ex=ex, msg="Invalid JMES filter setting", settings=jmes)
elif self.output:
logger.warning("json_transformer: parameter output '{0}' will be ignored as there's no filter is selected."
.format(output))

self.depth = min( (depth or self.DEFAULT_DEPTH), self.DEFAULT_DEPTH)
self.include_keys = include_keys or self.DEFAULT_INCLUDE_KEYS
self.exclude_keys = exclude_keys or self.DEFAULT_EXCLUDE_KEYS
self.fmt = fmt or self.DEFAULT_FMT
self.include_match = get_re_full_match(self.include_keys)
self.exclude_match = get_re_full_match(self.exclude_keys)

@staticmethod
def format_add_kv(event, fmt, current, value, parent_list, sep, prefix, suffix):
ret = None
if isinstance(fmt, (six.text_type, six.binary_type)):
fmt = json_transformer.FMT_MAP.get(fmt.strip().lower(), fmt)
try:
ret = fmt.format(parent_list_str=sep.join(parent_list), parent_list=parent_list, current=current, sep=sep, prefix=prefix, suffix=suffix), \
json_transformer._n(value)
except ValueError as ex:
logger.info("json_transformer")
elif isinstance(fmt, inspect.isfunction):
ret = fmt(parent_list, current, value)

if ret and len(ret) == 2:
k, v = ret
event[json_transformer._n(k)] = json_transformer._n(v)
else:
logger.info("json_transformer: unexpected format result: {0}, fmt: '{1}', k: '{2}', v: '{3}', skip it"
.format(ret, fmt, current, value))

def _expand_json(self, event, key, value, parent_list, depth, sep, prefix, suffix):
# check if need to skip it
if (self.include_keys and not self.include_match(key)) or (self.exclude_keys and self.exclude_match(key)):
logger.info("json_transformer: 'key' {0} is not in include keys '{1}' or in exclude keys '{2}', skip it."
.format(key, self.include_keys, self.exclude_keys))
return

# check if need to format it directly
if len(parent_list) > depth \
or (not isinstance(value, (list, tuple, dict))) \
or (isinstance(value, (list, tuple)) and not self.expand_array):
# 1. depth hit, 2. basic type, 3. array but not expand
logger.info("json_transformer: hit stop parsing, key: '{0}', value: '{1}', parent: '{2}', depth: '{3}'"
.format(key, value, parent_list, depth))
self.format_add_kv(event, self.fmt, self._n(key), self._n(value), parent_list, sep, prefix, suffix)
return None

# convert array to dict
if isinstance(value, (list, tuple)):
value = dict((str(i), v) for i, v in enumerate(value))

if isinstance(value, dict):
for k, v in six.iteritems(value):
if isinstance(v, (dict, tuple, list)):
# recursively parse it
self._expand_json(event, k, v, parent_list + (k, ), depth, sep, prefix, suffix)
else:
self.format_add_kv(event, self.fmt, self._n(k), self._n(v), parent_list, sep, prefix, suffix)

def _expand_json(self, message, event, prefix=""):
if isinstance(message, dict):
for k, v in six.iteritems(message):
event["{0}{1}{2}".format(self.prefix, self._n(k), self.suffix)] = self._n(v)
else:
logger.info("json_transformer: skip unsupported message '{0}' of type '{1}' when expanding"
.format(value, type(value)))

def _process_message(self, message, filter=None):
def _process_message(self, key, value):
new_event = {}
if isinstance(message, (six.binary_type, six.text_type)):
if isinstance(value, (six.binary_type, six.text_type)):
try:
message = json.loads(message)
value = json.loads(value)
except Exception as ex:
logger.info("json_transformer: fail to load event into json object: {0}, error: {1}".format(message, ex))
logger.info("json_transformer: fail to load event into json object: {0}, error: {1}".format(value, ex))
return None

if filter:
if self.jmes_filter:
try:
message = filter.search(message)
if message is None and self.jmes_ignore_none:
value = self.jmes_filter.search(value)
if value is None and self.jmes_ignore_none:
logger.info("split_event_transformer: value {0} get null from jmes settings {1}, skip it".
format(message, self.jmes))
format(value, self.jmes))
return None
except Exception as ex:
logger.info("split_event_transformer: value {0} with invalid jmes settings {1}, skip it".
format(message, self.jmes))
format(value, self.jmes))
return None

if self.output:
new_event[self.output] = self._n(message)
new_event[self.output] = self._n(value)

# if need to expand
if self.expand:
self._expand_json(message, new_event)
self._expand_json(new_event, key, value, (key, ), self.depth, self.sep, self.prefix, self.suffix)

return new_event

Expand Down Expand Up @@ -109,7 +202,7 @@ def __call__(self, event, inpt):
continue

# get input value
new_event = self._process_message(event[i], self.jmes_filter)
new_event = self._process_message(i, event[i])
if new_event and isinstance(new_event, dict):
event.update(new_event)
else:
Expand Down
8 changes: 8 additions & 0 deletions tests/etl_test/json_data/simple_data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"cve": {
"CVE_data_meta": {
"ASSIGNER": "cve@mitre.org",
"ID": "CVE-2013-0169"
}
}
}
148 changes: 52 additions & 96 deletions tests/etl_test/test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,6 @@ def test_split_filter():
d1 = {'i': '1', 'data': _get_file_content('json_data/CVE-2013-0169.json')}
# jmes filter output as list
jmes = 'cve.affects.vendor.vendor_data[*].product.product_data[*].product_name[]'
#print( t( ("data", SPLIT(jmes=jmes, output='data')) )(d1))
assert t( ("data", SPLIT(jmes=jmes, output='data')) )(d1) == [{"i": "1", 'data': "openssl"}, {"i": "1", 'data': "openjdk"}, {"i": "1", 'data': "polarssl"}]

d1 = {'i': '1', 'data': _get_file_content('json_data/CVE-2013-0169.json')}
Expand All @@ -596,120 +595,77 @@ def test_split_filter():


def test_json():
# extract
d1 = {"data": """{"k1": 100, "k2": 200}"""}
assert t( ("data", JSON) )(d1) == {"data": """{"k1": 100, "k2": 200}""", "k1": "100", "k2": "200"}

# extract - deep
d2 = {"data": """{"k1": 100, "k2": {"k21": 200} }"""}
assert t( ("data", JSON) )(d2) == {"data": """{"k1": 100, "k2": {"k21": 200} }""", "k1": "100", "k2": '{"k21": 200}'}

# extract - array
d3 = {"data": """[1,2,3]"""}
assert t( ("data", JSON) )(d3) == {"data": """[1,2,3]"""}

# jmes filter
d1 = {'i': '1', 'data': _get_file_content('json_data/CVE-2013-0169.json')}
jmes = 'join(`,`,cve.affects.vendor.vendor_data[*].product.product_data[*].product_name[])'
assert t( ("data", JSON(jmes=jmes, output='data')) )(d1) == {"i": "1", 'data': "openssl,openjdk,polarssl"}

# jmes filter - null
# jmes filter - real match, empty result
d1 = {'i': '1', 'data': '{"f1": ""}'}
jmes = 'f1'
assert t( ("data", JSON(jmes=jmes, output='data')) )(d1) == {'i': '1', 'data': ''}

# jmes filter 1 option: jmes_ignore_none
d1 = {'i': '1', 'data': '{"f1": [1,2,3]}'}
jmes = 'f1[4]'
#print(t( ("data", JSON(jmes=jmes, output='data')) )(d1))
assert t( ("data", JSON(jmes=jmes, output='data')) )(d1) == {'i': '1', 'data': '{"f1": [1,2,3]}'}
assert t(("data", JSON(jmes=jmes, output='data', jmes_ignore_none=False)))(d1) == {'i': '1', 'data': ''}

# jmes filter - null
# jmes filter 2 option: jmes_ignore_none
d1 = {'i': '1', 'data': '{"f1": "123"}'}
jmes = 'f2'
assert t( ("data", JSON(jmes=jmes, output='data')) )(d1) == {'i': '1', 'data': '{"f1": "123"}'}
assert t(("data", JSON(jmes=jmes, output='data', jmes_ignore_none=False)))(d1) == {'i': '1', 'data': ''}

# jmes filter - real match, empty result
d1 = {'i': '1', 'data': '{"f1": ""}'}
jmes = 'f1'
assert t( ("data", JSON(jmes=jmes, output='data')) )(d1) == {'i': '1', 'data': ''}
# jmes filter - output
d1 = {"data": """{"k1": 100, "k2": 200}"""}
assert t( ("data", JSON(jmes='k2', output='k3')) )(d1) == {"data": """{"k1": 100, "k2": 200}""", 'k3': '200'}

# extract
d1 = {"data": """{"k1": 100, "k2": 200}"""}
assert t( ("data", JSON) )(d1) == {"data": """{"k1": 100, "k2": 200}""", "k1": "100", "k2": "200"}

# d2 = {"data": """{"k1": 100, "k2": {"k21": 200} }"""}
# print(t( ("data", JSON) )(d2))
# assert t( ("data", JSON(level=2)) )(d2) == {"data": """{"k1": 100, "k2": {"k21": 200} }""", "k1": "100", "k2": '{"k21": 200}'}

# d2 = {"data": 'i=c2, k1=" v 1 ", k2="v 2" k3="~!@#=`;.>"'}
# assert t(("data", KV))(d2) == {'i': 'c2', 'k2': 'v 2', 'k1': 'v 1', 'k3': '~!@#=`;.>', 'data': 'i=c2, k1=" v 1 ", k2="v 2" k3="~!@#=`;.>"'}
#
# # multi-bytes check
# if six.PY2:
# d3 = {"data": u'i=c3, k1=你好 k2=他们'.encode('utf8')}
# assert t(("data", KV))(d3) == {'i': 'c3', 'k2': u'他们'.encode('utf8'), 'k1': u'你好'.encode('utf8'), "data": u'i=c3, k1=你好 k2=他们'.encode('utf8')}
#
# d4 = {"data": u'i=c4, 姓名=小明 年龄=中文 '.encode('utf8')}
# assert t(("data", KV))(d4) == {'i': 'c4', u'姓名'.encode('utf8'): u'小明'.encode('utf8'), u'年龄'.encode('utf8'): u'中文'.encode('utf8'), "data": u'i=c4, 姓名=小明 年龄=中文 '.encode('utf8')}
#
# d5 = {"data": u'i=c5, 姓名="小明" 年龄="中文" '.encode('utf8')}
# assert t(("data", KV))(d5) == {'i': 'c5', u'姓名'.encode('utf8'): u'小明'.encode('utf8'), u'年龄'.encode('utf8'): u'中文'.encode('utf8'), "data": u'i=c5, 姓名="小明" 年龄="中文" '.encode('utf8')}
#
# d6 = {"data": u'i=c6, 姓名=小明 年龄=中文'}
# assert t(("data", KV))(d6) == {'i': 'c6', u'姓名'.encode('utf8'): u'小明'.encode('utf8'), u'年龄'.encode('utf8'): u'中文'.encode('utf8'), "data": u'i=c6, 姓名=小明 年龄=中文'}
#
# d7 = {"data": u'i=c7, 姓名="小明" 年龄=中文 '}
# assert t(("data", KV))(d7) == {'i': 'c7', u'姓名'.encode('utf8'): u'小明'.encode('utf8'), u'年龄'.encode('utf8'): u'中文'.encode('utf8'), "data": u'i=c7, 姓名="小明" 年龄=中文 '}
# else:
# d3 = {"data": u'i=c3, k1=你好 k2=他们'.encode('utf8')}
# assert t(("data", KV))(d3) == {'i': 'c3', 'k2': u'他们', 'k1': u'你好', "data": u'i=c3, k1=你好 k2=他们'.encode('utf8')}
#
# d4 = {"data": u'i=c4, 姓名=小明 年龄=中文 '.encode('utf8')}
# assert t(("data", KV))(d4) == {'i': 'c4', u'姓名': u'小明', u'年龄': u'中文', "data": u'i=c4, 姓名=小明 年龄=中文 '.encode('utf8')}
#
# d5 = {"data": u'i=c5, 姓名="小明" 年龄="中文" '.encode('utf8')}
# assert t(("data", KV))(d5) == {'i': 'c5', u'姓名': u'小明', u'年龄': u'中文', "data": u'i=c5, 姓名="小明" 年龄="中文" '.encode('utf8')}
#
# d6 = {"data": u'i=c6, 姓名=小明 年龄=中文'}
# assert t(("data", KV))(d6) == {'i': 'c6', u'姓名': u'小明', u'年龄': u'中文', "data": u'i=c6, 姓名=小明 年龄=中文'}
#
# d7 = {"data": u'i=c7, 姓名="小明" 年龄=中文 '}
# assert t(("data", KV))(d7) == {'i': 'c7', u'姓名': u'小明', u'年龄': u'中文', "data": u'i=c7, 姓名="小明" 年龄=中文 '}
#
#
# # new line in value
# d8 = {"data": """i=c8, k1="hello
# world" k2="good
# morning"
# """}
# assert t(("data", KV))(d8) == {'i': 'c8', 'k2': 'good\n morning', 'k1': 'hello\n world', 'data': 'i=c8, k1="hello\n world" k2="good\n morning"\n '}
#
# ################
# ## Options
#
# # sep-regex
# d9 = {"data": "i=c9 k1:v1, k2=v2"}
# assert t(("data", KV(sep='(?::|=)')))(d9) == {'k2': 'v2', 'k1': 'v1', 'i': 'c9', 'data': 'i=c9 k1:v1, k2=v2'}
#
# # quote
# d10 = {"data": "i=c10 a='k1=k2;k2=k3'"}
# assert t(("data", KV(quote="'")))(d10) == {'i': 'c10', 'a': 'k1=k2;k2=k3', 'data': "i=c10 a='k1=k2;k2=k3'"}
#
# # prefix/suffix
# d11 = {"data": "i=c11, k1=v1,k2=v2 k3=v3"}
# assert t( ("data", KV(prefix="d_", suffix="_e")) )(d11) == {'d_i_e': 'c11', 'd_k3_e': 'v3', 'd_k2_e': 'v2', 'data': 'i=c11, k1=v1,k2=v2 k3=v3', 'd_k1_e': 'v1'}
#
# # multiple inputs
# d12 = {"data1": "i=c12, k1=v1", "data2": "k2=v2 k3=v3", "data3": "k4=v4"}
# assert t( (["data1", "data2"], KV) )(d12) == {'k3': 'v3', 'k2': 'v2', 'k1': 'v1', 'i': 'c12', 'data1': 'i=c12, k1=v1', 'data2': 'k2=v2 k3=v3', "data3": "k4=v4"}
#
# #############
# # KV_F
#
# d13 = {"data1": "i=c13, k1=v1", "data2": "k2=v2 k3=v3", "data3": "k4=v4"}
# assert KV_F(["data1", "data2"])(d13) == {'k3': 'v3', 'k2': 'v2', 'k1': 'v1', 'i': 'c13', 'data1': 'i=c13, k1=v1', 'data3': 'k4=v4', 'data2': 'k2=v2 k3=v3'}
#
# d14 = {"data1": "i=c14, k1=v1", "data2": "k2=v2 k3=v3", "data3": "k4=v4"}
# assert KV_F(r'data2')(d14) == {'k3': 'v3', 'k2': 'v2', 'data1': 'i=c14, k1=v1', 'data3': 'k4=v4', 'data2': 'k2=v2 k3=v3'}
# extract - deep - all depth
d2 = {"data": """{"k1": 100, "k2": {"k3": 200, "k4": {"k5": 300} } }"""}
assert t(("data", JSON))(d2) == {'data': '{"k1": 100, "k2": {"k3": 200, "k4": {"k5": 300} } }', 'k1': '100', 'k3': '200', 'k5': '300'}

# extract - deep - level-1
d2 = {"data": """{"k1": 100, "k2": {"k21": 200} }"""}
assert t( ("data", JSON(depth=1)) )(d2) == {"data": """{"k1": 100, "k2": {"k21": 200} }""", "k1": "100", "k2": '{"k21": 200}'}

# extract - deep - level-2
d2 = {"data": """{"k1": 100, "k2": {"k3": 200, "k4": {"k5": 300} } }"""}
assert t(("data", JSON(depth=2)))(d2) == {'data': '{"k1": 100, "k2": {"k3": 200, "k4": {"k5": 300} } }', 'k1': '100', 'k3': '200', 'k4': '{"k5": 300}'}

# extract - deep - format - full
d2 = {"data": """{"k1": 100, "k2": {"k3": 200, "k4": {"k5": 300} } }"""}
assert t(("data", JSON(fmt='full')))(d2) == {'data': '{"k1": 100, "k2": {"k3": 200, "k4": {"k5": 300} } }', 'data.k1': '100', 'data.k2.k3': '200', 'data.k2.k4.k5': '300'}


# # extract - deep - format - parent
# d2 = {"data": """{"k1": 100, "k2": {"k3": 200, "k4": {"k5": 300} } }"""}
# print(t(("data", JSON(fmt='parent')))(d2))
# assert t(("data", JSON(fmt='parent')))(d2) == {'data': '{"k1": 100, "k2": {"k3": 200, "k4": {"k5": 300} } }', 'data.k1': '100', 'data.k2.k3': '200', 'data.k2.k4.k5': '300'}


# extract - array
d3 = {"data": """[1,2,3]"""}
assert t( ("data", JSON(expand_array=False)) )(d3) == {"data": """[1, 2, 3]"""}

# # extract - array - expand
# d3 = {"data": """[1,2,3]"""}
# print( t( ("data", JSON) )(d3) )
# assert t( ("data", JSON) )(d3) == {"data": """[1, 2, 3]"""}

# jmes - expand
d1 = {'i': '1', 'data': _get_file_content('json_data/simple_data.json')}
jmes = 'cve.CVE_data_meta'
assert t( ("data", JSON(jmes=jmes, output='data', expand=True)) )(d1) == {'i': '1', 'data': '{"ASSIGNER": "cve@mitre.org", "ID": "CVE-2013-0169"}', 'ASSIGNER': 'cve@mitre.org', 'ID': 'CVE-2013-0169'}

# expand prefix/suffix
d1 = {"data": """{"k1": 100, "k2": 200}"""}
assert t( ("data", JSON(prefix="data_", suffix="_end")) )(d1) == {'data': '{"k1": 100, "k2": 200}', 'data_k1_end': '100', 'data_k2_end': '200'}

test_json()
exit(0)

test_condition()
test_regex()
Expand Down

0 comments on commit 125858c

Please sign in to comment.