Skip to content

Commit

Permalink
add transform CSV/TSV support
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Dec 3, 2018
1 parent 5641d0e commit a6cbcdf
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 17 deletions.
1 change: 1 addition & 0 deletions aliyun/log/etl_core/trans_comp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .trans_base import V
from .trans_regex import *
from .trans_lookup import *
from .trans_csv import *

__all__ = ['REGEX', 'CSV', 'TSV', 'JSON', 'KV', 'V']

Expand Down
50 changes: 50 additions & 0 deletions aliyun/log/etl_core/trans_comp/trans_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from .trans_base import trans_comp_base
import csv
import six
import re
from collections import Iterable
from ..exceptions import SettingError
import logging

__all__ = ['trans_comp_csv', 'trans_comp_tsv']

logger = logging.getLogger(__name__)


class trans_comp_csv(trans_comp_base):
p_csv_sep = re.compile(r'\s*,\s*')
DEFAULT_SEP = ','
DEFAULT_QUOTE = '"'

def __init__(self, config, sep=None, quote=None, lstrip=None, restrict=None):
if isinstance(config, (six.text_type, six.binary_type) ):
self.keys = self.p_csv_sep.split(config)
elif isinstance(config, Iterable):
self.keys = list(config)
else:
raise SettingError(settings=config)

self.sep = sep or self.DEFAULT_SEP
self.lstrip = True if lstrip is None else lstrip
self.quote = quote or self.DEFAULT_QUOTE
self.restrict = False if restrict is None else restrict

def __call__(self, event, inpt):
if inpt in event:
data = event[inpt]
ret = list(csv.reader([data], skipinitialspace=self.lstrip, delimiter=self.sep, quotechar=self.quote))[0]
if self.restrict and len(ret) != len(self.keys):
logger.warn("event {0} field {1} contains different count of fields as expected key {2} actual {3}".format(event, inpt, self.keys, ret))
return event

new_event = dict(zip(self.keys, ret))
event.update(new_event)
else:
logger.warn("field {0} doesn't exist in event {1}, skip it".format(inpt, event))

return event


class trans_comp_tsv(trans_comp_csv):
DEFAULT_SEP = '\t'

18 changes: 1 addition & 17 deletions aliyun/log/etl_core/trans_comp/trans_lookup.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,6 @@
from .trans_base import trans_comp_base

__all__ = ['trans_comp_csv', 'trans_comp_tsv', 'trans_comp_lookup', 'trans_comp_json', 'trans_comp_kv']


class trans_comp_csv(trans_comp_base):
def __init__(self, config):
self.config = config

def __call__(self, event, inpt):
return event


class trans_comp_tsv(trans_comp_base):
def __init__(self, config):
self.config = config

def __call__(self, event, inpt):
return event
__all__ = ['trans_comp_lookup', 'trans_comp_json', 'trans_comp_kv']


class trans_comp_lookup(trans_comp_base):
Expand Down
29 changes: 29 additions & 0 deletions tests/etl_test/test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def test_regex():
# regex 3-tuple dict
assert transform(("k1", r"(\w+):(\d+)", {r"k_\1": r"v_\2"}))({'k1': 'abc:123 xyz:456'}) == {'k1': 'abc:123 xyz:456', 'k_abc': "v_123", "k_xyz": "v_456"}


def test_dispatch_transform():
DISPATCH_LIST_data = [
({"data": "^LTE_Information "}, {"__topic__": "etl_info"}),
Expand Down Expand Up @@ -266,11 +267,39 @@ def test_module():
verify_case(data3_test1, './data3.txt', './data3_test1_result.txt')


def test_csv():
# sep
assert transform( ("data", CSV(r"city,pop,province") ))({'data': 'nj,800,js'}) == {'province': 'js', 'city': 'nj', 'data': 'nj,800,js', 'pop': '800'}
assert transform(("data", CSV(r"city, pop, province", sep='#')))({'data': 'nj#800#js'}) == {'province': 'js', 'city': 'nj', 'data': 'nj#800#js', 'pop': '800'}

# config
assert transform( ("data", CSV(['city', 'pop', 'province']) ))({'data': 'nj, 800, js'}) == {'province': 'js', 'city': 'nj', 'data': 'nj, 800, js', 'pop': '800'}

# lstrip
assert transform( ("data", CSV(r"city, pop, province") ))({'data': 'nj, 800, js'}) == {'province': 'js', 'city': 'nj', 'data': 'nj, 800, js', 'pop': '800'}
assert transform( ("data", CSV(r"city, pop, province", lstrip=False) ))({'data': 'nj, 800, js'}) == {'province': ' js', 'city': 'nj', 'data': 'nj, 800, js', 'pop': ' 800'}

# quote
assert transform( ("data", CSV(r"city, pop, province") ))({'data': '"nj", "800", "js"'}) == {'province': 'js', 'city': 'nj', 'data': '"nj", "800", "js"', 'pop': '800'}
assert transform( ("data", CSV(r"city, pop, province") ))({'data': '"nj", "800", "jiang, su"'}) == {'province': 'jiang, su', 'city': 'nj', 'data': '"nj", "800", "jiang, su"', 'pop': '800'}
assert transform( ("data", CSV(r"city, pop, province", quote='|') ))({'data': '|nj|, |800|, |jiang, su|'}) == {'province': 'jiang, su', 'city': 'nj', 'data': '|nj|, |800|, |jiang, su|', 'pop': '800'}

# restrict
assert transform(("data", CSV(r"city, pop, province")))({'data': 'nj,800,js,gudu'}) == {'province': 'js', 'city': 'nj', 'data': 'nj,800,js,gudu', 'pop': '800'}
assert transform(("data", CSV(r"city, pop, province", restrict=True)))({'data': 'nj,800,js,gudu'}) == {'data': 'nj,800,js,gudu'}
assert transform(("data", CSV(r"city, pop, province", restrict=True)))({'data': 'nj,800'}) == {'data': 'nj,800'}

# TSV
assert transform( ("data", TSV(r"city,pop,province") ))({'data': 'nj\t800\tjs'}) == {'province': 'js', 'city': 'nj', 'data': 'nj\t800\tjs', 'pop': '800'}


test_condition()
test_regex()
test_dispatch_transform()
test_meta()
test_parse()
test_runner()
test_module()
test_csv()


0 comments on commit a6cbcdf

Please sign in to comment.