Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Logstash Handler #11

Merged
merged 12 commits into from Dec 14, 2023
2 changes: 1 addition & 1 deletion CHANGELOG.md
Expand Up @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

*
* Add new `LogstashHandler` to the logging system

### Changed

Expand Down
2 changes: 1 addition & 1 deletion src/colony/base/__init__.py
Expand Up @@ -53,7 +53,7 @@
PluginClassNotAvailable, InvalidCommand, InvalidArgument, SecurityError, OperationNotComplete, OperationRestart
from .information import VERSION, RELEASE, BUILD, RELEASE_DATE, RELEASE_DATE_TIME, ENVIRONMENT_VERSION,\
ENVIRONMENT, DEFAULT_ENCODING, DATE_FORMAT, DATE_TIME_FORMAT, INFORMATION_PATH
from .loggers import BroadcastHandler, MemoryHandler
from .loggers import BroadcastHandler, MemoryHandler, LogstashHandler
from .system import System, Plugin, PluginManagerPlugin, PluginManager, Dependency, PluginDependency,\
PackageDependency, Condition, OperativeSystemCondition, Capability, Event, PluginThread,\
PluginEventThread
Expand Down
72 changes: 72 additions & 0 deletions src/colony/base/loggers.py
Expand Up @@ -37,10 +37,15 @@
__license__ = "Apache License, Version 2.0"
""" The license for the module """

import os
import socket
import logging
import itertools
import datetime
import threading
import collections

from . import config
from . import legacy

try:
Expand Down Expand Up @@ -242,3 +247,70 @@ def flush_to_file(
finally:
if is_path: file.close()
if clear: self.clear()

class LogstashHandler(logging.Handler):

def __init__(self, level = logging.NOTSET, max_length = MAX_LENGTH, api = None):
logging.Handler.__init__(self, level = level)
if not api: api = self._build_api()
self.messages = collections.deque()
self.max_length = max_length
self.api = api

def emit(self, record):
# verifies if the API structure is defined and set and if
# that's not the case returns immediately
if not self.api: return

# retrieves the current date time value as an utc value
# and then formats it according to the provided format string
message = self.format(record)

# creates the log record structure that is going to be sent
# to the logstash infra-structure, this should represent a
# proper structure ready to be debugged
now = datetime.datetime.utcnow()
now_s = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

log = {
"@timestamp" : now_s,
"message_fmt" : message,
"logger" : record.name,
"message" : record.message,
"level" : record.levelname,
"path" : record.pathname,
"lineno" : record.lineno,
"host" : socket.gethostname(),
"hostname" : socket.gethostname(),
"tid" : threading.current_thread().ident,
"pid" : os.getpid() if hasattr(os, "getpid") else -1,
}

self.messages.append(log)
should_flush = len(self.messages) >= self.max_length
if should_flush: self.flush()

def flush(self, force = False):
logging.Handler.flush(self)

# verifies if the API structure is defined and set and if
# that's not the case returns immediately
if not self.api: return

# in case the force flag is not set and there are no messages
# to be flushed returns immediately (nothing to be done)
messages = self.messages
if not messages and not force: return

# posts the complete set of messages to logstash and then clears the messages
self.api.log_bulk(messages, tag = "default")
self.messages = []

def _build_api():
try: import logstash
except ImportError: return None

if not config.conf("LOGGING_LOGSTASH", False, cast = bool):
return None

return logstash.API()
9 changes: 9 additions & 0 deletions src/colony/base/system.py
Expand Up @@ -2048,6 +2048,12 @@ def start_logger(self, log_level = DEFAULT_LOGGING_LEVEL):
memory_handler = loggers.MemoryHandler()
memory_handler.setLevel(minimal_log_level)

# creates the logstash handler object and then sets the
# minimal log level in it so that it may have the maximum
# amount of information available for handling
logstash_handler = loggers.LogstashHandler()
logstash_handler.setLevel(minimal_log_level)

# retrieves the logging format and uses it
# to create the proper logging formatter
logging_format = GLOBAL_CONFIG.get(
Expand All @@ -2063,6 +2069,7 @@ def start_logger(self, log_level = DEFAULT_LOGGING_LEVEL):
rotating_err_file_handler.setFormatter(formatter)
broadcast_handler.setFormatter(formatter)
memory_handler.setFormatter(formatter)
logstash_handler.setFormatter(formatter)

# adds the complete set of logging handler to the
# current logger, so that they get notified once
Expand All @@ -2072,6 +2079,7 @@ def start_logger(self, log_level = DEFAULT_LOGGING_LEVEL):
logger.addHandler(rotating_err_file_handler)
logger.addHandler(broadcast_handler)
logger.addHandler(memory_handler)
logger.addHandler(logstash_handler)

# sets the logger in the current context, so that
# it may be used latter for reference
Expand All @@ -2085,6 +2093,7 @@ def start_logger(self, log_level = DEFAULT_LOGGING_LEVEL):
self.logger_handlers["rotating_err_file"] = rotating_err_file_handler
self.logger_handlers["broadcast"] = broadcast_handler
self.logger_handlers["memory"] = memory_handler
self.logger_handlers["logstash"] = logstash_handler

def load_system(self, mode = None, args = None, callback = None):
"""
Expand Down
41 changes: 41 additions & 0 deletions src/colony/test/base/loggers.py
Expand Up @@ -41,6 +41,9 @@

import colony

try: import unittest.mock as mock
except ImportError: mock = None

class LoggersTest(colony.ColonyTestCase):
"""
Test case for the verification of logging related
Expand Down Expand Up @@ -144,3 +147,41 @@ def test_memory_handler_file(self):

latest = memory_handler.get_latest(count = 1)
self.assertEqual(len(latest), 0)

def test_logstash_handler(self):
if mock == None:
self.skipTest("Skipping test: mock unavailable")

mock_api_client = mock.Mock()
hugo-gomes marked this conversation as resolved.
Show resolved Hide resolved
mock_api_client.log_bulk.return_value = None

logstash_handler = colony.LogstashHandler(api = mock_api_client)
formatter = logging.Formatter("%(message)s")
logstash_handler.setFormatter(formatter)

self.assertEqual(len(logstash_handler.messages), 0)

record = logging.makeLogRecord(
dict(
msg = "hello world",
levelname = logging.getLevelName(logging.INFO)
)
)
logstash_handler.emit(record)
self.assertEqual(len(logstash_handler.messages), 1)
self.assertEqual(logstash_handler.messages[0]["message"], "hello world")
self.assertEqual(logstash_handler.messages[0]["level"], "INFO")

logstash_handler.flush()
self.assertEqual(len(logstash_handler.messages), 0)

record = logging.makeLogRecord(
dict(
msg = "hello world 2",
levelname = logging.getLevelName(logging.INFO)
)
)
logstash_handler.emit(record)
self.assertEqual(len(logstash_handler.messages), 1)
self.assertEqual(logstash_handler.messages[0]["message"], "hello world 2")
self.assertEqual(logstash_handler.messages[0]["level"], "INFO")