Skip to content

Commit

Permalink
CLI-ETL: support jmes filter with json function before expansion
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Dec 16, 2018
1 parent b441cb1 commit 0654fd5
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 8 deletions.
56 changes: 51 additions & 5 deletions aliyun/log/etl_core/trans_comp/trans_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import json
from ..etl_util import cached
from collections import Iterable
import jmespath
from jmespath.exceptions import ParseError
from ..exceptions import SettingError

__all__ = ['trans_comp_json']

Expand All @@ -23,7 +26,7 @@ def trans_comp_json(*args, **kwargs):
class json_transformer(trans_comp_base):
DEFAULT_SEP = '.'

def __init__(self, expand=None, prefix=None, suffix=None, jmes=None, output=None):
def __init__(self, expand=None, prefix=None, suffix=None, jmes=None, output=None, jmes_ignore_none=None):
self.expand = expand
if expand is None:
self.expand = not jmes
Expand All @@ -32,7 +35,48 @@ def __init__(self, expand=None, prefix=None, suffix=None, jmes=None, output=None
self.prefix = "" if prefix is None else prefix
self.suffix = "" if suffix is None else suffix
self.output = output
# self.sep = self.DEFAULT_SEP if sep is None else sep
self.jmes_filter = None
self.jmes_ignore_none = True if jmes_ignore_none is None else jmes_ignore_none
if jmes:
try:
self.jmes_filter = jmespath.compile(jmes)
except jmespath.exceptions.ParseError as ex:
raise SettingError(ex=ex, msg="Invalid JMES filter setting", settings=jmes)

def _expand_json(self, message, event, prefix=""):
if isinstance(message, dict):
for k, v in six.iteritems(message):
event["{0}{1}{2}".format(self.prefix, self._n(k), self.suffix)] = self._n(v)

def _process_message(self, message, filter=None):
new_event = {}
if isinstance(message, (six.binary_type, six.text_type)):
try:
message = json.loads(message)
except Exception as ex:
logger.info("json_transformer: fail to load event into json object: {0}, error: {1}".format(message, ex))
return None

if filter:
try:
message = filter.search(message)
if message is None and self.jmes_ignore_none:
logger.info("split_event_transformer: value {0} get null from jmes settings {1}, skip it".
format(message, self.jmes))
return None
except Exception as ex:
logger.info("split_event_transformer: value {0} with invalid jmes settings {1}, skip it".
format(message, self.jmes))
return None

if self.output:
new_event[self.output] = self._n(message)

# if need to expand
if self.expand:
self._expand_json(message, new_event)

return new_event

def extract_json(self, message):
new_event = {}
Expand Down Expand Up @@ -61,13 +105,15 @@ def __call__(self, event, inpt):
continue

if i not in event:
logger.info('trans_comp_lookup: event "{0}" doesn not contain field "{1}"'.format(event, i))
logger.info('trans_comp_lookup: event "{0}" does not contain field "{1}"'.format(event, i))
continue

# get input value
new_event = self.extract_json(event[i])
if event is not new_event:
new_event = self._process_message(event[i], self.jmes_filter)
if new_event and isinstance(new_event, dict):
event.update(new_event)
else:
logger.info('trans_comp_lookup: event "{0}" does not extract value from field "{1}"'.format(event, i))
else:
logger.error("trans_comp_lookup: unknown type of input field {0}".format(inpt))

Expand Down
29 changes: 26 additions & 3 deletions tests/etl_test/test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,17 +596,40 @@ def test_split_filter():


def test_json():
# verify the KV extract pattern match
# extract
d1 = {"data": """{"k1": 100, "k2": 200}"""}
assert t( ("data", JSON) )(d1) == {"data": """{"k1": 100, "k2": 200}""", "k1": "100", "k2": "200"}

# extract - deep
d2 = {"data": """{"k1": 100, "k2": {"k21": 200} }"""}
assert t( ("data", JSON) )(d2) == {"data": """{"k1": 100, "k2": {"k21": 200} }""", "k1": "100", "k2": '{"k21": 200}'}

# extract - array
d3 = {"data": """[1,2,3]"""}
assert t( ("data", JSON) )(d3) == {"data": """[1,2,3]"""}

# jmes filter
d1 = {'i': '1', 'data': _get_file_content('json_data/CVE-2013-0169.json')}
jmes = 'join(`,`,cve.affects.vendor.vendor_data[*].product.product_data[*].product_name[])'
assert t( ("data", JSON(jmes=jmes, output='data')) )(d1) == {"i": "1", 'data': "openssl,openjdk,polarssl"}

# jmes filter - null
d1 = {'i': '1', 'data': '{"f1": [1,2,3]}'}
jmes = 'f1[4]'
#print(t( ("data", JSON(jmes=jmes, output='data')) )(d1))
assert t( ("data", JSON(jmes=jmes, output='data')) )(d1) == {'i': '1', 'data': '{"f1": [1,2,3]}'}
assert t(("data", JSON(jmes=jmes, output='data', jmes_ignore_none=False)))(d1) == {'i': '1', 'data': ''}

# jmes filter - null
d1 = {'i': '1', 'data': '{"f1": "123"}'}
jmes = 'f2'
assert t( ("data", JSON(jmes=jmes, output='data')) )(d1) == {'i': '1', 'data': '{"f1": "123"}'}
assert t(("data", JSON(jmes=jmes, output='data', jmes_ignore_none=False)))(d1) == {'i': '1', 'data': ''}

# jmes filter - real match, empty result
d1 = {'i': '1', 'data': '{"f1": ""}'}
jmes = 'f1'
assert t( ("data", JSON(jmes=jmes, output='data')) )(d1) == {'i': '1', 'data': ''}


# d2 = {"data": """{"k1": 100, "k2": {"k21": 200} }"""}
Expand Down Expand Up @@ -685,8 +708,8 @@ def test_json():
# assert KV_F(r'data2')(d14) == {'k3': 'v3', 'k2': 'v2', 'data1': 'i=c14, k1=v1', 'data3': 'k4=v4', 'data2': 'k2=v2 k3=v3'}


# test_split_filter()
# exit(0)
test_json()
exit(0)

test_condition()
test_regex()
Expand Down

0 comments on commit 0654fd5

Please sign in to comment.