add json split with jmes filter and test cases
wjo1212 committed Dec 14, 2018
1 parent 5bc536c commit b441cb1
Showing 3 changed files with 213 additions and 15 deletions.
68 changes: 56 additions & 12 deletions aliyun/log/etl_core/trans_comp/trans_mv.py
@@ -4,6 +4,9 @@
 import logging
 import copy
 import json
+import jmespath
+from jmespath.exceptions import ParseError
+from ..exceptions import SettingError

 __all__ = ['trans_comp_split_event']

@@ -23,11 +26,19 @@ class split_event_transformer(trans_comp_base):
     DEFAULT_SEP = ','
     DEFAULT_QUOTE = '"'

-    def __init__(self, sep=None, quote=None, lstrip=None, output_field=None):
+    def __init__(self, sep=None, quote=None, lstrip=None, output=None, jmes=None):
         self.sep = sep or self.DEFAULT_SEP
         self.lstrip = True if lstrip is None else lstrip
         self.quote = quote or self.DEFAULT_QUOTE
-        self.output_field = output_field
+        self.output_field = output
+
+        self.jmes = jmes or ""
+        self.jmes_filter = None
+        if jmes:
+            try:
+                self.jmes_filter = jmespath.compile(jmes)
+            except jmespath.exceptions.ParseError as ex:
+                raise SettingError(ex=ex, msg="Invalid JMES filter setting", settings=jmes)

     def _process_list(self, event, field, lst):
         if not lst:
@@ -41,29 +52,62 @@ def _process_list(self, event, field, lst):
                 result.append(e)
             return result
         else:
-            event[field] = self._n(lst)
+            event[field] = self._n(lst[0])
             return event

-    def _parse_list(self, v):
+    def _parse_list(self, v, filter=None):
+        # process json first (via the filter, or directly as a list)
         try:
             ret = json.loads(v)
-            if isinstance(ret, (list)):
+
+            # list and no jmes filter
+            if isinstance(ret, (list,)) and not self.jmes:
                 return ret
+
+            # json object and jmes is configured
+            if filter:
+                ret = filter.search(ret)
+
+                if isinstance(ret, (list,)):
+                    # result is a list as expected
+                    return ret
+                elif isinstance(ret, (six.text_type, six.binary_type)):
+                    # result is just a string: re-run the parse, without the jmes filter this time
+                    return self._parse_list(ret)
+                else:
+                    logger.info('split_event_transformer: got unknown type of result for value "{0}" with jmes filter "{1}", skip it'.
+                                format(v, self.jmes))
+                    return None
+
+            # else: go to the next step and parse it as CSV
+
         except Exception as ex:
-            logger.debug("split_event_transformer: value {0} is not json, try to use csv".format(v))
+            # failed at json load or jmes filter. if jmes is configured, then it's an invalid event
+            if filter:
+                logger.info("split_event_transformer: value {0} is not json or the jmes setting {1} is invalid, skip it".
+                            format(v, self.jmes))
+                return None
+            else:
+                logger.debug("split_event_transformer: value {0} is not json, try to use csv".format(v))

+        # continue to parse it as csv
+        if isinstance(v, (six.text_type, six.binary_type)):
+            result = list(csv.reader([v], skipinitialspace=self.lstrip, delimiter=self.sep, quotechar=self.quote))[0]
+            return result
+        else:
+            logger.info("split_event_transformer: cannot extract list from value: {0}".format(v))
+            return None

-        result = list(csv.reader([v], skipinitialspace=self.lstrip, delimiter=self.sep, quotechar=self.quote))[0]
-        return result

     def __call__(self, event, inpt):
+        # overwrite input field
+        if not self.output_field:
+            self.output_field = inpt
+
-        # csv mode
         if isinstance(inpt, (six.binary_type, six.text_type)):
-            ret = self._parse_list(event[inpt])
-            return self._process_list(event, inpt, ret)
+            if inpt in event:
+                # json/csv mode
+                ret = self._parse_list(event[inpt], self.jmes_filter)
+                return self._process_list(event, self.output_field, ret)
         else:
             logger.error("trans_comp_lookup: unknown type of input field {0}".format(inpt))

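The flow added above — try JSON first, apply the pre-compiled JMES filter if one is configured, otherwise fall back to CSV — can be summarized in a small self-contained sketch. This is an illustration only, not the SDK code: `split_value` is a made-up name, and plain `str`/`ValueError` stand in for the `six` type checks and `SettingError` used in the module.

```python
import csv
import json

import jmespath
from jmespath.exceptions import ParseError


def split_value(value, jmes=None, sep=',', quote='"', lstrip=True):
    """Rough sketch of the parse flow in trans_mv above; illustration only."""
    jmes_filter = None
    if jmes:
        try:
            # pre-compile so a bad expression fails at configuration time
            jmes_filter = jmespath.compile(jmes)
        except ParseError as ex:
            raise ValueError("invalid JMES filter: {0}".format(ex))

    try:
        ret = json.loads(value)
        if isinstance(ret, list) and not jmes_filter:
            return ret                      # plain JSON list, nothing else to do
        if jmes_filter:
            ret = jmes_filter.search(ret)   # run the filter on the parsed document
            if isinstance(ret, list):
                return ret                  # filter produced the list to split on
            if isinstance(ret, str):
                # a scalar string result is re-parsed without the filter (e.g. "1.6.0" with sep='.')
                return split_value(ret, sep=sep, quote=quote, lstrip=lstrip)
            return None                     # unsupported result type: skip this event
    except ValueError:
        if jmes_filter:
            return None                     # JMES configured but the value is not JSON: skip
    # fall back to CSV splitting
    return list(csv.reader([value], skipinitialspace=lstrip,
                           delimiter=sep, quotechar=quote))[0]


if __name__ == '__main__':
    print(split_value('["a", "b", "c"]'))          # ['a', 'b', 'c']
    print(split_value('nj, 800, js'))              # ['nj', '800', 'js']
    print(split_value('{"k": [1, 2]}', jmes='k'))  # [1, 2]
```

Pre-compiling the expression with `jmespath.compile` is what lets a bad filter surface as a configuration error (`SettingError` in the diff above) instead of silently dropping every event at runtime.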
100 changes: 100 additions & 0 deletions tests/etl_test/json_data/CVE-2013-0169.json
@@ -0,0 +1,100 @@
{
  "cve": {
    "CVE_data_meta": {
      "ASSIGNER": "cve@mitre.org",
      "ID": "CVE-2013-0169"
    },
    "affects": {
      "vendor": {
        "vendor_data": [
          {
            "product": {
              "product_data": [
                {
                  "product_name": "openssl",
                  "version": {
                    "version_data": [
                      {
                        "version_value": "*"
                      },
                      {
                        "version_value": "0.9.8"
                      },
                      {
                        "version_value": "0.9.8a"
                      },
                      {
                        "version_value": "0.9.8b"
                      },
                      {
                        "version_value": "0.9.8c"
                      },
                      {
                        "version_value": "0.9.8d"
                      },
                      {
                        "version_value": "0.9.8f"
                      },
                      {
                        "version_value": "0.9.8g"
                      }
                    ]
                  }
                }
              ]
            },
            "vendor_name": "openssl"
          },
          {
            "product": {
              "product_data": [
                {
                  "product_name": "openjdk",
                  "version": {
                    "version_data": [
                      {
                        "version_value": "-"
                      },
                      {
                        "version_value": "1.6.0"
                      },
                      {
                        "version_value": "1.7.0"
                      }
                    ]
                  }
                }
              ]
            },
            "vendor_name": "oracle"
          },
          {
            "product": {
              "product_data": [
                {
                  "product_name": "polarssl",
                  "version": {
                    "version_data": [
                      {
                        "version_value": "0.10.0"
                      },
                      {
                        "version_value": "0.10.1"
                      },
                      {
                        "version_value": "0.11.0"
                      }
                    ]
                  }
                }
              ]
            },
            "vendor_name": "polarssl"
          }
        ]
      }
    }
  },
  "lastModifiedDate": "2018-08-09T01:29Z",
  "publishedDate": "2013-02-08T19:55Z"
}
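The JMES expressions used by `test_split_filter` further down can be tried against this fixture directly with the `jmespath` package. A quick sketch, assuming it is run from the repository root so the fixture path resolves:

```python
import json

import jmespath

# path of the fixture as committed; the tests resolve it via _get_file_content()
with open('tests/etl_test/json_data/CVE-2013-0169.json') as f:
    doc = json.load(f)

# flatten every product_name across all vendor entries
names = jmespath.search(
    'cve.affects.vendor.vendor_data[*].product.product_data[*].product_name[]', doc)
print(names)  # ['openssl', 'openjdk', 'polarssl']

# an expression that resolves to a single scalar string
print(jmespath.search('cve.CVE_data_meta.ID', doc))  # CVE-2013-0169
```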
60 changes: 57 additions & 3 deletions tests/etl_test/test_case.py
@@ -300,6 +300,7 @@ def test_csv():
    # quote
    assert t( ("data", CSV(r"city, pop, province") ))({'data': '"nj", "800", "js"'}) == {'province': 'js', 'city': 'nj', 'data': '"nj", "800", "js"', 'pop': '800'}
    assert t( ("data", CSV(r"city, pop, province") ))({'data': '"nj", "800", "jiang, su"'}) == {'province': 'jiang, su', 'city': 'nj', 'data': '"nj", "800", "jiang, su"', 'pop': '800'}
    assert t( ("data", CSV(r"city, pop, province") ))({'data': '"nj", "800", "jiang"" su"'}) == {'province': 'jiang" su', 'city': 'nj', 'data': '"nj", "800", "jiang"" su"', 'pop': '800'}
    assert t( ("data", CSV(r"city, pop, province", quote='|') ))({'data': '|nj|, |800|, |jiang, su|'}) == {'province': 'jiang, su', 'city': 'nj', 'data': '|nj|, |800|, |jiang, su|', 'pop': '800'}

    # restrict
@@ -537,13 +538,62 @@ def test_split():
d1 = {"i": "t1", "data": "1,2,3"}
assert t( ("data", SPLIT) )(d1) == [{"i": "t1", "data": "1"}, {"i": "t1", "data": "2"}, {"i": "t1", "data": "3"}]

# json list
d2 = {"i": "t1", "data": "[1,2,3]"}
assert t( ("data", SPLIT) )(d2) == [{"i": "t1", "data": "1"}, {"i": "t1", "data": "2"}, {"i": "t1", "data": "3"}]

# json list
d3 = {"i": "t1", "data": '["a", "b", "c"]'}
assert t( ("data", SPLIT) )(d3) == [{"i": "t1", "data": 'a'}, {"i": "t1", "data": "b"}, {"i": "t1", "data": "c"}]

#
# sep
assert t(("data", SPLIT(sep='#')))({"i": "t1", 'data': 'nj#800#js'}) == [{"i": "t1", 'data': 'nj'},{"i": "t1", 'data': '800'},{"i": "t1", 'data': 'js'}]

# lstrip
assert t( ("data", SPLIT ))({"i": "t1", 'data': 'nj, 800, js'}) == [{"i": "t1", 'data': 'nj'},{"i": "t1", 'data': '800'},{"i": "t1", 'data': 'js'}]
assert t( ("data", SPLIT(lstrip=False) ))({"i": "t1", 'data': 'nj, 800, js'}) == [{"i": "t1", 'data': 'nj'},{"i": "t1", 'data': ' 800'},{"i": "t1", 'data': ' js'}]

# quote
assert t( ("data", SPLIT ))({"i": "t1", 'data': '"nj", "800", "js"'}) == [{"i": "t1", 'data': 'nj'}, {"i": "t1",'data': '800'}, {"i": "t1",'data': 'js'} ]
assert t( ("data", SPLIT ))({"i": "t1", 'data': '"nj", "800", "jiang, su"'}) == [{"i": "t1", 'data': 'nj'}, {"i": "t1",'data': '800'}, {"i": "t1",'data': 'jiang, su'} ]
assert t( ("data", SPLIT ))({"i": "t1", 'data': '"nj", "800", "jiang"" su"'}) == [{"i": "t1", 'data': 'nj'}, {"i": "t1",'data': '800'}, {"i": "t1",'data': 'jiang" su'} ]
assert t( ("data", SPLIT(quote='|') ))({"i": "t1", 'data': '|nj|, |800|, |jiang, su|'}) == [{"i": "t1", 'data': 'nj'}, {"i": "t1",'data': '800'}, {"i": "t1",'data': 'jiang, su'} ]

# output fields
assert t( ("data", SPLIT(output="v")) )({"data": "[1,2,3]"}) == [{"v": "1", "data": "[1,2,3]"}, {"v": "2", "data": "[1,2,3]"}, {"v": "3", "data": "[1,2,3]"}]


def _get_file_content(path):

    basedir = os.path.dirname(os.path.abspath(__file__))

    with open(os.sep.join([basedir, path])) as f:
        return f.read()


def test_split_filter():
    d1 = {'i': '1', 'data': _get_file_content('json_data/CVE-2013-0169.json')}
    # jmes filter outputs a list
    jmes = 'cve.affects.vendor.vendor_data[*].product.product_data[*].product_name[]'
    # print( t( ("data", SPLIT(jmes=jmes, output='data')) )(d1))
    assert t( ("data", SPLIT(jmes=jmes, output='data')) )(d1) == [{"i": "1", 'data': "openssl"}, {"i": "1", 'data': "openjdk"}, {"i": "1", 'data': "polarssl"}]

    d1 = {'i': '1', 'data': _get_file_content('json_data/CVE-2013-0169.json')}
    # jmes filter outputs a string with dots
    jmes = 'cve.affects.vendor.vendor_data[1].product.product_data[0].version.version_data[1].version_value'
    # print(t( ("data", SPLIT(jmes=jmes, sep='.', output='data')) )(d1))
    assert t( ("data", SPLIT(jmes=jmes, sep='.', output='data')) )(d1) == [{"i": "1", 'data': "1"}, {"i": "1", 'data': "6"}, {"i": "1", 'data': "0"}]

    d1 = {'i': '1', 'data': _get_file_content('json_data/CVE-2013-0169.json')}
    # jmes filter outputs a simple string
    jmes = 'cve.CVE_data_meta.ID'
    assert t(("data", SPLIT(jmes=jmes, output='data')))(d1) == {"i": "1", 'data': "CVE-2013-0169"}

    d1 = {'i': '1', 'data': _get_file_content('json_data/CVE-2013-0169.json')}
    jmes = 'cve.affects.vendor.vendor_data[*].product.product_data[]'
    # print(t(("data", SPLIT(jmes=jmes, output='data')))(d1))
    assert t(("data", SPLIT(jmes=jmes, output='data')))(d1) == [{'i': '1', 'data': '{"product_name": "openssl", "version": {"version_data": [{"version_value": "*"}, {"version_value": "0.9.8"}, {"version_value": "0.9.8a"}, {"version_value": "0.9.8b"}, {"version_value": "0.9.8c"}, {"version_value": "0.9.8d"}, {"version_value": "0.9.8f"}, {"version_value": "0.9.8g"}]}}'}, {'i': '1', 'data': '{"product_name": "openjdk", "version": {"version_data": [{"version_value": "-"}, {"version_value": "1.6.0"}, {"version_value": "1.7.0"}]}}'}, {'i': '1', 'data': '{"product_name": "polarssl", "version": {"version_data": [{"version_value": "0.10.0"}, {"version_value": "0.10.1"}, {"version_value": "0.11.0"}]}}'}]


def test_json():
    # verify the KV extract pattern match
@@ -556,6 +606,9 @@ def test_json():
    d3 = {"data": """[1,2,3]"""}
    assert t( ("data", JSON) )(d3) == {"data": """[1,2,3]"""}




    # d2 = {"data": """{"k1": 100, "k2": {"k21": 200} }"""}
    # print(t( ("data", JSON) )(d2))
    # assert t( ("data", JSON(level=2)) )(d2) == {"data": """{"k1": 100, "k2": {"k21": 200} }""", "k1": "100", "k2": '{"k21": 200}'}
@@ -632,9 +685,9 @@ def test_json():
# assert KV_F(r'data2')(d14) == {'k3': 'v3', 'k2': 'v2', 'data1': 'i=c14, k1=v1', 'data3': 'k4=v4', 'data2': 'k2=v2 k3=v3'}


# test_split()
# test_split_filter()
# exit(0)
#

test_condition()
test_regex()
test_csv()
@@ -643,6 +696,7 @@ def test_json():
test_lookup_mapping()
test_kv()
test_split()
test_split_filter()
test_json()
test_dispatch_transform()
test_meta()