Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for adding callables as extra tags and some major improvements #37

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,67 @@ logger = logging.getLogger("my-logger")
logger.addHandler(handler)
logger.error(...)
```

Adding extra callable tags
--------------------------

Having a prior definition of:
```python
import logging
import logging_loki
from multiprocessing import Queue
from myapp.tracing import tracer

get_context = lambda: tracer.active_span.context
add_trace_id = lambda: hex(get_context().trace_id)[
2:] if tracer is not None and tracer.active_span is not None else None
add_span_id = lambda: hex(get_context().span_id)[2:] if tracer is not None and tracer.active_span else None
```

If you want to add extra span IDs or trace IDs do the following:

```python
handler = logging_loki.LokiQueueHandler(
Queue(-1),
url="https://my-loki-instance/loki/api/v1/push",
tags={"application": "my-app", 'span_id': add_span_id, 'trace_id': add_trace_id},
auth=("username", "password"),
version="1"
)
logger = logging.getLogger("my-logger")
logger.addHandler(handler)
logger.error(...)
```

Basically if your callable returns a non-None value, it will be added as a tag. No casting to string will be made.

You can use also the blocking approach of:

```python
handler = logging_loki.LokiHandler(
url="https://my-loki-instance/loki/api/v1/push",
tags={"application": "my-app", "trace_id": add_trace_id, "span_id": add_span_id},
auth=("username", "password"),
version="1",
)

logger = logging.getLogger("my-logger")
logger.addHandler(handler)
logger.error(
"Something happened",
extra={"tags": {"service": "my-service"}},
)
```

Note that Loki version "0" will not support callable tags.


Supplying extra tags
--------------------

If you want to supply extra tags, you can do it twofold:

```python
logger.error('Something happened', extra={'test': 4})
logger.error('Something happened', extra={'tags': {'test': 4}})
```
20 changes: 16 additions & 4 deletions logging_loki/emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
BasicAuth = Optional[Tuple[str, str]]


KEYS_TO_SKIP = {'severity', 'logger', 'msg', 'message', 'tags', 'lineno'}


class LokiEmitter(abc.ABC):
"""Base Loki emitter class."""

Expand All @@ -30,6 +33,17 @@ class LokiEmitter(abc.ABC):
label_replace_with = const.label_replace_with
session_class = requests.Session

@staticmethod
def get_entry_labels(record: logging.LogRecord, line: int) -> dict:
labels = {}
for key, value in record.__dict__.items():
if key in KEYS_TO_SKIP:
continue
if value:
labels[key] = value
labels['line_no'] = line
return labels

def __init__(self, url: str, tags: Optional[dict] = None, auth: BasicAuth = None):
"""
Create new Loki emitter.
Expand Down Expand Up @@ -89,7 +103,6 @@ def format_label(self, label: str) -> str:
def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:
"""Return tags that must be send to Loki with a log record."""
tags = dict(self.tags) if isinstance(self.tags, ConvertingDict) else self.tags
tags = copy.deepcopy(tags)
tags[self.level_tag] = record.levelname.lower()
tags[self.logger_tag] = record.name

Expand All @@ -99,9 +112,8 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:

for tag_name, tag_value in extra_tags.items():
cleared_name = self.format_label(tag_name)
if cleared_name:
if cleared_name and tag_value:
tags[cleared_name] = tag_value

return tags


Expand Down Expand Up @@ -138,6 +150,6 @@ def build_payload(self, record: logging.LogRecord, line) -> dict:
ts = str(int(time.time() * ns))
stream = {
"stream": labels,
"values": [[ts, line]],
"values": [[ts, line, LokiEmitter.get_entry_labels(record, line)]],
}
return {"streams": [stream]}
60 changes: 49 additions & 11 deletions logging_loki/handlers.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,60 @@
# -*- coding: utf-8 -*-

import copy
import logging
import warnings
from logging.handlers import QueueHandler
from logging.handlers import QueueListener
from queue import Queue
from typing import Dict
from typing import Dict, Callable, Any, Union
from typing import Optional
from typing import Type

from logging_loki import const
from logging_loki import emitter


class LokiQueueHandler(QueueHandler):
class TagMixin:
"""
A mixin class to support callable tags.

This is to be inherited from as a first class, eg
>>> class Handler(TagMixin, logging.Handler):
>>> pass
"""

def __init__(self, tags=None):
self.tags = tags or {}

def prepare(self, record):
# This is invoked in the same thread in which logging is invoked
# assume the second class has a proper solution for prepare()
try:
record = self.__class__.__bases__[1].prepare(self, record)
except AttributeError: # logging.Handler has no prepare
pass
record.tags = getattr(record, 'tags', {})
for key, value in (self.tags | record.tags).items():
if callable(value):
value = value()
if value is None:
continue
record.__dict__[key] = value
return record


class LokiQueueHandler(TagMixin, QueueHandler):
"""This handler automatically creates listener and `LokiHandler` to handle logs queue."""

def __init__(self, queue: Queue, **kwargs):
"""Create new logger handler with the specified queue and kwargs for the `LokiHandler`."""
super().__init__(queue)
QueueHandler.__init__(self, queue)
TagMixin.__init__(self, kwargs.get("tags"))
self.handler = LokiHandler(**kwargs) # noqa: WPS110
self.listener = QueueListener(self.queue, self.handler)
self.listener.start()


class LokiHandler(logging.Handler):
class LokiHandler(TagMixin, logging.Handler):
"""
Log handler that sends log records to Loki.

Expand All @@ -39,7 +69,7 @@ class LokiHandler(logging.Handler):
def __init__(
self,
url: str,
tags: Optional[dict] = None,
tags: Optional[Dict[str, Union[Any, Callable]]] = None,
auth: Optional[emitter.BasicAuth] = None,
version: Optional[str] = None,
):
Expand All @@ -53,7 +83,8 @@ def __init__(
version: Version of Loki emitter to use.

"""
super().__init__()
logging.Handler.__init__(self)
TagMixin.__init__(self, tags)

if version is None and const.emitter_ver == "0":
msg = (
Expand All @@ -64,10 +95,16 @@ def __init__(
)
warnings.warn(" ".join(msg), DeprecationWarning)

my_tags = tags or {}

version = version or const.emitter_ver
if version not in self.emitters:
raise ValueError("Unknown emitter version: {0}".format(version))
self.emitter = self.emitters[version](url, tags, auth)
if version == '0' and any(callable(value) for value in my_tags.values()):
raise ValueError('Loki V0 handler does not support callable tags!')

try:
self.emitter = self.emitters[version](url, tags, auth)
except KeyError as exc:
raise ValueError("Unknown emitter version: {0}".format(version)) from exc

def handleError(self, record): # noqa: N802
"""Close emitter and let default handler take actions on error."""
Expand All @@ -76,8 +113,9 @@ def handleError(self, record): # noqa: N802

def emit(self, record: logging.LogRecord):
"""Send log record to Loki."""
record = self.prepare(record)
# noinspection PyBroadException
try:
self.emitter(record, self.format(record))
self.emitter(record, record.lineno)
except Exception:
self.handleError(record)
1 change: 1 addition & 0 deletions tests/test_emitter_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,4 @@ def test_can_build_tags_from_converting_dict(emitter_v1):
logger = logging.getLogger(logger_name)
emitter: LokiEmitterV1 = logger.handlers[0].handler.emitter
emitter.build_tags(create_record())
payload = emitter.build_payload(create_record(), 10)
54 changes: 54 additions & 0 deletions tests/test_real_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging
import time

import logging_loki
from logging_loki.emitter import LokiEmitterV1


def test_callable_tags():
class MyEmitter(LokiEmitterV1):

def build_payload(self, record, line) -> dict:
labels = self.build_tags(record)
ns = 1e9
ts = str(int(time.time() * ns))
stream = {
"stream": labels,
"values": [[ts, line, self.get_entry_labels(record, line)]],
}
return {"streams": [stream]}

def __call__(self, record, line_no):
payload = self.build_payload(record, line_no)
stream = payload['streams'][0]['values'][0][2]
assert 'application' in stream
assert stream['value'] == 5
assert stream['device'] == 'test'
assert stream['levelname'] == 'WARNING'

# Register a mock emitter
logging_loki.LokiHandler.emitters['mock_emitter'] = MyEmitter

handler = logging_loki.LokiHandler(
url="https://example.com/loki/api/v1/push",
tags={"application": "my-app", 'value': lambda: 5},
auth=("username", "password"),
version="mock_emitter"
)
logger = logging.getLogger("my-logger")
logger.addHandler(handler)
logger.warning('Error occurred', extra={'tags': {'device': 'test'}})
logger.warning('Error occurred', extra={'device': 'test'})


def test_not_support_v0():
try:
logging_loki.LokiHandler(
url="https://example.com/loki/api/v1/push",
tags={"application": "my-app", 'value': lambda: 5},
auth=("username", "password"),
version="0")
except ValueError:
pass
else:
assert False, 'V0 supports callable labels'