Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,12 @@ failed_login_log = true
```

A single signal receiver covers both the form login at `/api/v3/auth/login/` and
OPDS HTTP Basic auth — no separate setup per endpoint. Failures still appear in
the main `codex.log`; the dedicated file is an additional copy filtered to just
these records.
OPDS HTTP Basic auth — no separate setup per endpoint. The IP-bearing line is
written **only** to `failed_logins.log`; the main `codex.log` still records
Django's standard `"Unauthorized: /api/v3/auth/login/"` (or `"Forbidden: ..."`)
WARNING for the same request, so the failure is visible in the main log without
the client IP. This keeps PII (IP + username) concentrated in one file that you
can chmod, forward to a SIEM, or retain on its own schedule.

Each line looks like:

Expand Down
18 changes: 17 additions & 1 deletion codex/failed_login_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
etc.) that tail a log and match offending addresses by regex. One record per
failed credential attempt is emitted with ``logger.bind(failed_login=True)``,
which the dedicated loguru sink in :mod:`codex.startup.loguru` filters into a
separate file. The main stdout / codex.log sinks see the record too.
separate file. The main stdout / codex.log sinks apply the inverse filter
(:func:`not_failed_login_filter`) so the IP-bearing line **only** lands in the
dedicated log — Django's own request logger still records the bare
``"Unauthorized: /api/v3/auth/login/"`` at WARNING so the failure is visible
in the main log, just without the client IP. Concentrating IPs in one place
makes the privacy story easier to reason about (one file to chmod, one file
to forward to a SIEM, one file to retain on a different schedule).

The :class:`RequestContextMiddleware` stashes each request in a
:class:`~contextvars.ContextVar` so the signal handler can recover the client
Expand Down Expand Up @@ -51,6 +57,16 @@ def failed_login_filter(record) -> bool:
return bool(record["extra"].get(_FAILED_LOGIN_KEY))


def not_failed_login_filter(record) -> bool:
"""
Loguru sink filter: drop records tagged as failed-login.

Applied to the main stdout / codex.log sinks so IP-bearing lines stay
confined to the dedicated log file.
"""
return not record["extra"].get(_FAILED_LOGIN_KEY)


def on_login_failed(sender, credentials=None, request=None, **_kwargs) -> None:
"""Receiver for ``django.contrib.auth.signals.user_login_failed``."""
del sender
Expand Down
4 changes: 3 additions & 1 deletion codex/settings/codex.toml.default
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
# Only enable if you have authorization in front of Codex. Dangerous.
# remote_user = false
# Log failed login attempts to a separate file. Useful as input for
# banning tools like fail2ban, CrowdSec, or sshguard.
# banning tools like fail2ban, CrowdSec, or sshguard. The IP-bearing line
# is written only to this file; codex.log still records the bare Django
# WARNING (no IP) so the failure is visible without exposing PII.
# Line format: "<ISO timestamp> | Failed login from <ip> user=<username>"
# Example fail2ban failregex:
# ^\s*\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \| Failed login from <HOST> user=.*$
Expand Down
18 changes: 14 additions & 4 deletions codex/startup/loguru.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,28 @@ def loguru_init() -> None:
"level": LOGLEVEL,
}
logger.remove() # Default "sys.stderr" sink is not picklable
logger.add(sys.stdout, **kwargs, colorize=True)

# Privacy: when the failed-login feature is on, the dedicated sink
# receives the IP-bearing lines and the main sinks drop them. When
# off, nothing carries the tag and the inverse filter would be a
# no-op anyway, so don't bother attaching it. Lazy import keeps the
# failed_login_log module out of the graph when the feature is off.
main_filter = None
if AUTH_FAILED_LOGIN_LOG:
from codex.failed_login_log import not_failed_login_filter

main_filter = not_failed_login_filter

logger.add(sys.stdout, **kwargs, colorize=True, filter=main_filter)
logger.add(
LOG_PATH,
**kwargs,
rotation=LOG_ROTATION,
retention=LOG_RETENTION,
compression="xz",
filter=main_filter,
)
if AUTH_FAILED_LOGIN_LOG:
# Lazy import keeps the failed_login_log module out of the import
# graph when the feature is off (the module imports from settings
# at load time, which is fine but avoidable when unused).
from codex.failed_login_log import failed_login_filter

AUTH_FAILED_LOGIN_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
Expand Down
43 changes: 43 additions & 0 deletions tests/test_failed_login_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
_current_request,
failed_login_filter,
get_client_ip,
not_failed_login_filter,
on_login_failed,
)

Expand Down Expand Up @@ -155,3 +156,45 @@ def _inner(_req):
assert result == "response"
assert seen["during"] is request
assert _current_request.get() is None


class SinkFilterTests(TestCase):
"""End-to-end: tagged records only land in sinks with the matching filter."""

@override
def setUp(self) -> None:
"""Attach one sink per filter to capture routing."""
self.dedicated = _CaptureSink() # pyright: ignore[reportUninitializedInstanceVariable]
self.main = _CaptureSink() # pyright: ignore[reportUninitializedInstanceVariable]
self.dedicated_id = logger.add( # pyright: ignore[reportUninitializedInstanceVariable]
self.dedicated,
level="WARNING",
format="{message}",
filter=failed_login_filter,
)
self.main_id = logger.add( # pyright: ignore[reportUninitializedInstanceVariable]
self.main,
level="WARNING",
format="{message}",
filter=not_failed_login_filter,
)

@override
def tearDown(self) -> None:
"""Drop the test sinks."""
logger.remove(self.dedicated_id)
logger.remove(self.main_id)

def test_tagged_record_only_to_dedicated_sink(self) -> None:
"""A logger.bind(failed_login=True) record skips the main sink."""
logger.bind(failed_login=True).warning("Failed login from 1.2.3.4 user=x")
logger.complete()
assert any("1.2.3.4" in line for line in self.dedicated.lines)
assert not any("1.2.3.4" in line for line in self.main.lines)

def test_untagged_record_only_to_main_sink(self) -> None:
"""An ordinary warning never reaches the dedicated sink."""
logger.warning("ordinary warning")
logger.complete()
assert any("ordinary warning" in line for line in self.main.lines)
assert not any("ordinary warning" in line for line in self.dedicated.lines)