Skip to content

Commit

Permalink
pep8 style fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Dec 18, 2018
1 parent 982347f commit 5a0cdd7
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 84 deletions.
3 changes: 1 addition & 2 deletions aliyun/log/etl_core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

from .trans_comp import *
from .transform import *
from .runner import Runner
from .settings import *
from .settings import *
9 changes: 5 additions & 4 deletions aliyun/log/etl_core/config_parser.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import ast
import inspect
import logging
import re
import six
from collections import Callable
from .settings import TransFnType, check_fn_type_by_name, builtin_macros

import six
from functools import wraps
import inspect

from .etl_util import re_full_match
from .settings import TransFnType, check_fn_type_by_name, builtin_macros

logger = logging.getLogger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion aliyun/log/etl_core/etl_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def re_full_match(pattern, string, *args, **kwargs):
if m and m.span()[1] == len(string):
return m


@cached
def get_re_full_match(pattern, flags=0):
p = re.compile(pattern, flags=flags)
Expand All @@ -36,4 +37,4 @@ def ptn_full_match(string, *args, **kwargs):
if m and m.span()[1] == len(string):
return m

return ptn_full_match
return ptn_full_match
13 changes: 7 additions & 6 deletions aliyun/log/etl_core/runner.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from .config_parser import ConfigParser
import os
import sys
import copy
import inspect
import logging
import os
import sys

from .config_parser import ConfigParser
from .exceptions import SettingError
from collections import Iterable

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -44,11 +45,11 @@ def _process_event(self, event, fn_list):
if isinstance(new_event, (tuple, list)):
result = []
for e in new_event:
ret = self._process_event(e, fn_list[i+1:])
ret = self._process_event(e, fn_list[i + 1:])
if ret is None:
continue

if isinstance(ret, (tuple, list) ):
if isinstance(ret, (tuple, list)):
result.extend(ret)
else:
result.append(ret)
Expand Down
9 changes: 5 additions & 4 deletions aliyun/log/etl_core/trans_comp/trans_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class trans_comp_csv(trans_comp_base):
DEFAULT_QUOTE = '"'

def __init__(self, config, sep=None, quote=None, lstrip=None, restrict=None):
if isinstance(config, (six.text_type, six.binary_type) ):
if isinstance(config, (six.text_type, six.binary_type)):
self.keys = self.p_csv_sep.split(config)
elif isinstance(config, Iterable):
self.keys = list(config)
Expand All @@ -34,13 +34,15 @@ def __call__(self, event, inpt):
data = event[inpt]
ret = list(csv.reader([data], skipinitialspace=self.lstrip, delimiter=self.sep, quotechar=self.quote))[0]
if self.restrict and len(ret) != len(self.keys):
logger.warn("event {0} field {1} contains different count of fields as expected key {2} actual {3}".format(event, inpt, self.keys, ret))
logger.warning(
"event {0} field {1} contains different count of fields as expected key {2} actual {3}".format(
event, inpt, self.keys, ret))
return event

new_event = dict(zip(self.keys, ret))
event.update(new_event)
else:
logger.warn("field {0} doesn't exist in event {1}, skip it".format(inpt, event))
logger.warning("field {0} doesn't exist in event {1}, skip it".format(inpt, event))

return event

Expand All @@ -51,4 +53,3 @@ class trans_comp_tsv(trans_comp_csv):

class trans_comp_psv(trans_comp_csv):
DEFAULT_SEP = '|'

83 changes: 52 additions & 31 deletions aliyun/log/etl_core/trans_comp/trans_json.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from .trans_base import trans_comp_base
import six
import re
import logging
import inspect
import json
from ..etl_util import cached
import logging
from collections import Iterable

import jmespath
import re
import six
from jmespath.exceptions import ParseError
from ..exceptions import SettingError
import inspect

from .trans_base import trans_comp_base
from ..etl_util import get_re_full_match
import re
from ..exceptions import SettingError

__all__ = ['trans_comp_json']

Expand All @@ -37,10 +37,20 @@ class json_transformer(trans_comp_base):

DEFAULT_FMT_ARRAY = "{parent_rlist[0]}_{index}" # could also be custom formatting string using up to five placehodler: parent_list, parent_list, current, sep, prefix, suffix
FMT_MAP = {
"simple": lambda prefix, current, suffix, *args, **kwargs: "{prefix}{current}{suffix}".format(prefix=prefix, current=current, suffix=suffix),
"full": lambda parent_list, sep, prefix, current, suffix, *args, **kwargs: "{parent_list_str}{sep}{prefix}{current}{suffix}".format(parent_list_str=sep.join(parent_list), current=current, sep=sep, prefix=prefix, suffix=suffix),
"parent": lambda parent_list, sep, prefix, current, suffix, *args, **kwargs: "{parent}{sep}{prefix}{current}{suffix}".format(parent=parent_list[-1], current=current, sep=sep, prefix=prefix, suffix=suffix),
"root": lambda parent_list, sep, prefix, current, suffix, *args, **kwargs: "{parent_list[0]}{sep}{prefix}{current}{suffix}".format(parent_list=parent_list, current=current, sep=sep, prefix=prefix, suffix=suffix)
"simple": lambda prefix, current, suffix, *args, **kwargs: "{prefix}{current}{suffix}".format(prefix=prefix,
current=current,
suffix=suffix),
"full": lambda parent_list, sep, prefix, current, suffix, *args,
**kwargs: "{parent_list_str}{sep}{prefix}{current}{suffix}".format(
parent_list_str=sep.join(parent_list), current=current, sep=sep, prefix=prefix, suffix=suffix),
"parent": lambda parent_list, sep, prefix, current, suffix, *args,
**kwargs: "{parent}{sep}{prefix}{current}{suffix}".format(parent=parent_list[-1],
current=current, sep=sep,
prefix=prefix, suffix=suffix),
"root": lambda parent_list, sep, prefix, current, suffix, *args,
**kwargs: "{parent_list[0]}{sep}{prefix}{current}{suffix}".format(parent_list=parent_list,
current=current, sep=sep,
prefix=prefix, suffix=suffix)
# could also be a custom formatting string using up to six placeholders: parent_list, parent_rlist, current, sep, prefix, suffix
# could also be a formatting function accepting 3 parameters: parent_list, current key, current value
# Note: the function must return a (k, v) tuple; returning None means this k-v pair is skipped
Expand Down Expand Up @@ -89,9 +99,9 @@ def __init__(self, jmes=None, jmes_ignore_none=None, output=None,
raise SettingError(ex=ex, msg="Invalid JMES filter setting", settings=jmes)
elif self.output:
logger.warning("json_transformer: parameter output '{0}' will be ignored as there's no filter is selected."
.format(output))
.format(output))

self.depth = min( (depth or self.DEFAULT_DEPTH), self.DEFAULT_DEPTH)
self.depth = min((depth or self.DEFAULT_DEPTH), self.DEFAULT_DEPTH)
self.include_node = include_node or self.DEFAULT_INCLUDE_NODE
self.exclude_node = exclude_node or self.DEFAULT_EXCLUDE_NODE
self.include_path = include_path or self.DEFAULT_INCLUDE_PATH
Expand All @@ -110,16 +120,19 @@ def __init__(self, jmes=None, jmes_ignore_none=None, output=None,
self.format_array = fmt_array or self.DEFAULT_FMT_ARRAY

def _skip_keys(self, key, parent_list):
if (self.include_node and not self.include_node_match(key)) or (self.exclude_node and self.exclude_node_match(key)):
if (self.include_node and not self.include_node_match(key)) or (
self.exclude_node and self.exclude_node_match(key)):
logger.info("json_transformer: 'key' {0} is not in include keys '{1}' or in exclude keys '{2}', skip it."
.format(key, self.include_node, self.exclude_node))
return True

if self.include_path or self.exclude_path:
path = '.'.join(parent_list) + '.' + key
if (self.include_path and not self.include_path_match(path)) or (self.exclude_path and self.exclude_path_match(path)):
logger.info("json_transformer: path '{0}' is not in include path '{1}' or in exclude path '{2}', skip it."
.format(path, self.include_path, self.exclude_path))
if (self.include_path and not self.include_path_match(path)) or (
self.exclude_path and self.exclude_path_match(path)):
logger.info(
"json_transformer: path '{0}' is not in include path '{1}' or in exclude path '{2}', skip it."
.format(path, self.include_path, self.exclude_path))
return True

return False
Expand All @@ -136,21 +149,23 @@ def format_add_kv(self, event, fmt, current, value, parent_list, parent_rlist, s
try:
if isinstance(fmt, (six.text_type, six.binary_type)):
ret = fmt.format(parent_list=parent_list, parent_rlist=parent_rlist, current=current, sep=sep,
prefix=prefix, suffix=suffix), \
prefix=prefix, suffix=suffix), \
json_transformer._n(value)
else:
# callable formatting function
ret = fmt(parent_list=parent_list, parent_rlist=parent_rlist, current=current, sep=sep, prefix=prefix, suffix=suffix), \
json_transformer._n(value)
ret = fmt(parent_list=parent_list, parent_rlist=parent_rlist, current=current, sep=sep,
prefix=prefix, suffix=suffix), \
json_transformer._n(value)
except Exception as ex:
logger.info("json_transformer: fail to format with settings: '{0}'".format( (fmt, current, value,
parent_list, sep, prefix, suffix) ))
logger.info("json_transformer: fail to format with settings: '{0}'".format((fmt, current, value,
parent_list, sep, prefix,
suffix)))
elif inspect.isfunction(fmt):
try:
ret = fmt(parent_list, current, value)
except Exception as ex:
logger.info("json_transformer: fail to call formatting string: {0} wuth parameters: {1}"
.format(fmt, (parent_list, current, value) ))
.format(fmt, (parent_list, current, value)))

if ret and len(ret) == 2:
k, v = ret
Expand All @@ -167,20 +182,24 @@ def _expand_json(self, event, key, value, parent_list, parent_rlist, depth, sep,
# 1. depth hit, 2. basic type, 3. array but not expand
logger.info("json_transformer: hit stop parsing, key: '{0}', value: '{1}', parent: '{2}', depth: '{3}'"
.format(key, value, parent_list, depth))
self.format_add_kv(event, self.fmt, self._n(key), self._n(value), parent_list, parent_rlist, sep, prefix, suffix)
self.format_add_kv(event, self.fmt, self._n(key), self._n(value), parent_list, parent_rlist, sep, prefix,
suffix)
return None

# convert array to dict
if isinstance(value, (list, tuple)):
value = dict((self.format_array.format(parent_list=parent_list, parent_rlist=parent_rlist, index=i), v) for i, v in enumerate(value))
value = dict(
(self.format_array.format(parent_list=parent_list, parent_rlist=parent_rlist, index=i), v) for i, v in
enumerate(value))

if isinstance(value, dict):
for k, v in six.iteritems(value):
if isinstance(v, (dict, tuple, list)):
# recursively parse it
self._expand_json(event, k, v, parent_list + (k, ), (k, ) + parent_rlist, depth, sep, prefix, suffix)
self._expand_json(event, k, v, parent_list + (k,), (k,) + parent_rlist, depth, sep, prefix, suffix)
else:
self.format_add_kv(event, self.fmt, self._n(k), self._n(v), parent_list, parent_rlist, sep, prefix, suffix)
self.format_add_kv(event, self.fmt, self._n(k), self._n(v), parent_list, parent_rlist, sep, prefix,
suffix)

else:
logger.info("json_transformer: skip unsupported message '{0}' of type '{1}' when expanding"
Expand Down Expand Up @@ -212,7 +231,7 @@ def _process_message(self, key, value):

# if need to expand
if self.expand:
self._expand_json(new_event, key, value, (key, ), (key, ), self.depth, self.sep, self.prefix, self.suffix)
self._expand_json(new_event, key, value, (key,), (key,), self.depth, self.sep, self.prefix, self.suffix)

return new_event

Expand All @@ -222,7 +241,8 @@ def extract_json(self, message):
try:
message = json.loads(message)
except Exception as ex:
logger.info("json_transformer: fail to load event into json object: {0}, error: {1}".format(message, ex))
logger.info(
"json_transformer: fail to load event into json object: {0}, error: {1}".format(message, ex))
return message

if isinstance(message, dict):
Expand Down Expand Up @@ -251,7 +271,8 @@ def __call__(self, event, inpt):
if new_event and isinstance(new_event, dict):
event.update(new_event)
else:
logger.info('trans_comp_lookup: event "{0}" does not extract value from field "{1}"'.format(event, i))
logger.info(
'trans_comp_lookup: event "{0}" does not extract value from field "{1}"'.format(event, i))
else:
logger.error("trans_comp_lookup: unknown type of input field {0}".format(inpt))

Expand Down
11 changes: 6 additions & 5 deletions aliyun/log/etl_core/trans_comp/trans_kv.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from .trans_base import trans_comp_base
import six
import re
import logging
import json
from ..etl_util import cached
from collections import Iterable

import re
import six

from .trans_base import trans_comp_base
from ..etl_util import cached

__all__ = ['trans_comp_kv']

logger = logging.getLogger(__name__)
Expand Down
27 changes: 15 additions & 12 deletions aliyun/log/etl_core/trans_comp/trans_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ def __init__(self, reader, case_insensitive):
self.case_insensitive = case_insensitive

def get_row(self, inputs):
assert isinstance(inputs, dict), LookupError(msg="trans_comp_lookup: inputs are not dict as expected", settings=inputs)
assert isinstance(inputs, dict), LookupError(msg="trans_comp_lookup: inputs are not dict as expected",
settings=inputs)

for k, v in six.iteritems(inputs):
if k not in self.field_names:
logger.info("trans_comp_lookup: key {0} doesn't exist in existing fields names: {1}".format(k, self.field_names))
logger.info("trans_comp_lookup: key {0} doesn't exist in existing fields names: {1}".format(k,
self.field_names))
return None

for row in self.rows:
Expand All @@ -45,7 +47,7 @@ def __init__(self, dct, case_insensitive):
self.default = dct.get('*', None)
self.case_insensitive = case_insensitive
if case_insensitive:
self.data = dict((k.lower(), v) for k, v in six.iteritems(dct))
self.data = dict((k.lower(), v) for k, v in six.iteritems(dct))
else:
self.data = dct

Expand All @@ -66,7 +68,7 @@ def __init__(self, data, output_fields, sep=',', quote='"', lstrip=True, case_in
self.data = DefaultDict(data, case_insensitive)

# init output fields
if isinstance(output_fields, (six.binary_type, six.text_type) ):
if isinstance(output_fields, (six.binary_type, six.text_type)):
self.output_fields = [output_fields]
elif isinstance(output_fields, Iterable):
self.output_fields = []
Expand All @@ -78,7 +80,7 @@ def __init__(self, data, output_fields, sep=',', quote='"', lstrip=True, case_in
else:
raise SettingError(settings=output_fields)

elif isinstance(data, (six.binary_type, six.text_type) ):
elif isinstance(data, (six.binary_type, six.text_type)):

self.sig = (data, sep, quote, lstrip)

Expand Down Expand Up @@ -106,13 +108,13 @@ def __init__(self, data, output_fields, sep=',', quote='"', lstrip=True, case_in

# init output fields
self.output_fields = {}
if isinstance(output_fields, (six.binary_type, six.text_type) ):
if isinstance(output_fields, (six.binary_type, six.text_type)):
self.output_fields[output_fields] = output_fields
elif isinstance(output_fields, Iterable):
for f in output_fields:
if isinstance(f, (six.binary_type, six.text_type)):
self.output_fields[f] = f
elif isinstance(f, (tuple, list) ) and len(f) == 2:
elif isinstance(f, (tuple, list)) and len(f) == 2:
self.output_fields[f[0]] = f[1]
else:
raise SettingError(settings=output_fields)
Expand All @@ -125,7 +127,6 @@ def __init__(self, data, output_fields, sep=',', quote='"', lstrip=True, case_in
else:
raise SettingError(settings=data)


def __call__(self, event, inpt):
if isinstance(self.data, DefaultDict):
# simple dict mode
Expand Down Expand Up @@ -167,9 +168,9 @@ def __call__(self, event, inpt):
return event

if isinstance(i, (tuple, list)) and len(i) != 2:
logger.error('trans_comp_lookup: type of input field "{0}" is unsupported'.format(i))
# must exit, or else skip it for lookup type
return event
logger.error('trans_comp_lookup: type of input field "{0}" is unsupported'.format(i))
# must exit, or else skip it for lookup type
return event

if isinstance(i, (six.binary_type, six.text_type)):
i = (i, i)
Expand All @@ -183,7 +184,9 @@ def __call__(self, event, inpt):

row = self.data.get_row(inpt_map)
if row is None:
logger.info('trans_comp_lookup: cannot find proper value for inpt "{0}" in event "{0}" doesn not contain field "{1}"'.format(inpt_map, event))
logger.info(
'trans_comp_lookup: cannot find proper value for inpt "{0}" in event "{0}" doesn not contain field "{1}"'.format(
inpt_map, event))
return event

for f, f_new in six.iteritems(self.output_fields):
Expand Down

0 comments on commit 5a0cdd7

Please sign in to comment.