add splitting feature to ETL and built-in SPLIT (via list) w/ test cases
wjo1212 committed Dec 14, 2018
1 parent aceae06 commit 5bc536c
Showing 8 changed files with 269 additions and 33 deletions.
37 changes: 31 additions & 6 deletions aliyun/log/etl_core/runner.py
@@ -5,6 +5,7 @@
 import inspect
 import logging
 from .exceptions import SettingError
+from collections import Iterable
 logger = logging.getLogger(__name__)


@@ -30,11 +31,35 @@ def __init__(self, config_path):
 
         self.fn_list = [fn for no, fn in parsed_fn]
 
-    def __call__(self, event):
-        ret = copy.copy(event)
-        for fn in self.fn_list:
-            ret = fn(ret)
-            if ret is None:
+    def _process_event(self, event, fn_list):
+        if not len(fn_list):
+            return event
+
+        new_event = copy.copy(event)
+        for i, fn in enumerate(fn_list):
+            new_event = fn(new_event)
+            if new_event is None:
                 return None
 
-        return ret
+            if isinstance(new_event, (tuple, list)):
+                result = []
+                for e in new_event:
+                    ret = self._process_event(e, fn_list[i+1:])
+                    if ret is None:
+                        continue
+
+                    if isinstance(ret, (tuple, list)):
+                        result.extend(ret)
+                    else:
+                        result.append(ret)
+
+                if result:
+                    if len(result) == 1:
+                        return result[0]
+                    return result
+                return None  # return None for empty list
+
+        return new_event
+
+    def __call__(self, event):
+        return self._process_event(event, self.fn_list)
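
The net effect of the runner.py change: any transform in the pipeline may now return a tuple or list of events, and _process_event recursively applies the remaining transforms to each piece, flattening the results (an empty result drops the event, a single result is unwrapped). A minimal standalone sketch of these semantics, with illustrative names only, not the library's API:

import copy

def process_event(event, fn_list):
    # Simplified restatement of Runner._process_event above
    # (this sketch does not unwrap single-element results).
    if not fn_list:
        return event
    new_event = copy.copy(event)
    for i, fn in enumerate(fn_list):
        new_event = fn(new_event)
        if new_event is None:
            return None  # event dropped
        if isinstance(new_event, (tuple, list)):
            # a transform split the event: run the remaining
            # transforms on each piece and flatten the results
            result = []
            for e in new_event:
                ret = process_event(e, fn_list[i + 1:])
                if ret is not None:
                    result.extend(ret if isinstance(ret, (tuple, list)) else [ret])
            return result or None
    return new_event

def split_by_comma(event):
    return [dict(event, split=v) for v in event["split"].split(",")]

def tag(event):
    return dict(event, seen="yes")

print(process_event({"split": "a,b,c"}, [split_by_comma, tag]))
# -> [{'split': 'a', 'seen': 'yes'}, {'split': 'b', 'seen': 'yes'}, {'split': 'c', 'seen': 'yes'}]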
5 changes: 4 additions & 1 deletion aliyun/log/etl_core/trans_comp/__init__.py
@@ -3,8 +3,10 @@
 from .trans_lookup import *
 from .trans_csv import *
 from .trans_kv import *
+from .trans_json import *
+from .trans_mv import *
 
-__all__ = ['LOOKUP', 'REGEX', 'CSV', 'TSV', 'PSV', 'JSON', 'KV', 'V']
+__all__ = ['LOOKUP', 'REGEX', 'CSV', 'TSV', 'PSV', 'JSON', 'KV', 'V', 'SPLIT']
 
 # field based
 LOOKUP = trans_comp_lookup
@@ -14,3 +16,4 @@
 PSV = trans_comp_psv
 JSON = trans_comp_json
 KV = trans_comp_kv
+SPLIT = trans_comp_split_event
70 changes: 70 additions & 0 deletions aliyun/log/etl_core/trans_comp/trans_mv.py
@@ -0,0 +1,70 @@
from .trans_base import trans_comp_base
import csv
import six
import logging
import copy
import json

__all__ = ['trans_comp_split_event']

logger = logging.getLogger(__name__)


def trans_comp_split_event(*args, **kwargs):
    # Called with an event dict: split with default settings right away.
    # Called with configuration arguments: return a reusable transformer.
    if (args and isinstance(args[0], dict)) or ('event' in kwargs and isinstance(kwargs['event'], dict)):
        # event case
        split = split_event_transformer()
        return split(*args, **kwargs)
    else:
        return split_event_transformer(*args, **kwargs)


class split_event_transformer(trans_comp_base):
    DEFAULT_SEP = ','
    DEFAULT_QUOTE = '"'

    def __init__(self, sep=None, quote=None, lstrip=None, output_field=None):
        self.sep = sep or self.DEFAULT_SEP
        self.lstrip = True if lstrip is None else lstrip
        self.quote = quote or self.DEFAULT_QUOTE
        self.output_field = output_field

    def _process_list(self, event, field, lst):
        # Fan out: one copy of the event per parsed value.
        if not lst:
            return event

        if len(lst) > 1:
            result = []
            for d in lst:
                e = copy.copy(event)
                e[field] = self._n(d)
                result.append(e)
            return result
        else:
            event[field] = self._n(lst[0])
            return event

    def _parse_list(self, v):
        # Try the value as a JSON list first, then fall back to CSV parsing.
        try:
            ret = json.loads(v)
            if isinstance(ret, list):
                return ret
        except Exception:
            logger.debug("split_event_transformer: value {0} is not json, try to use csv".format(v))

        result = list(csv.reader([v], skipinitialspace=self.lstrip, delimiter=self.sep, quotechar=self.quote))[0]
        return result

    def __call__(self, event, inpt):
        # overwrite the input field unless a separate output field is configured
        output_field = self.output_field or inpt

        # csv mode
        if isinstance(inpt, (six.binary_type, six.text_type)):
            ret = self._parse_list(event[inpt])
            return self._process_list(event, output_field, ret)
        else:
            logger.error("split_event_transformer: unknown type of input field {0}".format(inpt))

        return event
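
Going by the code above and the SPLIT alias exported in trans_comp/__init__.py, the transform can be exercised directly; a sketch, assuming a build that includes this commit:

from aliyun.log.etl_core.trans_comp import SPLIT

# Event dict as first argument: dispatches to a default-configured
# transformer (comma separator, double-quote quoting) immediately.
print(SPLIT({"split": "a,b,c"}, "split"))
# expected: [{'split': 'a'}, {'split': 'b'}, {'split': 'c'}]

# A JSON list is tried before falling back to CSV parsing.
print(SPLIT({"split": '["x", "y"]'}, "split"))
# expected: [{'split': 'x'}, {'split': 'y'}]

# Configuration arguments build a reusable transformer instead.
split_dsv = SPLIT(sep='#', quote='|')
print(split_dsv({"d": "a#|b#c|#d"}, "d"))
# expected: [{'d': 'a'}, {'d': 'b#c'}, {'d': 'd'}]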
46 changes: 28 additions & 18 deletions aliyun/log/logclient_operator.py
@@ -639,7 +639,7 @@ def put_logs_auto_div(client, req, div=1):
 
 
 def _transform_events_to_logstore(runner, events, to_client, to_project, to_logstore):
-    count = removed = 0
+    count = removed = processed = failed = 0
     new_events = defaultdict(list)
 
     default_time = time.time()
@@ -651,17 +651,25 @@ def _transform_events_to_logstore(runner, events, to_client, to_project, to_logstore):
             removed += 1
             continue
 
-        dt = int(new_event.get('__time__', default_time)) // 60  # group logs in same minute
-        topic = ''
-        source = ''
-        if "__topic__" in new_event:
-            topic = new_event['__topic__']
-            del new_event["__topic__"]
-        if "__source__" in new_event:
-            source = new_event['__source__']
-            del new_event["__source__"]
+        if not isinstance(new_event, (tuple, list)):
+            new_event = (new_event, )
 
-        new_events[(dt, topic, source)].append(new_event)
+        for event in new_event:
+            if not isinstance(event, dict):
+                logger.error("transform_data: get unknown type of processed event: {0}".format(event))
+                continue
+
+            dt = int(event.get('__time__', default_time)) // 60  # group logs in same minute
+            topic = ''
+            source = ''
+            if "__topic__" in event:
+                topic = event['__topic__']
+                del event["__topic__"]
+            if "__source__" in event:
+                source = event['__source__']
+                del event["__source__"]
+
+            new_events[(dt, topic, source)].append(event)
 
     for (dt, topic, source), contents in six.iteritems(new_events):

@@ -678,8 +686,9 @@ def _transform_events_to_logstore(runner, events, to_client, to_project, to_logstore):
 
         req = PutLogsRequest(project=to_project, logstore=to_logstore, topic=topic, source=source, logitems=items)
         res = put_logs_auto_div(to_client, req)
+        processed += len(items)
 
-    return count, removed
+    return count, removed, processed, failed


def transform_worker(from_client, from_project, from_logstore, shard_id, from_time, to_time,
Expand All @@ -691,16 +700,17 @@ def transform_worker(from_client, from_project, from_logstore, shard_id, from_ti
iter_data = from_client.pull_log(from_project, from_logstore, shard_id, from_time, to_time, batch_size=batch_size,
compress=compress)

count = 0
removed = 0
count = removed = processed = failed = 0
for s in iter_data:
events = s.get_flatten_logs_json(time_as_str=True)

c, r = _transform_events_to_logstore(runner, events, to_client, to_project, to_logstore)
c, r, p, f = _transform_events_to_logstore(runner, events, to_client, to_project, to_logstore)
count += c
removed += r
processed += p
failed += f

return shard_id, count, removed
return shard_id, count, removed, processed, failed
except Exception as ex:
logger.error(ex)
raise
@@ -808,12 +818,12 @@ def transform_data(from_client, from_project, from_logstore, from_time,
                    for shard in target_shards]
 
         for future in as_completed(futures):
-            partition, count, removed = future.result()
+            partition, count, removed, processed, failed = future.result()
             total_count += count
             total_removed += removed
             if count:
-                result[partition] = {"total_count": count, "transformed":
-                                     count-removed, "removed": removed}
+                result[partition] = {"total_count": count, "transformed": processed,
+                                     "removed": removed, "failed": failed}
 
     return LogResponse({}, {"total_count": total_count, "shards": result})
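
Because a split can fan one pulled event out into several written ones, the per-shard report now distinguishes input from output: "total_count" counts pulled events, while "transformed" reports processed, the number of items actually written, which can exceed the input count. Shape of the response body, inferred from the code above with illustrative values:

# LogResponse body, e.g.:
# {"total_count": 100,
#  "shards": {0: {"total_count": 60, "transformed": 75, "removed": 2, "failed": 0},
#             1: {"total_count": 40, "transformed": 41, "removed": 1, "failed": 0}}}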

4 changes: 3 additions & 1 deletion tests/etl_test/data4.txt
@@ -3,4 +3,6 @@
 {"data": "123", "f1":"a", "f2":"b"}
 {"data": "123", "f1":"A", "f2":"C"}
 {"data": "123", "f1":"x", "f2":"y"}
-{"kv_data1": "k1=v1 k2=\"v2, v2\"", "kv_data2": "k4=v4 k5=v5"}
+{"kv_data1": "k1=v1 k2=\"v2, v2\"", "kv_data2": "k4=v4 k5=v5"}
+{"split": "a,b,c"}
+{"split2": "a,b,c"}
10 changes: 8 additions & 2 deletions tests/etl_test/data4_test1.py
@@ -5,9 +5,15 @@
     ( NO_EMPTY('csv_field'), ('csv_field', CSV("f_v1,f_v2,f_v3")) ),
     ( NO_EMPTY('dsv_field'), ('dsv_field', CSV("f_d1,f_d2,f_d3", sep='#', quote='|')) ),
     ( ANY, ([("f1", "c1"), ("f2", "c2")], LOOKUP('./data4_lookup_csv1.txt', ["d1", "d2"]) ) ),
-    ( ANY, ("f1", LOOKUP({'a': "a_new", '*': "unknown"}, "f1_new")) )
+    ( ANY, ("f1", LOOKUP({'a': "a_new", '*': "unknown"}, "f1_new")) ),
+    ( NO_EMPTY('split2'), ("split2", SPLIT))
 ]
 
 KV_FIELDS_data = r'kv_data\d+'
 
-DROP_FIELDS_origin = ["csv_field", 'dsv_field', 'data', 'f1', 'f2', r'kv_data\d+']
+DROP_FIELDS_origin = ["csv_field", 'dsv_field', 'data', 'f1', 'f2', r'kv_data\d+']
+
+
+@condition(NO_EMPTY("split"))
+def sls_en_split(event):
+    return [{"split": v} for v in event['split'].split(',')]
8 changes: 7 additions & 1 deletion tests/etl_test/data4_test1_result.txt
@@ -3,4 +3,10 @@
 {"d2": "2", "d1": "1", "f1_new": "a_new"}
 {"d2": "6", "d1": "5", "f1_new": "a_new"}
 {"d2": "8", "d1": "7", "f1_new": "unknown"}
-{"k2": "v2, v2", "k1": "v1", "k5": "v5", "k4": "v4"}
+{"k2": "v2, v2", "k1": "v1", "k5": "v5", "k4": "v4"}
+{"split": "a"}
+{"split": "b"}
+{"split": "c"}
+{"split2": "a"}
+{"split2": "b"}
+{"split2": "c"}
