Skip to content
arminrad edited this page Mar 16, 2026 · 2 revisions

Audit System

Security audit logging for compliance and forensics


Overview

Track security-sensitive events:

  • Authentication events - Login, logout, failures
  • Authorization changes - Permission updates
  • Data access - Sensitive data queries
  • Configuration changes - System settings
  • Admin actions - All admin operations

Compliance: SOC 2, HIPAA, GDPR, PCI-DSS


Audit Events

Authentication

  • Login success/failure
  • Password changes
  • MFA events
  • Session creation/destruction
  • Token generation

Authorization

  • Permission granted/revoked
  • Role assignments
  • Access denied events
  • Privilege escalation attempts

Data Access

  • User data queries
  • API key access
  • Payment information access
  • Export operations

Admin Actions

  • User creation/deletion
  • Credit adjustments
  • Rate limit changes
  • Feature flag toggles
  • System configuration

Database Schema

audit_logs Table

CREATE TABLE audit_logs (
  id BIGINT PRIMARY KEY,
  timestamp TIMESTAMP NOT NULL,
  user_id INTEGER,
  admin_id INTEGER,
  event_type TEXT NOT NULL,
  event_category TEXT NOT NULL,
  resource_type TEXT,
  resource_id TEXT,
  action TEXT NOT NULL,
  result TEXT NOT NULL,
  ip_address TEXT,
  user_agent TEXT,
  details JSONB,
  severity TEXT,
  created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_audit_user ON audit_logs(user_id, timestamp);
CREATE INDEX idx_audit_event ON audit_logs(event_type, timestamp);
CREATE INDEX idx_audit_severity ON audit_logs(severity, timestamp);

Logging Audit Events

Python

from src.db.audit import log_audit_event

await log_audit_event(
    user_id=user.id,
    event_type="permission_changed",
    event_category="authorization",
    resource_type="user",
    resource_id=target_user.id,
    action="grant_admin",
    result="success",
    severity="high",
    details={
        "previous_role": "user",
        "new_role": "admin",
        "changed_by": admin.id
    }
)

Auto-Logging Decorator

@audit_log(
    event_type="api_key_deleted",
    category="security",
    severity="medium"
)
async def delete_api_key(user_id: int, key_id: int):
    # Function automatically logged
    await db.delete_api_key(key_id)

Event Categories

Security

  • auth_success
  • auth_failed
  • permission_denied
  • suspicious_activity
  • key_rotation

Data

  • data_export
  • sensitive_access
  • data_deletion
  • bulk_query

Admin

  • user_created
  • credits_adjusted
  • config_changed
  • feature_toggled

Compliance

  • gdpr_export
  • gdpr_deletion
  • data_retention

Severity Levels

Level Use Case Examples
critical Security incidents Multiple failed logins, privilege escalation
high Important changes Admin role granted, config changes
medium Notable events API key created, payment processed
low Routine events Successful login, balance check

Querying Audit Logs

Recent Security Events

SELECT * FROM audit_logs
WHERE event_category = 'security'
  AND severity IN ('critical', 'high')
  AND timestamp > NOW() - INTERVAL '24 hours'
ORDER BY timestamp DESC;

Failed Authentication

SELECT user_id, COUNT(*) as attempts, MAX(timestamp) as last_attempt
FROM audit_logs
WHERE event_type = 'auth_failed'
  AND timestamp > NOW() - INTERVAL '1 hour'
GROUP BY user_id
HAVING COUNT(*) > 3;

Admin Actions

SELECT admin_id, action, resource_type, timestamp
FROM audit_logs
WHERE admin_id IS NOT NULL
  AND timestamp > NOW() - INTERVAL '7 days'
ORDER BY timestamp DESC;

User Timeline

SELECT event_type, action, result, timestamp
FROM audit_logs
WHERE user_id = 123
ORDER BY timestamp DESC;

API Endpoints

Get Audit Logs (Admin)

GET /admin/audit-logs

Query params:

  • user_id - Filter by user
  • event_type - Event type
  • severity - Severity level
  • from_date, to_date - Date range
  • limit, offset - Pagination

Response:

{
  "total": 1500,
  "data": [
    {
      "id": 10001,
      "timestamp": "2024-12-15T10:30:00Z",
      "event_type": "permission_changed",
      "event_category": "authorization",
      "action": "grant_admin",
      "result": "success",
      "severity": "high",
      "details": {
        "admin_id": 1,
        "target_user": 123
      }
    }
  ]
}

Export Audit Logs

GET /admin/audit-logs/export

Download as CSV or JSON.


Compliance Reports

SOC 2 Report

SELECT
  DATE(timestamp) as date,
  event_category,
  COUNT(*) as events,
  COUNT(*) FILTER (WHERE result = 'failed') as failures
FROM audit_logs
WHERE timestamp BETWEEN '2024-01-01' AND '2024-12-31'
GROUP BY DATE(timestamp), event_category
ORDER BY date;

Access Report

SELECT
  user_id,
  resource_type,
  COUNT(*) as accesses,
  MAX(timestamp) as last_access
FROM audit_logs
WHERE event_category = 'data'
  AND action LIKE '%access%'
GROUP BY user_id, resource_type;

Retention & Archival

Retention Policy

  • Critical events: 7 years
  • High severity: 3 years
  • Medium severity: 1 year
  • Low severity: 90 days

Archive Process

-- Move to cold storage
INSERT INTO audit_logs_archive
SELECT * FROM audit_logs
WHERE timestamp < NOW() - INTERVAL '1 year'
  AND severity IN ('low', 'medium');

-- Delete from hot storage
DELETE FROM audit_logs
WHERE timestamp < NOW() - INTERVAL '1 year'
  AND severity IN ('low', 'medium');

Alerting

Critical Events

if severity == "critical":
    send_alert(
        channel="security",
        message=f"Critical audit event: {event_type}",
        details=details
    )

Suspicious Patterns

-- Alert: Multiple failed logins
CREATE OR REPLACE FUNCTION check_failed_logins()
RETURNS TRIGGER AS $$
BEGIN
  IF (
    SELECT COUNT(*)
    FROM audit_logs
    WHERE user_id = NEW.user_id
      AND event_type = 'auth_failed'
      AND timestamp > NOW() - INTERVAL '15 minutes'
  ) > 5 THEN
    PERFORM send_security_alert('Multiple failed logins', NEW.user_id);
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Tamper Protection

Immutable Logs

-- Prevent updates/deletes
CREATE RULE audit_no_update AS
  ON UPDATE TO audit_logs
  DO INSTEAD NOTHING;

CREATE RULE audit_no_delete AS
  ON DELETE TO audit_logs
  DO INSTEAD NOTHING;

Hash Chain

def create_audit_entry(event):
    previous_hash = get_last_audit_hash()

    entry_data = {
        "event": event,
        "timestamp": time.time(),
        "previous_hash": previous_hash
    }

    current_hash = hashlib.sha256(
        json.dumps(entry_data).encode()
    ).hexdigest()

    entry_data["hash"] = current_hash

    return entry_data

Best Practices

  1. Log all security events: Better to over-log
  2. Include context: IP, user agent, metadata
  3. Protect logs: Immutable, tamper-proof
  4. Regular review: Weekly security reviews
  5. Long retention: Keep critical events 7+ years
  6. Alert on anomalies: Automated detection
  7. Separate storage: Don't mix with app logs

GDPR Compliance

Right to Access

async def export_user_audit_logs(user_id: int):
    logs = await db.get_audit_logs(user_id=user_id)
    return generate_gdpr_export(logs)

Right to Erasure

async def anonymize_user_audit_logs(user_id: int):
    # Keep logs but anonymize user data
    await db.update_audit_logs(
        user_id=user_id,
        updates={"user_id": None, "ip_address": None}
    )

Related Documentation


Last Updated: December 2024 Status: Production Ready


Related

Clone this wiki locally