Skip to content

Commit

Permalink
Add support for adding callables as extra tags
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrmaslanka committed Mar 21, 2024
1 parent 1dc706a commit cd1df25
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 15 deletions.
64 changes: 64 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,67 @@ logger = logging.getLogger("my-logger")
logger.addHandler(handler)
logger.error(...)
```

Adding extra callable tags
--------------------------

Having a prior definition of:
```python
import logging
import logging_loki
from multiprocessing import Queue
from myapp.tracing import tracer

get_context = lambda: tracer.active_span.context
add_trace_id = lambda: hex(get_context().trace_id)[
2:] if tracer is not None and tracer.active_span is not None else None
add_span_id = lambda: hex(get_context().span_id)[2:] if tracer is not None and tracer.active_span else None
```

If you want to add extra span IDs or trace IDs do the following:

```python
handler = logging_loki.LokiQueueHandler(
Queue(-1),
url="https://my-loki-instance/loki/api/v1/push",
tags={"application": "my-app", 'span_id': add_span_id, 'trace_id': add_trace_id},
auth=("username", "password"),
version="1"
)
logger = logging.getLogger("my-logger")
logger.addHandler(handler)
logger.error(...)
```

Basically if your callable returns a non-None value, it will be added as a tag. No casting to string will be made.

You can use also the blocking approach of:

```python
handler = logging_loki.LokiHandler(
url="https://my-loki-instance/loki/api/v1/push",
tags={"application": "my-app", "trace_id": add_trace_id, "span_id": add_span_id},
auth=("username", "password"),
version="1",
)

logger = logging.getLogger("my-logger")
logger.addHandler(handler)
logger.error(
"Something happened",
extra={"tags": {"service": "my-service"}},
)
```

Note that Loki version "0" will not support callable tags.


Supplying extra tags
--------------------

If you want to supply extra tags, you can do it twofold:

```python
logger.error('Something happened', extra={'test': 4})
logger.error('Something happened', extra={'tags': {'test': 4}})
```
20 changes: 16 additions & 4 deletions logging_loki/emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
BasicAuth = Optional[Tuple[str, str]]


KEYS_TO_SKIP = {'severity', 'logger', 'msg', 'message', 'tags', 'lineno'}


class LokiEmitter(abc.ABC):
"""Base Loki emitter class."""

Expand All @@ -30,6 +33,17 @@ class LokiEmitter(abc.ABC):
label_replace_with = const.label_replace_with
session_class = requests.Session

@staticmethod
def get_entry_labels(record: logging.LogRecord, line: int) -> dict:
labels = {}
for key, value in record.__dict__.items():
if key in KEYS_TO_SKIP:
continue
if value:
labels[key] = value
labels['line_no'] = line
return labels

def __init__(self, url: str, tags: Optional[dict] = None, auth: BasicAuth = None):
"""
Create new Loki emitter.
Expand Down Expand Up @@ -89,7 +103,6 @@ def format_label(self, label: str) -> str:
def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:
"""Return tags that must be send to Loki with a log record."""
tags = dict(self.tags) if isinstance(self.tags, ConvertingDict) else self.tags
tags = copy.deepcopy(tags)
tags[self.level_tag] = record.levelname.lower()
tags[self.logger_tag] = record.name

Expand All @@ -99,9 +112,8 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:

for tag_name, tag_value in extra_tags.items():
cleared_name = self.format_label(tag_name)
if cleared_name:
if cleared_name and tag_value:
tags[cleared_name] = tag_value

return tags


Expand Down Expand Up @@ -138,6 +150,6 @@ def build_payload(self, record: logging.LogRecord, line) -> dict:
ts = str(int(time.time() * ns))
stream = {
"stream": labels,
"values": [[ts, line]],
"values": [[ts, line, LokiEmitter.get_entry_labels(record, line)]],
}
return {"streams": [stream]}
60 changes: 49 additions & 11 deletions logging_loki/handlers.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,60 @@
# -*- coding: utf-8 -*-

import copy
import logging
import warnings
from logging.handlers import QueueHandler
from logging.handlers import QueueListener
from queue import Queue
from typing import Dict
from typing import Dict, Callable, Any, Union
from typing import Optional
from typing import Type

from logging_loki import const
from logging_loki import emitter


class LokiQueueHandler(QueueHandler):
class TagMixin:
"""
A mixin class to support callable tags.
This is to be inherited from as a first class, eg
>>> class Handler(TagMixin, logging.Handler):
>>> pass
"""

def __init__(self, tags=None):
self.tags = tags or {}

def prepare(self, record):
# This is invoked in the same thread in which logging is invoked
# assume the second class has a proper solution for prepare()
try:
record = self.__class__.__bases__[1].prepare(self, record)
except AttributeError: # logging.Handler has no prepare
pass
record.tags = getattr(record, 'tags', {})
for key, value in (self.tags | record.tags).items():
if callable(value):
value = value()
if value is None:
continue
record.__dict__[key] = value
return record


class LokiQueueHandler(TagMixin, QueueHandler):
"""This handler automatically creates listener and `LokiHandler` to handle logs queue."""

def __init__(self, queue: Queue, **kwargs):
"""Create new logger handler with the specified queue and kwargs for the `LokiHandler`."""
super().__init__(queue)
QueueHandler.__init__(self, queue)
TagMixin.__init__(self, kwargs.get("tags"))
self.handler = LokiHandler(**kwargs) # noqa: WPS110
self.listener = QueueListener(self.queue, self.handler)
self.listener.start()


class LokiHandler(logging.Handler):
class LokiHandler(TagMixin, logging.Handler):
"""
Log handler that sends log records to Loki.
Expand All @@ -39,7 +69,7 @@ class LokiHandler(logging.Handler):
def __init__(
self,
url: str,
tags: Optional[dict] = None,
tags: Optional[Dict[str, Union[Any, Callable]]] = None,
auth: Optional[emitter.BasicAuth] = None,
version: Optional[str] = None,
):
Expand All @@ -53,7 +83,8 @@ def __init__(
version: Version of Loki emitter to use.
"""
super().__init__()
logging.Handler.__init__(self)
TagMixin.__init__(self, tags)

if version is None and const.emitter_ver == "0":
msg = (
Expand All @@ -64,10 +95,16 @@ def __init__(
)
warnings.warn(" ".join(msg), DeprecationWarning)

my_tags = tags or {}

version = version or const.emitter_ver
if version not in self.emitters:
raise ValueError("Unknown emitter version: {0}".format(version))
self.emitter = self.emitters[version](url, tags, auth)
if version == '0' and any(callable(value) for value in my_tags.values()):
raise ValueError('Loki V0 handler does not support callable tags!')

try:
self.emitter = self.emitters[version](url, tags, auth)
except KeyError as exc:
raise ValueError("Unknown emitter version: {0}".format(version)) from exc

def handleError(self, record): # noqa: N802
"""Close emitter and let default handler take actions on error."""
Expand All @@ -76,8 +113,9 @@ def handleError(self, record): # noqa: N802

def emit(self, record: logging.LogRecord):
"""Send log record to Loki."""
record = self.prepare(record)
# noinspection PyBroadException
try:
self.emitter(record, self.format(record))
self.emitter(record, record.lineno)
except Exception:
self.handleError(record)
1 change: 1 addition & 0 deletions tests/test_emitter_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,4 @@ def test_can_build_tags_from_converting_dict(emitter_v1):
logger = logging.getLogger(logger_name)
emitter: LokiEmitterV1 = logger.handlers[0].handler.emitter
emitter.build_tags(create_record())
payload = emitter.build_payload(create_record(), 10)
54 changes: 54 additions & 0 deletions tests/test_real_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging
import time

import logging_loki
from logging_loki.emitter import LokiEmitterV1


def test_callable_tags():
class MyEmitter(LokiEmitterV1):

def build_payload(self, record, line) -> dict:
labels = self.build_tags(record)
ns = 1e9
ts = str(int(time.time() * ns))
stream = {
"stream": labels,
"values": [[ts, line, self.get_entry_labels(record, line)]],
}
return {"streams": [stream]}

def __call__(self, record, line_no):
payload = self.build_payload(record, line_no)
stream = payload['streams'][0]['values'][0][2]
assert 'application' in stream
assert stream['value'] == 5
assert stream['device'] == 'test'
assert stream['levelname'] == 'WARNING'

# Register a mock emitter
logging_loki.LokiHandler.emitters['mock_emitter'] = MyEmitter

handler = logging_loki.LokiHandler(
url="https://example.com/loki/api/v1/push",
tags={"application": "my-app", 'value': lambda: 5},
auth=("username", "password"),
version="mock_emitter"
)
logger = logging.getLogger("my-logger")
logger.addHandler(handler)
logger.warning('Error occurred', extra={'tags': {'device': 'test'}})
logger.warning('Error occurred', extra={'device': 'test'})


def test_not_support_v0():
try:
logging_loki.LokiHandler(
url="https://example.com/loki/api/v1/push",
tags={"application": "my-app", 'value': lambda: 5},
auth=("username", "password"),
version="0")
except ValueError:
pass
else:
assert False, 'V0 supports callable labels'

0 comments on commit cd1df25

Please sign in to comment.