Helpers for sending logs to a destination in batches, using a queue-based buffer, alongside any Python app.
Built for use with Sumo Logic, which allows for sending logs in batches via an HTTP endpoint. Can potentially work with any other similar service.
- Install a recent Python 3.x version (if you don't already have one).
- Create any project (if you don't already have one) - for example a FastAPI based project. It's also recommended, but not required, that the project uses Loguru.
- Install buffering-queue-logger as a dependency using Poetry, pip, or similar: poetry add buffering-queue-logger
- Use the utils:
import json
from logging import LogRecord
from typing import Any, NamedTuple

from fastapi import FastAPI
from loguru import logger

# NOTE(review): the original example called start_flush_buffer_timer without
# importing it anywhere — assuming it is exported by buffering_queue_logger
# alongside get_buffering_queue_logger; TODO confirm against the library.
from buffering_queue_logger import (
    get_buffering_queue_logger,
    start_flush_buffer_timer,
)


class LogAggregatorContext(NamedTuple):
    # App-wide values bound into every log record via logger.configure().
    foo: str
    moo: str


class LogAggregatorKey(NamedTuple):
    # Key under which records are grouped when sent to the aggregator.
    woo: str
    hoo: str


def get_log_aggregator_context() -> LogAggregatorContext:
    """Build the app-wide context shared by every log record."""
    return LogAggregatorContext(
        foo="foo123",
        moo="moo123",
    )


def get_log_aggregator_key_for_record(
    record: LogRecord, context: LogAggregatorContext | None
) -> LogAggregatorKey:
    """Derive the aggregation key for *record* from the shared context.

    Raises:
        ValueError: if no context was supplied.
    """
    if context is None:
        raise ValueError(
            "context is required by get_log_aggregator_key_for_record"
        )
    return LogAggregatorKey(
        woo=context.foo,
        hoo=f"{context.moo}/{record.levelname}",
    )


def get_request_headers(
    headers: dict[str, Any], key: LogAggregatorKey
) -> dict[str, Any]:
    """Return a copy of *headers* with the aggregation-key headers added."""
    new_headers = headers.copy()
    new_headers["X-Logshmog-Woo"] = key.woo
    new_headers["X-Logshmog-Hoo"] = key.hoo
    return new_headers


def get_serialized_record(record: Any) -> dict[str, Any]:
    """Convert a Loguru record dict into a JSON-serializable dict."""
    _record = {
        "timestamp": f"{record['time']}",
        "level": f"{record['level']}",
        "logger": record["name"],
        "message": record["message"],
    }
    # Only include the context fields that were actually bound as extras.
    if record["extra"].get("foo"):
        _record["foo"] = record["extra"]["foo"]
    if record["extra"].get("moo"):
        _record["moo"] = record["extra"]["moo"]
    return _record


def log_formatter(record: Any) -> str:
    """Loguru format function: emit each record as one JSON line."""
    record["extra"]["serialized"] = json.dumps(
        get_serialized_record(record)
    )
    return "{extra[serialized]}\n"


async def start_logshmog_flush_buffer_timer():
    """Flush the log buffer every 10 seconds (runs until cancelled)."""
    await start_flush_buffer_timer(10)


def configure_buffering_queue_logger(context: LogAggregatorContext):
    """Wire up the buffering queue handler/listener and attach it to Loguru."""
    handler, listener = get_buffering_queue_logger(
        capacity=1000,
        url="https://foo.logshmog.com/v1/logs/a1b2c3",
        # BUG FIX: was `get_aggregator_key_for_record`, an undefined name —
        # the function defined above is `get_log_aggregator_key_for_record`.
        get_log_aggregator_key_func=get_log_aggregator_key_for_record,
        get_request_headers_func=get_request_headers,
        chunk_size=100,
        flush_buffer_every_x_secs=10,
        context=context,
    )
    listener.start()
    logger.add(handler, format=log_formatter)


def configure_logger():
    """Configure Loguru: bind the shared context, then add the queue handler."""
    context = get_log_aggregator_context()
    logger.configure(extra=context._asdict())
    # Config for other log handlers may go here ...
    configure_buffering_queue_logger(context)


def get_app() -> FastAPI:
    """Create a FastAPI app instance."""
    return FastAPI()


configure_logger()
app = get_app()
To clone the repo:
git clone git@github.com:Jaza/buffering-queue-logger.git
To automatically fix code style issues:
./scripts/format.sh
To verify code style and static typing:
./scripts/verify.sh
To run tests:
./scripts/test.sh
To build the library:
poetry build
Built by Seertech.