-
Notifications
You must be signed in to change notification settings - Fork 116
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add an example of sending log to syslog using consumer group
- Loading branch information
Showing
1 changed file
with
149 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
import os | ||
import logging | ||
from logging.handlers import RotatingFileHandler | ||
from aliyun.log.consumer import * | ||
from aliyun.log.pulllog_response import PullLogResponse | ||
from multiprocessing import current_process | ||
import syslogclient | ||
from syslogclient import SyslogClientRFC5424 as SyslogClient | ||
import six | ||
from datetime import datetime | ||
|
||
# Configure logging: a rotating per-process log file plus console echo.
root = logging.getLogger()
handler = RotatingFileHandler(
    "{0}_{1}.log".format(os.path.basename(__file__), current_process().pid),
    maxBytes=100 * 1024 * 1024,
    backupCount=5,
)
handler.setFormatter(logging.Formatter(
    fmt='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
))
root.setLevel(logging.INFO)
root.addHandler(handler)
root.addHandler(logging.StreamHandler())

# Module-level logger used throughout this script.
logger = logging.getLogger(__name__)
|
||
|
||
class SyncData(ConsumerProcessorBase):
    """
    Consumer processor that forwards each pulled log entry to a remote
    syslog server (RFC 5424) as a single line of separator-joined k=v pairs.
    """
    def __init__(self, splunk_setting=None):
        """Initialize the forwarder from a settings dict.

        :param splunk_setting: dict with keys:
            host (required), protocol (required, 'tcp' or 'udp'),
            port (optional, default 514), timeout (seconds, default 120),
            sep (key=value separator, default '||'),
            plus optional facility/severity/hostname/tag forwarded to the client.
        :raises ValueError: if settings are missing or not a dict.
        """
        super(SyncData, self).__init__()  # remember to call base's init

        # `assert` is stripped under `python -O`; validate explicitly so the
        # intended ValueError is actually raised.
        if not splunk_setting:
            raise ValueError("You need to configure settings of remote target")
        if not isinstance(splunk_setting, dict):
            raise ValueError("The settings should be dict to include necessary address and confidentials.")

        self.option = splunk_setting
        self.protocol = self.option['protocol']
        self.timeout = int(self.option.get('timeout', 120))
        self.sep = self.option.get('sep', "||")
        self.host = self.option["host"]
        self.port = int(self.option.get('port', 514))

        # Fail fast: verify the syslog server is reachable before consuming.
        with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout):
            pass

    def process(self, log_groups, check_point_tracker):
        """Flatten the pulled log groups, ship each log to syslog, then checkpoint.

        Each log is a dict of unicode strings, e.g.
        Python2: {u"__time__": u"12312312", u"__topic__": u"topic", u"field1": u"value1"}
        Python3: {"__time__": "12312312", "__topic__": "topic", "field1": "value1"}
        '__time__' becomes the syslog timestamp; the remaining fields are
        serialized as `<sep>key=value` pairs (the leading separator is kept
        for backward compatibility with existing downstream parsers).

        Re-raises any send/connect failure so the checkpoint is NOT saved and
        the batch can be retried by the consumer framework.
        """
        logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
        logger.info("Get data from shard %s, log count: %s", self.shard_id, len(logs))

        try:
            # Bug fix: the original hardcoded port 514, proto 'tcp' and
            # timeout 120 here, silently ignoring the configured values
            # stored in __init__. Use the configured connection settings.
            with SyslogClient(self.host, self.port, proto=self.protocol, timeout=self.timeout) as client:
                for log in logs:
                    timestamp = datetime.fromtimestamp(int(log[u'__time__']))
                    del log['__time__']

                    # Serialize remaining fields as <sep>k=v<sep>k=v ...
                    # (replaces the StringIO loop; unused `event`/`first`
                    # locals from the original are dropped).
                    data = "".join("{0}{1}={2}".format(self.sep, k, v)
                                   for k, v in six.iteritems(log))

                    client.log(data,
                               facility=self.option.get("facility", None),
                               severity=self.option.get("severity", None),
                               timestamp=timestamp,
                               program=self.option.get("tag", None),
                               hostname=self.option.get("hostname", None))

        except Exception:
            # Was logger.debug: a failed delivery is an error, and
            # logger.exception records the traceback. Bare `raise`
            # preserves the original traceback.
            logger.exception("Failed to send to remote syslog server (%s).", self.option)
            # TODO: add some error handling here or retry etc.
            raise

        logger.info("Complete send data to remote")

        self.save_checkpoint(check_point_tracker)
|
||
|
||
def get_monitor_option():
    """Build the consumer-group option and the remote syslog settings.

    Connection info is read from environment variables:
    SLS_ENDPOINT, SLS_AK_ID, SLS_AK_KEY, SLS_PROJECT, SLS_LOGSTORE, SLS_CG.

    :return: tuple of (LogHubConfig consumer option, dict of syslog settings)
    :raises ValueError: if any required environment variable is empty.
    """
    ##########################
    # Basic options
    ##########################

    # load connection info env and consumer group name from envs
    endpoint = os.environ.get('SLS_ENDPOINT', '')
    access_key_id = os.environ.get('SLS_AK_ID', '')
    access_key = os.environ.get('SLS_AK_KEY', '')
    project = os.environ.get('SLS_PROJECT', '')
    logstore = os.environ.get('SLS_LOGSTORE', '')
    consumer_group = os.environ.get('SLS_CG', '')

    # `assert` is stripped under `python -O`; raise explicitly so the check
    # always runs.
    if not all([endpoint, access_key_id, access_key, project, logstore, consumer_group]):
        raise ValueError("endpoint/access_id/key/project/logstore/consumer_group/name cannot be empty")

    ##########################
    # Some advanced options
    ##########################

    # DON'T configure the consumer name especially when you need to run this program in parallel
    consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

    # This option is used for initialization only; it is ignored once the consumer
    # group is created and each shard has been started to be consumed.
    # Could be "begin", "end", or a specific time in ISO format; it's the log receiving time.
    cursor_start_time = "2019-1-1 0:0:0+8:00"

    # Once a client doesn't report to the server for heartbeat_interval * 2, the server
    # considers it offline and re-assigns its task to another consumer. Thus don't set
    # the heartbeat interval too small when the network bandwidth or the performance
    # of consumption is not so good.
    heartbeat_interval = 20

    # if the coming data source data is not so frequent, please don't configure it too small (<1s)
    data_fetch_interval = 1

    # create one consumer in the consumer group
    option = LogHubConfig(endpoint, access_key_id, access_key, project, logstore, consumer_group, consumer_name,
                          cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
                          cursor_start_time=cursor_start_time,
                          heartbeat_interval=heartbeat_interval,
                          data_fetch_interval=data_fetch_interval)

    # syslog options
    settings = {
        "host": "47.75.245.227",            # must
        "port": 514,                        # must, port
        "protocol": "tcp",                  # must, tcp or udp
        "sep": "||",                        # must, separator for key=value
        "timeout": 120,                     # optional, default 120
        "facility": syslogclient.FAC_USER,  # optional, default None means syslogclient.FAC_USER
        "severity": syslogclient.SEV_INFO,  # optional, default None means syslogclient.SEV_INFO
        "hostname": None,                   # optional, default hostname of local
        "tag": None                         # optional, tag for the log, default -
    }

    return option, settings
|
||
|
||
def main():
    """Entry point: build the consumer option and syslog settings, then run
    the consumer worker until it exits (blocking)."""
    monitor_option, syslog_settings = get_monitor_option()

    logger.info("*** start to consume data...")
    sync_worker = ConsumerWorker(SyncData, monitor_option, args=(syslog_settings,))
    sync_worker.start(join=True)
|
||
|
||
# Run the consumer only when executed as a script (not on import).
if __name__ == '__main__':
    main()