Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ modules = [
[rate_limit]
# Maximum requests per minute per client IP (default: 60)
# rpm = 60

[logging]
# Log level: DEBUG, INFO, WARNING, ERROR, CRITICAL (default: INFO)
# level = "INFO"
# Log format: "plain" (human-readable) or "json" (structured, one record per line)
# format = "plain"
14 changes: 14 additions & 0 deletions src/deckhand/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def __init__(self) -> None:
self.config_file_path: str | None = None
self.state_file_path: str | None = None
self.rate_limit_rpm: int = 60
self.log_level: str = "INFO"
self.log_format: str = "plain" # "plain" or "json"

# Auth: list of {key, scope} dicts
self._raw_api_keys: list[dict[str, str]] = []
Expand Down Expand Up @@ -90,6 +92,12 @@ def _load_from_config_file(self, file_path: str) -> None:
rl_config = config["rate_limit"]
self.rate_limit_rpm = rl_config.get("rpm", self.rate_limit_rpm)

# Logging
if "logging" in config:
log_config = config["logging"]
self.log_level = log_config.get("level", self.log_level)
self.log_format = log_config.get("format", self.log_format)

def _load_auth(self, auth_config: dict[str, Any]) -> None:
"""Parse the [auth] section."""
if "api_keys" in auth_config:
Expand Down Expand Up @@ -130,3 +138,9 @@ def _load_from_env(self) -> None:
self.rate_limit_rpm = int(rpm_str)
except ValueError:
pass

if log_level := os.getenv("DECKHAND_LOG_LEVEL"):
self.log_level = log_level

if log_format := os.getenv("DECKHAND_LOG_FORMAT"):
self.log_format = log_format
91 changes: 91 additions & 0 deletions src/deckhand/logging_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Logging configuration: configurable level + plain/JSON output formats."""

from __future__ import annotations

import json
import logging
import sys
from datetime import datetime, timezone
from typing import Any

# Standard LogRecord attributes that should not be included in the "extra" dict
# of a JSON record. Anything not in this set was added via logger.xxx(..., extra=...).
_RESERVED_ATTRS = {
"name",
"msg",
"args",
"levelname",
"levelno",
"pathname",
"filename",
"module",
"exc_info",
"exc_text",
"stack_info",
"lineno",
"funcName",
"created",
"msecs",
"relativeCreated",
"thread",
"threadName",
"processName",
"process",
"message",
"asctime",
"taskName",
}


class JsonFormatter(logging.Formatter):
"""Format log records as a single line of JSON.

Includes any keyword arguments passed to the logger via ``extra=`` so
callers can attach contextual fields like ``agent_id`` or ``action_name``.
"""

def format(self, record: logging.LogRecord) -> str:
payload: dict[str, Any] = {
"timestamp": datetime.fromtimestamp(
record.created, tz=timezone.utc
).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}

# Attach contextual fields supplied via `extra=`
for key, value in record.__dict__.items():
if key in _RESERVED_ATTRS or key.startswith("_"):
continue
payload[key] = value

if record.exc_info:
payload["exc_info"] = self.formatException(record.exc_info)

return json.dumps(payload, default=str)


def configure_logging(level: str = "INFO", fmt: str = "plain") -> None:
"""Configure the root logger with the requested level and format.

Safe to call multiple times — replaces existing handlers on the root logger.
"""
root = logging.getLogger()
root.setLevel(level.upper())

# Remove pre-existing handlers so re-configuration is idempotent
for handler in list(root.handlers):
root.removeHandler(handler)

handler = logging.StreamHandler(sys.stderr)
if fmt == "json":
handler.setFormatter(JsonFormatter())
else:
handler.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)-8s %(name)s: %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S",
)
)
root.addHandler(handler)
25 changes: 23 additions & 2 deletions src/deckhand/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from deckhand.agents.mock import MockAgent
from deckhand.config.settings import Settings
from deckhand.logging_config import configure_logging
from deckhand.orchestrator.actions import ActionRegistry
from deckhand.orchestrator.events import build_error_event, build_event
from deckhand.orchestrator.manager import Orchestrator
Expand Down Expand Up @@ -57,9 +58,10 @@ async def lifespan(app: FastAPI):
settings, \
rate_limiter

# Startup
logger.info("Starting Deckhand service...")
# Startup — load settings first so we can configure logging from them
settings = Settings()
configure_logging(level=settings.log_level, fmt=settings.log_format)
logger.info("Starting Deckhand service...")

# Log configuration
logger.info("Configuration:")
Expand Down Expand Up @@ -142,6 +144,10 @@ async def dispatch(self, request: Request, call_next):
if rate_limiter is not None:
client_ip = request.client.host if request.client else "unknown"
if not rate_limiter.check(client_ip):
logger.warning(
"Rate limit exceeded",
extra={"client_ip": client_ip, "path": request.url.path},
)
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded"},
Expand Down Expand Up @@ -170,15 +176,24 @@ def _require_scope(request: Request, scope: str) -> ApiKeyEntry:
if settings is None:
raise HTTPException(status_code=503, detail="Service not initialized")

client_ip = request.client.host if request.client else "unknown"
log_ctx = {"client_ip": client_ip, "path": request.url.path, "scope": scope}

token = _extract_token(request)
if not token:
logger.warning("Auth failed: missing API key", extra=log_ctx)
raise HTTPException(status_code=401, detail="Missing API key")

entry = resolve_key(token, settings.api_keys)
if entry is None:
logger.warning("Auth failed: invalid API key", extra=log_ctx)
raise HTTPException(status_code=401, detail="Invalid API key")

if not has_scope(entry, scope):
logger.warning(
"Auth failed: insufficient scope",
extra={**log_ctx, "key_scope": entry.scope},
)
raise HTTPException(
status_code=403, detail=f"Insufficient scope: requires '{scope}'"
)
Expand Down Expand Up @@ -574,12 +589,15 @@ async def events(websocket: WebSocket) -> None:

# Accept the connection, then authenticate via first message
await websocket.accept()
client_ip = websocket.client.host if websocket.client else "unknown"
ws_ctx = {"client_ip": client_ip, "path": "/events"}

try:
raw = await asyncio.wait_for(websocket.receive_text(), timeout=_WS_AUTH_TIMEOUT)
auth_msg = json.loads(raw)

if auth_msg.get("type") != "auth" or "token" not in auth_msg:
logger.warning("WS auth failed: malformed auth message", extra=ws_ctx)
await websocket.send_json(
{
"type": "auth_error",
Expand All @@ -591,6 +609,7 @@ async def events(websocket: WebSocket) -> None:

entry = resolve_key(auth_msg["token"], settings.api_keys)
if entry is None:
logger.warning("WS auth failed: invalid API key", extra=ws_ctx)
await websocket.send_json(
{"type": "auth_error", "detail": "Invalid API key"}
)
Expand All @@ -600,9 +619,11 @@ async def events(websocket: WebSocket) -> None:
await websocket.send_json({"type": "auth_ok", "scope": entry.scope})

except asyncio.TimeoutError:
logger.warning("WS auth failed: handshake timed out", extra=ws_ctx)
await websocket.close(code=4001, reason="Auth handshake timed out")
return
except (json.JSONDecodeError, KeyError):
logger.warning("WS auth failed: malformed auth message", extra=ws_ctx)
await websocket.close(code=4001, reason="Malformed auth message")
return

Expand Down
55 changes: 55 additions & 0 deletions tests/test_logging_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Tests for structured logging configuration."""

from __future__ import annotations

import json
import logging

from deckhand.logging_config import JsonFormatter, configure_logging


def _make_record(**kwargs) -> logging.LogRecord:
record = logging.LogRecord(
name="deckhand.test",
level=logging.INFO,
pathname=__file__,
lineno=10,
msg="hello %s",
args=("world",),
exc_info=None,
)
for key, value in kwargs.items():
setattr(record, key, value)
return record


def test_json_formatter_emits_required_fields() -> None:
record = _make_record()
out = json.loads(JsonFormatter().format(record))
assert out["level"] == "INFO"
assert out["logger"] == "deckhand.test"
assert out["message"] == "hello world"
assert "timestamp" in out


def test_json_formatter_includes_extra_context() -> None:
record = _make_record(agent_id="mock-1", action_name="run", client_ip="127.0.0.1")
out = json.loads(JsonFormatter().format(record))
assert out["agent_id"] == "mock-1"
assert out["action_name"] == "run"
assert out["client_ip"] == "127.0.0.1"


def test_configure_logging_is_idempotent_and_sets_level() -> None:
configure_logging(level="DEBUG", fmt="json")
root = logging.getLogger()
assert root.level == logging.DEBUG
handlers_before = len(root.handlers)
assert handlers_before == 1
assert isinstance(root.handlers[0].formatter, JsonFormatter)

# Re-configuring replaces handlers, doesn't accumulate them
configure_logging(level="WARNING", fmt="plain")
assert len(root.handlers) == 1
assert root.level == logging.WARNING
assert not isinstance(root.handlers[0].formatter, JsonFormatter)