Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Nov 19, 2018
1 parent 9729b52 commit 2bf2aa7
Show file tree
Hide file tree
Showing 43 changed files with 2,390 additions and 10 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ install:

script:
- python tests/ut/test_util.py
- python tests/etl_test/test_case.py
- python tests/sample.py
- python tests/integration_test/test_logtail_config.py
- python tests/integration_test/test_entity.py
Expand Down
5 changes: 5 additions & 0 deletions aliyun/log/etl_core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

from .trans_comp import *
from .transform import *
from .runner import Runner

74 changes: 74 additions & 0 deletions aliyun/log/etl_core/config_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import ast
import inspect
import logging
import re
import six
from collections import Callable
from .settings import TransFnType, check_fn_type_by_name, builtin_macros
from functools import wraps
import inspect
import os

logger = logging.getLogger(__name__)


class ConfigParser(ast.NodeVisitor):
@staticmethod
def get_fn(fn, name):
fn_type = check_fn_type_by_name(name)
if fn_type == TransFnType.EVENT_NEW:
return fn
if fn_type == TransFnType.EVENT_UPDATE:
@wraps(fn)
def new_event_fn(event):
r = fn(event)
event.update(r)
return event

return new_event_fn

return None

@staticmethod
def macro_check(name):
for k, v in six.iteritems(builtin_macros):
if re.match(k, name):
return v

def __init__(self, md):
self.name_list = []
self.fn_list = []
self.md = md

def visit_Name(self, node):
if hasattr(self.md, node.id):
obj = getattr(self.md, node.id)
if isinstance(obj, Callable):
fn = self.get_fn(obj, node.id)
if fn:
logging.info("get name {0} which is a fn actually, add it".format(node.id))
self.name_list.append([node.lineno, fn])
else:
fn_map = self.macro_check(node.id)
if fn_map is not None:
v = getattr(self.md, node.id)
self.name_list.append([node.lineno, fn_map(v)])
else:
logging.info("get name {0} not in macro list, skip it".format(node.id))

def generic_visit(self, node):
if isinstance(node, ast.FunctionDef):
obj = getattr(self.md, node.name, None)
if inspect.isfunction(obj):
fn = self.get_fn(obj, node.name)
if fn:
self.fn_list.append([node.lineno, fn])

ast.NodeVisitor.generic_visit(self, node)

def parse(self):
code = inspect.getsource(self.md)

self.visit(ast.parse(code))
return sorted(self.name_list + self.fn_list, key=lambda x: x[0])

8 changes: 8 additions & 0 deletions aliyun/log/etl_core/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from ..logexception import LogException


class SettingError(LogException):
def __init__(self, ex=None, settings=""):
super(SettingError, self).__init__('InvalidConfig', 'Invalid Settings "{0}"\nDetail: {1}'.format(settings, ex))
self.settings = settings

40 changes: 40 additions & 0 deletions aliyun/log/etl_core/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from .config_parser import ConfigParser
import os
import sys
import copy
import inspect
import logging
logger = logging.getLogger(__name__)
from .exceptions import SettingError


class Runner(object):
def __init__(self, config_path):
if not inspect.ismodule(config_path):
basedir = os.path.dirname(os.path.abspath(config_path))
module_name = os.path.basename(config_path[:-3])
if basedir not in sys.path:
sys.path.insert(0, basedir)
try:
md = __import__(module_name)
except ImportError as ex:
logger.error("Cannot import config path: {0}".format(config_path))
raise SettingError(ex, 'Cannot import the config "{0}"'.format(config_path))
else:
md = config_path

logger.info("runner: passed module {0} from config file {1}".format(md, config_path))

parsed_fn = ConfigParser(md).parse()
logger.info("runner: passed fn list: {0}".format(parsed_fn))

self.fn_list = [fn for no, fn in parsed_fn]

def __call__(self, event):
ret = copy.copy(event)
for fn in self.fn_list:
ret = fn(ret)
if ret is None:
return None

return ret
44 changes: 44 additions & 0 deletions aliyun/log/etl_core/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@

from .transform import *
# from .transform.transform_base import transform_base
# from .trans_comp.trans_base import trans_comp_base
# from collections import Callable
from enum import Enum
# import re

builtin_macros = {
'KEEP_EVENT_.*': keep_event,
'DROP_EVENT_.*': drop_event,
'KEEP_FIELDS_.*': keep_fields,
'DROP_FIELDS_.*': drop_fields,
'RENAME_FIELDS_.*': rename_fields,
'ALIAS_.*': rename_fields,
'DISPATCH_EVENT_.*': dispatch_event,
'TRANSFORM_EVENT_.*': transform_event
}





class TransFnType(Enum):
EVENT_NEW = "EVENT_NEW",
EVENT_UPDATE = 'EVENT_UPDATE',
# TRANS_COMP = 'TRANS_COMP'
UNKNOWN = 'UNKNOWN'


def check_fn_type_by_name(name):
# if isinstance(fn, transform_base):
# return TransFnType.EVENT_NEW
# elif isinstance(fn, trans_comp_base):
# return TransFnType.TRANSFORM_COMP
# if not isinstance(fn, Callable):
# fn_name = getattr(fn, '__name__', getattr(fn, "func_name", ""))
if name.startswith("sls_en_"):
return TransFnType.EVENT_NEW
elif name.startswith("sls_eu_"):
return TransFnType.EVENT_UPDATE

return TransFnType.UNKNOWN

12 changes: 12 additions & 0 deletions aliyun/log/etl_core/trans_comp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from .trans_base import V
from .trans_regex import *
from .trans_lookup import *

__all__ = ['REGEX', 'CSV', 'TSV', 'JSON', 'KV', 'V']

# field based
REGEX = trans_comp_regex
CSV = trans_comp_csv
TSV = trans_comp_tsv
JSON = trans_comp_json
KV = trans_comp_kv
21 changes: 21 additions & 0 deletions aliyun/log/etl_core/trans_comp/trans_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
class trans_comp_base(object):
pass


class V(trans_comp_base):
def __init__(self, config):
self.field = config

def __call__(self, event, inpt=None):
if inpt is None:
# it's a field setting value mode (just return value)
return event.get(self.field, None)
else:
# it's transform mote (do configuration)
if self.field not in event:
if inpt in event:
del event[inpt]
else:
event[inpt] = event[self.field]

return event
43 changes: 43 additions & 0 deletions aliyun/log/etl_core/trans_comp/trans_lookup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from .trans_base import trans_comp_base

__all__ = ['trans_comp_csv', 'trans_comp_tsv', 'trans_comp_lookup', 'trans_comp_json', 'trans_comp_kv']


class trans_comp_csv(trans_comp_base):
def __init__(self, config):
self.config = config

def __call__(self, event, inpt):
return event


class trans_comp_tsv(trans_comp_base):
def __init__(self, config):
self.config = config

def __call__(self, event, inpt):
return event


class trans_comp_lookup(trans_comp_base):
def __init__(self, config):
self.config = config

def __call__(self, event, inpt):
return event


class trans_comp_json(trans_comp_base):
def __init__(self, config):
self.config = config

def __call__(self, event, inpt):
return event


class trans_comp_kv(trans_comp_base):
def __init__(self, config):
self.config = config

def __call__(self, event, inpt):
return event

0 comments on commit 2bf2aa7

Please sign in to comment.