Skip to content

Commit

Permalink
support consumer group for CLI data transform: aliyun/aliyun-log-cli#47
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Nov 20, 2018
1 parent eb0eff9 commit 68188f7
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 103 deletions.
3 changes: 2 additions & 1 deletion aliyun/log/consumer/consumer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
from .exceptions import CheckPointException
from .exceptions import ClientWorkerException
from ..logexception import LogException
from .. import LogClient
from ..version import USER_AGENT


class ConsumerClient(object):
def __init__(self, endpoint, access_key_id, access_key, project,
logstore, consumer_group, consumer, security_token=None):

from .. import LogClient

self.mclient = LogClient(endpoint, access_key_id, access_key, security_token)
self.mclient.set_user_agent('%s-consumergroup-%s' % (USER_AGENT, consumer_group))
self.mproject = project
Expand Down
2 changes: 1 addition & 1 deletion aliyun/log/etl_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
from .trans_comp import *
from .transform import *
from .runner import Runner

from .settings import *
17 changes: 10 additions & 7 deletions aliyun/log/etl_core/settings.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@

from .transform import *
# from .transform.transform_base import transform_base
# from .trans_comp.trans_base import trans_comp_base
# from collections import Callable
from enum import Enum
# import re

builtin_macros = {
'KEEP_EVENT_.*': keep_event,
Expand All @@ -17,8 +12,17 @@
'TRANSFORM_EVENT_.*': transform_event
}

__all__ = ['KEEP_EVENT_',
'DROP_EVENT_',
'KEEP_FIELDS_',
'DROP_FIELDS_',
'RENAME_FIELDS_',
'ALIAS_',
'DISPATCH_EVENT_',
'TRANSFORM_EVENT_']


for key in builtin_macros.keys():
globals()[key[:-2]] = "Use this prefix to auto call the function: {}".format(builtin_macros[key])


class TransFnType(Enum):
Expand All @@ -41,4 +45,3 @@ def check_fn_type_by_name(name):
return TransFnType.EVENT_UPDATE

return TransFnType.UNKNOWN

55 changes: 41 additions & 14 deletions aliyun/log/logclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -2591,7 +2591,7 @@ def es_migration(self, hosts,
resp.body = res
return resp

def copy_data(self, project, logstore, from_time, to_time,
def copy_data(self, project, logstore, from_time, to_time=None,
to_client=None, to_project=None, to_logstore=None,
batch_size=500, compress=True, new_topic=None, new_source=None):
"""
Expand All @@ -2607,7 +2607,7 @@ def copy_data(self, project, logstore, from_time, to_time,
:param from_time: cursor value, could be begin, timestamp or readable time in readable time like "%Y-%m-%d %H:%M:%S CST" e.g. "2018-01-02 12:12:10 CST", also support human readable string, e.g. "1 hour ago", "now", "yesterday 0:0:0", refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
:type to_time: string/int
:param to_time: cursor value, could be begin, timestamp or readable time in readable time like "%Y-%m-%d %H:%M:%S CST" e.g. "2018-01-02 12:12:10 CST", also support human readable string, e.g. "1 hour ago", "now", "yesterday 0:0:0", refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
:param to_time: cursor value, default is "end", could be begin, timestamp or readable time in readable time like "%Y-%m-%d %H:%M:%S CST" e.g. "2018-01-02 12:12:10 CST", also support human readable string, e.g. "1 hour ago", "now", "yesterday 0:0:0", refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
:type to_client: LogClient
:param to_client: logclient instance, if empty will use source client
Expand All @@ -2633,15 +2633,20 @@ def copy_data(self, project, logstore, from_time, to_time,
:return: LogResponse {"total_count": 30, "shards": {0: 10, 1: 20} })
"""
return copy_data(self, project, logstore, from_time, to_time,
return copy_data(self, project, logstore, from_time, to_time=to_time,
to_client=to_client, to_project=to_project, to_logstore=to_logstore,
batch_size=batch_size, compress=compress, new_topic=new_topic, new_source=new_source)

def transform_data(self, project, logstore, from_time, to_time, config=None,
def transform_data(self, project, logstore, from_time, to_time=None, config=None,
to_client=None, to_project=None, to_logstore=None,
batch_size=500, compress=True):
shard_list=None,
batch_size=500, compress=True,
cg_name=None, c_name=None,
cg_heartbeat_interval=None, cg_data_fetch_interval=None, cg_in_order=None,
cg_worker_pool_size=None
):
"""
transform data from one logstore to another one (could be the same or in different region), the time passed is log received time on server side.
transform data from one logstore to another one (could be the same or in different region), the time passed is log received time on server side. There are two modes, batch mode and consumer group mode. For batch mode, just leave cg_name and the later options as None.
:type project: string
:param project: project name
Expand All @@ -2653,7 +2658,7 @@ def transform_data(self, project, logstore, from_time, to_time, config=None,
:param from_time: cursor value, could be begin, timestamp or readable time in readable time like "%Y-%m-%d %H:%M:%S CST" e.g. "2018-01-02 12:12:10 CST", also support human readable string, e.g. "1 hour ago", "now", "yesterday 0:0:0", refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
:type to_time: string/int
:param to_time: cursor value, could be begin, timestamp or readable time in readable time like "%Y-%m-%d %H:%M:%S CST" e.g. "2018-01-02 12:12:10 CST", also support human readable string, e.g. "1 hour ago", "now", "yesterday 0:0:0", refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
:param to_time: cursor value, leave it as None if consumer group is configured. could be begin, timestamp or readable time in readable time like "%Y-%m-%d %H:%M:%S CST" e.g. "2018-01-02 12:12:10 CST", also support human readable string, e.g. "1 hour ago", "now", "yesterday 0:0:0", refer to https://aliyun-log-cli.readthedocs.io/en/latest/tutorials/tutorial_human_readable_datetime.html
:type config: string
:param config: transform config imported or path of config (in python)
Expand All @@ -2667,25 +2672,47 @@ def transform_data(self, project, logstore, from_time, to_time, config=None,
:type to_logstore: string
:param to_logstore: logstore name, if empty will use source logstore
:type shard_list: string
:param shard_list: shard number list. could be comma-separated list or range: 1,20,31-40
:type batch_size: int
:param batch_size: batch size to fetch the data in each iteration. by default it's 500
:type compress: bool
:param compress: if use compression, by default it's True
:type new_topic: string
:param new_topic: overwrite the copied topic with the passed one
:type cg_name: string
:param cg_name: consumer group name. must configure if it's consumer group mode.
:type new_source: string
:param new_source: overwrite the copied source with the passed one
:type c_name: string
:param c_name: consumer name for consumer group mode, default: CLI-transform-data-${process_id}
:return: LogResponse {"total_count": 30, "shards": {0: 10, 1: 20} })
:type cg_heartbeat_interval: int
:param cg_heartbeat_interval: cg_heartbeat_interval, default 20
:type cg_data_fetch_interval: int
:param cg_data_fetch_interval: cg_data_fetch_interval, default 2
:type cg_in_order: bool
:param cg_in_order: cg_in_order, default False
:type cg_worker_pool_size: int
:param cg_worker_pool_size: cg_worker_pool_size, default 2
:return: LogResponse {"total_count": 30, "shards": {0: {"count": 10, "removed": 1}, 2: {"count": 20, "removed": 1}} })
"""
return transform_data(self, project, logstore, from_time, to_time,
return transform_data(self, project, logstore, from_time, to_time=to_time,
config=config,
to_client=to_client, to_project=to_project, to_logstore=to_logstore,
batch_size=batch_size, compress=compress)
shard_list=shard_list,
batch_size=batch_size, compress=compress,
cg_name=cg_name, c_name=c_name,
cg_heartbeat_interval=cg_heartbeat_interval,
cg_data_fetch_interval=cg_data_fetch_interval,
cg_in_order=cg_in_order,
cg_worker_pool_size=cg_worker_pool_size
)

def get_resource_usage(self, project):
""" get resource usage ist the project
Expand Down

0 comments on commit 68188f7

Please sign in to comment.