Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

old_scripts/

# Agent session files
session-*.md

# Overwrite global config
.global_config.yaml

Expand Down
19 changes: 18 additions & 1 deletion common/config_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
type validation and structure for the configuration data.
"""

from pydantic import BaseModel
from pydantic import BaseModel, Field


class ExampleParent(BaseModel):
Expand Down Expand Up @@ -70,12 +70,29 @@ class LoggingLevelsConfig(BaseModel):
critical: bool


class RedactionPattern(BaseModel):
    """Configuration for a specific redaction pattern."""

    # Label identifying the pattern in the config (e.g. "OPENAI_API_KEY").
    name: str
    # Regular-expression source text; the log scrubber in
    # src/utils/logging_config.py embeds it in a named group of one combined
    # expression, so it must be valid when nested inside a group.
    regex: str
    # Text substituted for every match of ``regex`` (e.g. "[REDACTED_API_KEY]").
    placeholder: str


class RedactionConfig(BaseModel):
    """Configuration for log redaction/scrubbing."""

    # Master switch: when False the log scrubber returns text unchanged.
    enabled: bool = True
    # When True (and redaction is enabled) the scrubber also runs a
    # scrubadub.Scrubber over the text before applying custom patterns.
    use_default_pii: bool = True
    # Custom secret patterns; order matters because they are joined into one
    # alternation, so more specific patterns should be listed first.
    patterns: list[RedactionPattern] = []


class LoggingConfig(BaseModel):
    """Complete logging configuration."""

    verbose: bool
    format: LoggingFormatConfig
    levels: LoggingLevelsConfig
    # Optional section: when it is omitted from the config, a RedactionConfig
    # built from its own field defaults is used. The class itself is the
    # zero-argument factory, so no wrapping lambda is needed.
    redaction: RedactionConfig = Field(default_factory=RedactionConfig)


class FeaturesConfig(BaseModel):
Expand Down
21 changes: 21 additions & 0 deletions common/global_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,24 @@ logging:
warning: true # Show warning logs
error: true # Show error logs
critical: true # Show critical logs
redaction:
enabled: true
use_default_pii: true
patterns:
- name: "ANTHROPIC_API_KEY"
regex: "sk-ant-[a-zA-Z0-9-]{20,}"
placeholder: "[REDACTED_API_KEY]"
- name: "OPENAI_API_KEY"
regex: "sk-[a-zA-Z0-9]{20,}"
placeholder: "[REDACTED_API_KEY]"
- name: "STRIPE_API_KEY"
regex: "[spr]k_(live|test)_[a-zA-Z0-9]{20,}"
placeholder: "[REDACTED_API_KEY]"
- name: "BEARER_TOKEN"
regex: "Bearer\\s+[a-zA-Z0-9._\\-]{20,}"
placeholder: "[REDACTED_BEARER_TOKEN]"
- name: "GENERIC_KEY"
regex: "(?i:(?:api[_-]?key|project[_-]?key|secret[_-]?key)[=:\\s]+['\"]?[a-zA-Z0-9_\\-]{16,}['\"]?)"
placeholder: "[REDACTED_KEY]"


6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ dependencies = [
"pylint>=3.3.0",
"deptry>=0.24.0",
"openfeature-sdk>=0.8.4",
"scrubadub>=2.0.1",
"pydantic>=2.0.0",
"numpy>=1.26.0",
]
readme = "README.md"
requires-python = ">= 3.12"
Expand Down Expand Up @@ -65,6 +68,9 @@ error-on-warning = true
[tool.ty.environment]
python-version = "3.12"

[tool.deptry]
ignore = ["DEP002"]

[tool.vulture]
exclude = [
".venv/",
Expand Down
103 changes: 72 additions & 31 deletions src/utils/logging_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import threading

import scrubadub
from human_id import generate_id
from loguru import logger

Expand All @@ -13,57 +14,90 @@
_logging_initialized = False
_logging_lock = threading.Lock()

# PII Patterns for redaction (pre-compiled for performance)
# Note: More specific patterns must come before general ones (e.g., sk-ant- before sk-)
_COMPILED_PII_PATTERNS = [
# Email addresses
(
re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"),
"[REDACTED_EMAIL]",
),
# Anthropic API keys (sk-ant-...) - must be before OpenAI pattern
(re.compile(r"sk-ant-[a-zA-Z0-9-]{20,}"), "[REDACTED_API_KEY]"),
# OpenAI API keys (sk-...)
(re.compile(r"sk-[a-zA-Z0-9]{20,}"), "[REDACTED_API_KEY]"),
# Stripe API keys (sk_live_*, sk_test_*, pk_live_*, pk_test_*, rk_live_*, rk_test_*)
(re.compile(r"[spr]k_(live|test)_[a-zA-Z0-9]{20,}"), "[REDACTED_API_KEY]"),
# Authorization Bearer tokens
(re.compile(r"Bearer\s+[a-zA-Z0-9._\-]{20,}"), "[REDACTED_BEARER_TOKEN]"),
# Generic project/API keys (common formats: xxx_key_*, api_key=*, apikey=*)
(re.compile(r"(?i)(api[_-]?key|project[_-]?key|secret[_-]?key)[=:\s]+['\"]?[a-zA-Z0-9_\-]{16,}['\"]?"), "[REDACTED_KEY]"),
]

class _LogScrubber:
    """Redact PII and secrets from log text.

    Scrubbing runs in two stages: an optional scrubadub pass for general PII,
    followed by one combined regular expression that matches every configured
    secret pattern in a single ``re.sub`` call.
    """

    def __init__(self, config=None) -> None:
        """Build the scrubber from a redaction configuration.

        Args:
            config: Object exposing ``enabled``, ``use_default_pii`` and
                ``patterns`` (items with ``name``, ``regex`` and
                ``placeholder`` attributes). Defaults to
                ``global_config.logging.redaction``.

        Raises:
            re.error: If a configured pattern is not a valid regular
                expression or cannot be embedded in the combined expression
                (for example a global inline flag such as ``(?i)``, or a
                numbered backreference). The message names the pattern.
        """
        if config is None:
            config = global_config.logging.redaction
        self.enabled = config.enabled
        self.use_default_pii = config.use_default_pii
        self.patterns = config.patterns

        # General PII detection is delegated to scrubadub's default detectors.
        self.scrubber = None
        if self.enabled and self.use_default_pii:
            self.scrubber = scrubadub.Scrubber()

        # Custom patterns are joined into one alternation. Each pattern gets
        # its own named group so the substitution callback can tell which one
        # matched and look up the corresponding placeholder.
        self.combined_regex = None
        self.placeholder_map = {}

        if self.enabled and self.patterns:
            regex_parts = []
            for i, p in enumerate(self.patterns):
                group_name = f"p{i}"
                part = f"(?P<{group_name}>{p.regex})"
                # Compile every wrapped part on its own first: a failure is
                # then attributed to the offending pattern instead of showing
                # up as an opaque error about the combined expression.
                try:
                    re.compile(part)
                except re.error as exc:
                    raise re.error(
                        f"Invalid redaction pattern {p.name!r}: {exc}"
                    ) from exc
                regex_parts.append(part)
                self.placeholder_map[group_name] = p.placeholder

            # Alternation tries the parts left to right at each position, so
            # earlier (more specific) patterns win over later ones.
            self.combined_regex = re.compile("|".join(regex_parts))

    def _redact_callback(self, match: re.Match[str]) -> str:
        """Return the placeholder for whichever pattern group matched.

        ``match.lastgroup`` is the name of the last group to close, which is
        the outer per-pattern wrapper even when the pattern has inner groups.
        """
        group_name = match.lastgroup
        return self.placeholder_map.get(group_name, "[REDACTED]")

    def scrub(self, text: str) -> str:
        """Return ``text`` with PII and configured secrets replaced.

        The input is returned unchanged when redaction is disabled or the
        text is empty.
        """
        if not self.enabled or not text:
            return text

        # 1. Scrub general PII using scrubadub
        if self.scrubber:
            text = self.scrubber.clean(text)

        # 2. Scrub custom secrets (one pass over the text for all patterns)
        if self.combined_regex:
            text = self.combined_regex.sub(self._redact_callback, text)

        return text


# Initialize the singleton scrubber
_SCRUBBER = _LogScrubber()


def scrub_sensitive_data(record):
"""
Patch function to scrub sensitive data from the log record.
Modifies record["message"] and record["exception"] in place.
"""
if not _SCRUBBER.enabled:
return

# Scrub main message
message = record["message"]
for pattern, placeholder in _COMPILED_PII_PATTERNS:
message = pattern.sub(placeholder, message)
record["message"] = message
record["message"] = _SCRUBBER.scrub(record["message"])

# Scrub exception if present
exception = record.get("exception")
if exception:
type_, value, tb = exception
value_str = str(value)
redacted = False
for pattern, placeholder in _COMPILED_PII_PATTERNS:
if pattern.search(value_str):
value_str = pattern.sub(placeholder, value_str)
redacted = True
scrubbed_value_str = _SCRUBBER.scrub(value_str)

if redacted:
if scrubbed_value_str != value_str:
# Re-instantiate the exception with the redacted message to preserve loguru formatting
try:
# Most standard exceptions accept a single string argument
new_value = type_(value_str)
new_value = type_(scrubbed_value_str)
except Exception:
# Fallback to a generic Exception if type instantiation fails
new_value = Exception(value_str)
new_value = Exception(scrubbed_value_str)

# Preserve traceback and context metadata
new_value.__traceback__ = tb
Expand All @@ -72,6 +106,13 @@ def scrub_sensitive_data(record):

record["exception"] = (type_, new_value, tb)

# Scrub extra context if present
extra = record.get("extra")
if extra:
for key, val in extra.items():
if isinstance(val, str):
extra[key] = _SCRUBBER.scrub(val)


def _should_show_location(level: str) -> bool:
"""Determine if location should be shown for given log level"""
Expand Down
13 changes: 10 additions & 3 deletions tests/test_logging_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ def test_email_redaction(self):
record = {"message": "User email is test@example.com", "exception": None}
scrub_sensitive_data(record)
assert "test@example.com" not in record["message"]
assert "[REDACTED_EMAIL]" in record["message"]
assert "{{EMAIL}}" in record["message"]

def test_phone_redaction(self):
"""Test that phone numbers are redacted (new capability via scrubadub)."""
record = {"message": "Call me at 1-800-555-0199", "exception": None}
scrub_sensitive_data(record)
assert "1-800-555-0199" not in record["message"]
assert "{{PHONE}}" in record["message"]

def test_api_key_redaction(self):
"""Test that OpenAI API keys are redacted from log messages."""
Expand All @@ -32,7 +39,7 @@ def test_multiple_redactions(self):
"exception": None,
}
scrub_sensitive_data(record)
assert "[REDACTED_EMAIL]" in record["message"]
assert "{{EMAIL}}" in record["message"]
assert "[REDACTED_API_KEY]" in record["message"]
assert "test@example.com" not in record["message"]
assert "sk-123456789012345678901234" not in record["message"]
Expand All @@ -54,7 +61,7 @@ def test_exception_message_redaction(self):
# Verify exception redaction
_, value, _ = record["exception"]
assert "test@example.com" not in str(value)
assert "[REDACTED_EMAIL]" in str(value)
assert "{{EMAIL}}" in str(value)

def test_exception_api_key_redaction(self):
"""Test redacting API keys from exception values."""
Expand Down
Loading
Loading