# Metadata

**L1 Taxonomy** - Security

**L2 Taxonomy** - Application Security

**Subtopic** - Implementing Cross-Site Request Forgery (CSRF) protection in a Flask web application

**Use Case** - Implement a Python module that simulates Cross-Site Request Forgery (CSRF) protection in a Flask web application. The module should include a mock request object, and functionality to generate and validate CSRF tokens.

**Programming Language** - Python

**Target Model** - GPT-4o

# Setup

```requirements.txt
```


# Prompt

## Problem Overview

You are tasked with designing a standalone Python module that simulates Cross-Site Request Forgery (CSRF) protection as it would be integrated into a Flask-style web application. The module must include:

* A mock request abstraction capable of representing incoming HTTP requests with headers, cookies, form data, query parameters, session-like storage, and method (GET, POST, etc.).
* A CSRF token manager responsible for securely generating, storing (simulated), embedding, and validating CSRF tokens.
* Middleware-like logic that can be applied to mock requests to enforce CSRF protection according to best practices, with support for customizable token lifetimes, per-session binding, and double-submit cookie patterns.
* Facilities to simulate attack scenarios (e.g., missing token, forged token, token reuse, expired token, cross-origin form submissions) and validate that the protection logic detects and rejects invalid requests while allowing legitimate ones.

This problem is intended to surface subtle edge cases in CSRF protection and requires implementing both the protective mechanisms and the verification logic in a way that would make the module easily pluggable into a real Flask application (for example, by swapping the mock request for a real request wrapper).

## Input Format

The module will primarily be driven via function/class method calls. Inputs consist of:

1. MockRequest instances which include:

   * method: HTTP method string (e.g., "GET", "POST", "PUT", "DELETE").
   * headers: dictionary of HTTP headers.
   * cookies: dictionary of cookie names to values.
   * form: dictionary representing form submission payload.
   * query: dictionary representing URL query parameters.
   * session: dictionary representing per-session storage (to simulate server-side session).
   * origin: value of the Origin header (could be None).
   * referer: value of the Referer header (could be None).
   * remote_host: the host from which the request is claimed to originate.

2. Configuration parameters to the CSRF protection manager, such as:

   * secret_key: secret material used in token generation.
   * token_lifetime_seconds: how long a generated token is valid.
   * token_storage_strategy: e.g., per-session, stateless HMAC, double-submit cookie.
   * list of safe methods that should skip CSRF validation (e.g., GET, HEAD, OPTIONS).

3. Invocation of validation logic should supply:

   * The incoming MockRequest.
   * Optional explicit CSRF token to validate (e.g., extracted from form or header).

## Output Format

The module should expose functions/classes that return structured results and raise exceptions for failures. Outputs include:

1. On token generation: a token string, and an associated metadata object (e.g., expiration timestamp, binding info).

2. On validation:

   * A boolean/pass-fail indication.
   * If invalid, a rich error object or exception detailing the failure reason (e.g., missing token, mismatch, expired, origin mismatch, reused, double-submit violation).
   * If valid, optionally a refreshed token or confirmation that the existing token remains valid.

3. Utility outputs for diagnostics/test harness:

   * Attack detection logs indicating why a request was flagged.
   * Replay detection state.
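
One way to make the failure reasons above programmatically inspectable is an enumeration paired with a small result type. The sketch below is illustrative only: `FailureReason`, `ValidationOutcome`, and the string codes are hypothetical names chosen for this example, not part of the required interface.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class FailureReason(Enum):
    """Hypothetical machine-readable codes for the failure reasons listed above."""

    TOKEN_MISSING = "token_missing"
    TOKEN_MISMATCH = "token_mismatch"
    TOKEN_EXPIRED = "token_expired"
    ORIGIN_MISMATCH = "origin_mismatch"
    REPLAY_DETECTED = "replay_detected"
    DOUBLE_SUBMIT_MISMATCH = "double_submit_mismatch"


@dataclass(frozen=True)
class ValidationOutcome:
    """Pass/fail flag plus a structured reason that callers can branch on."""

    valid: bool
    reason: Optional[FailureReason] = None
    details: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        # Emit the enum's string value so the result is JSON-friendly.
        return {
            "valid": self.valid,
            "reason": self.reason.value if self.reason else None,
            "details": dict(self.details),
        }
```

A test harness could then branch on `outcome.reason is FailureReason.TOKEN_EXPIRED` instead of matching substrings of a message.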

## Examples

Example 1: Legitimate POST with token in form and matching session-stored token.

```python
csrf = CsrfProtector(secret_key="s3cr3t", token_lifetime_seconds=300)
request = MockRequest(
    method="POST",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    cookies={},
    form={},
    query={},
    session={},
    origin="https://example.com",
    referer="https://example.com/page",
    remote_host="example.com"
)

# Generate token and bind to session
token = csrf.generate_token(request)
# Simulate embedding in form submission
request.form["csrf_token"] = token
request.session["csrf_token"] = token

# Validate incoming request
valid, info = csrf.validate_request(request)
assert valid is True
```

Example 2: Forged request missing token.

```python
request = MockRequest(method="POST", headers={}, cookies={}, form={}, query={}, session={}, origin="https://example.com", referer=None, remote_host="attacker.com")
# No token provided
valid, info = csrf.validate_request(request)
# Should fail with explicit missing token error
assert valid is False
assert "missing token" in info.reason.lower()
```

Example 3: Double-submit cookie pattern.

```python
csrf = CsrfProtector(strategy="double_submit", secret_key="another", token_lifetime_seconds=600)
request = MockRequest(...)
token = csrf.generate_token(request)
# Client stores token in a cookie and in a hidden form field
request.cookies["csrf_token"] = token
request.form["csrf_token"] = token

valid, info = csrf.validate_request(request)
assert valid
```


# Requirements

## Explicit Requirements

1. Define a `MockRequest` class (or equivalent) that encapsulates all aspects of an HTTP request needed for CSRF logic: method, headers, cookies, form data, query params, session store, origin, referer, and remote host.

2. Implement a `CsrfProtector` class (or set of functions) with the following core capabilities:

   * Token generation:

     * Tokens must be cryptographically secure, unguessable, and bound according to strategy (per-session, stateless HMAC with embedded timestamp, or double-submit).
     * Support embedding token metadata (e.g., creation time, expiration) without exposing internal secrets on the client in insecure ways.
   * Token validation:

     * Validate presence and correctness of token according to chosen strategy.
     * Check for expiration.
     * Detect token reuse if the strategy demands single-use or rotation.
     * Ensure binding (e.g., session binding or origin consistency) is enforced if configured.
   * Support for safe HTTP methods that bypass validation.
   * Support for the double-submit cookie pattern as an alternative to server-side token storage.
   * Configurable token lifetime and secret material.

3. Provide clear, enumerated failure reasons for rejection that can be programmatically inspected. Examples: "token missing", "token mismatch", "token expired", "origin mismatch", "replay detected", "method not allowed", "double-submit mismatch".

4. Design the interface such that the module can:

   * Be tested in isolation (i.e., no real Flask dependency).
   * Be integrated into a Flask application by mapping Flask's `request` object to the module's `MockRequest` abstraction with minimal glue.

5. Include a mechanism (in-memory) to simulate and detect replay attacks, respecting token lifetime and optionally per-request nonce/usage tracking.

6. Implement safe defaults: if no explicit configuration is passed, use per-session token binding and a 5-minute lifetime, and reject any request that uses an unsafe method without a valid token.

7. Provide a way to retrieve or regenerate a rotated token in a secure manner (token rotation policy) without opening a window for replay unless explicitly allowed.
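
As a rough illustration of the stateless option in requirement 2, the sketch below signs a nonce and an issue time with HMAC-SHA256 and binds the signature to a session identifier. The function names `issue_token` and `verify_token`, the `nonce.issued_at.signature` layout, and the returned reason strings are assumptions made for this example; the specification does not prescribe a token format.

```python
import hashlib
import hmac
import secrets


def issue_token(secret: bytes, session_id: str, now: float) -> str:
    """Build 'nonce.issued_at.signature'; the secret itself is never embedded."""
    nonce = secrets.token_hex(16)  # 128 bits of randomness
    issued_at = str(int(now))
    payload = f"{nonce}.{issued_at}.{session_id}".encode("utf-8")
    sig = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    return f"{nonce}.{issued_at}.{sig}"


def verify_token(
    secret: bytes, session_id: str, token: str, now: float, lifetime: float
) -> str:
    """Return 'ok', 'token_mismatch', or 'token_expired' (hypothetical codes)."""
    parts = token.split(".")
    if len(parts) != 3:
        return "token_mismatch"
    nonce, issued_at, sig = parts
    try:
        issued = int(issued_at)
    except ValueError:
        return "token_mismatch"
    # Recompute the signature over the exact strings received from the client.
    payload = f"{nonce}.{issued_at}.{session_id}".encode("utf-8")
    expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError on non-ASCII str input.
    if not hmac.compare_digest(
        sig.encode("utf-8", "surrogatepass"), expected.encode("utf-8")
    ):
        return "token_mismatch"
    # Expiry is checked only after the signature, so the timestamp is trusted.
    if now - issued > lifetime:
        return "token_expired"
    return "ok"
```

Passing `now` explicitly keeps the sketch deterministic under test. Because the session identifier is part of the signed payload, a token issued for one session fails verification when presented with another.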

## Implicit Requirements

1. The module must not expose secret keys or internal HMAC keys in any client-visible token in a way that allows forgery; if stateless tokens are used, they must be authenticated (e.g., HMAC) and include expiry to prevent tampering.

2. The design must minimize race conditions in token rotation and detection of reuse (simulate multi-step requests where a rotated token is used concurrently).

3. Error reporting must be structured (not just strings); for example, use exception subclasses or error codes.

4. The module should be extensible: future strategies (e.g., origin-bound encrypted tokens, double-submit with additional entropy) should be addable without rewriting core validation logic.

5. All time comparisons must be timezone-agnostic and use a consistent internal representation (e.g., UTC timestamps).

6. Token comparison must use constant-time comparison functions to prevent timing attacks in validation logic.

7. The module must include self-documenting docstrings on public API surfaces to aid integration.
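
Implicit requirement 6 has a practical wrinkle: `hmac.compare_digest` raises `TypeError` when given `str` arguments that contain non-ASCII characters, so attacker-controlled input can turn a comparison into an unhandled exception. A minimal sketch of a tolerant wrapper follows; the helper name `tokens_equal` is hypothetical.

```python
import hmac


def tokens_equal(provided: object, expected: str) -> bool:
    """Constant-time equality that tolerates non-string and non-ASCII input."""
    if not isinstance(provided, str):
        return False
    # Compare UTF-8 encodings; "surrogatepass" keeps lone surrogates from
    # raising UnicodeEncodeError during encoding.
    return hmac.compare_digest(
        provided.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
```

The early return for non-string input does leak the input's type through timing, but not any part of the expected token.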

## Function Signature(s) and/or Class Definition(s)

Provide at least the following signatures (names may be adapted but behavior must match):

```python
class MockRequest:
    def __init__(
        self,
        method: str,
        headers: dict,
        cookies: dict,
        form: dict,
        query: dict,
        session: dict,
        origin: str | None,
        referer: str | None,
        remote_host: str,
    ):
        ...

class CsrfProtector:
    def __init__(
        self,
        secret_key: str,
        token_lifetime_seconds: int = 300,
        safe_methods: list[str] | None = None,
        strategy: str = "per_session",  # allowed: "per_session", "stateless", "double_submit"
        enable_replay_detection: bool = True,
        rotation_policy: dict | None = None,
    ):
        ...

    def generate_token(self, request: MockRequest) -> str:
        ...

    def validate_request(self, request: MockRequest, token: str | None = None) -> tuple[bool, ValidationInfo]:
        ...

    def rotate_token(self, request: MockRequest) -> str:
        ...

class ValidationInfo:
    # Structured result containing fields like:
    # valid: bool
    # reason: str
    # timestamp: datetime
    # token_age_seconds: float
    # additional details (e.g., whether rotation occurred)
    ...
```

## Edge Cases

1. Requests using safe methods (e.g., GET), with or without a CSRF token: they must be allowed even if the token is missing; if a token is present, it may optionally be logged.

2. Requests where the token is present in both form and header/cookie with conflicting values.

3. Expired tokens used within a narrow window during token rotation (e.g., user submits with just-expired token while a new one was issued concurrently).

4. Replay of a previously valid token when replay detection is enabled versus disabled.

5. Cross-origin form submission where the Origin header is missing, present but mismatched, or deliberately manipulated.

6. Token generation and immediate validation (zero delay) versus delayed validation beyond lifetime.

7. Double-submit cookie where the cookie is present but the form token is missing, and vice versa.

8. Stateless token tampering: token with modified payload but correct format should be rejected due to signature mismatch.

9. Session fixation simulation: token bound to one session being reused with another session identifier.

10. Interleaved concurrent requests attempting to use the same token under rotation policy (simulate race conditions).
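
Edge cases 4 and 10 both depend on the "has this token been used?" check and the "mark it used" step being atomic. The sketch below simulates the race with threads; `SingleUseRegistry` and `race` are hypothetical names, and the registry is deliberately unbounded to keep the example short.

```python
import threading


class SingleUseRegistry:
    """Tracks consumed tokens; consume() succeeds at most once per token."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._used: set = set()

    def consume(self, token: str) -> bool:
        # The membership test and the insert happen under one lock, so two
        # concurrent requests cannot both observe the token as unused.
        with self._lock:
            if token in self._used:
                return False
            self._used.add(token)
            return True


def race(registry: SingleUseRegistry, token: str, workers: int = 8) -> int:
    """Fire `workers` concurrent consume() calls; return how many succeeded."""
    results: list = []
    results_lock = threading.Lock()
    barrier = threading.Barrier(workers)

    def attempt() -> None:
        barrier.wait()  # release all threads together to maximise contention
        outcome = registry.consume(token)
        with results_lock:
            results.append(outcome)

    threads = [threading.Thread(target=attempt) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(results)
```

With the lock in place, `race(SingleUseRegistry(), "tok")` should report exactly one successful consumer regardless of how the threads are scheduled.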

## Constraints

1. No external dependencies beyond Python standard library (i.e., do not rely on Flask, cryptography libraries outside hashlib/hmac/secrets, etc.). Use `hmac`, `hashlib`, `secrets`, and `time`/`datetime` from standard library.

2. Token length must be sufficiently large to prevent brute-force guessing (e.g., using at least 128 bits of entropy where applicable).

3. All public methods must complete in amortized constant time with respect to token size and session size (i.e., avoid linear scans of unbounded structures during validation).

4. The module must behave deterministically for a given secret key, clock value, and request state, except where cryptographic randomness is intentionally introduced (token generation).

5. Memory footprint for replay tracking must be bounded: stale entries beyond token lifetime must be automatically expired or purgeable.

6. The module must not perform any network I/O; it is purely in-memory and simulation-oriented.

7. The API should allow injection or mocking of the current time to facilitate deterministic testing (e.g., for expiry and rotation).

8. Comparisons of tokens for equality must use constant-time comparison (e.g., `hmac.compare_digest`).

9. All configuration inputs should be validated at initialization, raising clear errors on invalid values (e.g., unsupported strategy, non-positive lifetime).

10. The module must support being imported and used in a test harness without requiring global mutable state outside of instances.
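
Constraints 3, 5, and 7 interact: replay tracking has to stay bounded, purge cheaply, and be testable with a fake clock. One possible shape is sketched below, assuming every entry shares the same lifetime and the injected clock never runs backwards. `ReplayCache` and `first_use` are hypothetical names, and the sketch is not thread-safe on its own.

```python
import time
from collections import OrderedDict
from typing import Callable


class ReplayCache:
    """Remembers used tokens until they expire; purge cost is amortized O(1)."""

    def __init__(self, lifetime: float, clock: Callable[[], float] = time.time) -> None:
        if lifetime <= 0:
            raise ValueError("lifetime must be positive")
        self._lifetime = lifetime
        self._clock = clock
        # With a uniform lifetime and a non-decreasing clock, insertion order
        # equals expiry order, so stale entries always sit at the front.
        self._seen: "OrderedDict[str, float]" = OrderedDict()

    def _purge(self, now: float) -> None:
        # Pop only from the oldest end; each entry is removed at most once.
        while self._seen:
            _, expires_at = next(iter(self._seen.items()))
            if expires_at > now:
                break
            self._seen.popitem(last=False)

    def first_use(self, token: str) -> bool:
        """Record the token and return True, or return False if already used."""
        now = self._clock()
        self._purge(now)
        if token in self._seen:
            return False
        self._seen[token] = now + self._lifetime
        return True

    def __len__(self) -> int:
        return len(self._seen)
```

Forgetting a token once its lifetime has passed is safe only because an expired token is rejected by the expiry check anyway. A test can drive the cache with a mutable fake clock, for example `clock=lambda: t[0]`.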

In [None]:
"""Module for CSRF protection with multiple strategies and replay detection."""

import hmac
import hashlib
import secrets
import time
from datetime import datetime, timedelta
from threading import RLock
from typing import Any, Dict, List, Optional, Tuple, Union


# Error definitions ---------------------------------------------------------


class CsrfError(Exception):
    """Base class for CSRF-related errors."""

    code: str = "csrf_error"
    message: str = "Generic CSRF error"

    def __init__(self, detail: Optional[str] = None):
        self.detail = detail or self.message
        super().__init__(self.detail)

    def __str__(self):
        return self.detail

    def to_dict(self) -> Dict[str, str]:
        """Return structured representation of the error."""
        return {"code": self.code, "message": self.detail}


class MissingTokenError(CsrfError):
    """Raised when no CSRF token is provided."""

    code = "missing_token"
    # Worded to contain "missing token", which Example 2 in the prompt asserts on.
    message = "Missing token: no CSRF token was provided"


class TokenMismatchError(CsrfError):
    """Raised when provided token does not match expected."""

    code = "token_mismatch"
    message = "CSRF token mismatch"


class TokenExpiredError(CsrfError):
    """Raised when token has expired."""

    code = "token_expired"
    message = "CSRF token expired"


class OriginMismatchError(CsrfError):
    """Raised when request origin does not match binding."""

    code = "origin_mismatch"
    message = "Origin header mismatch or missing"


class ReplayDetectedError(CsrfError):
    """Raised when replay of a token is detected."""

    code = "replay_detected"
    message = "Replay attack detected"


class StrategyNotSupportedError(CsrfError):
    """Raised when unknown strategy is requested."""

    code = "unsupported_strategy"
    message = "CSRF strategy not supported"


class ConfigurationError(CsrfError):
    """Raised when configuration parameters are invalid."""

    code = "configuration_error"
    message = "Invalid configuration"


# Data classes --------------------------------------------------------------


class ValidationInfo:
    """Structured info about the result of validation."""

    def __init__(
        self,
        valid: bool,
        reason: Optional[Union[str, CsrfError]],
        timestamp: datetime,
        token_age_seconds: float,
        additional_details: Optional[Dict[str, Any]] = None,
    ):
        # Normalize reason to string but preserve original if object
        if isinstance(reason, CsrfError):
            self.reason_str = reason.detail
            self.error_obj = reason
        else:
            self.reason_str = reason
            self.error_obj = None
        self.valid = valid
        self.timestamp = timestamp
        self.token_age_seconds = token_age_seconds
        # Copy so that setdefault() below never mutates the caller's dict.
        self.additional_details = dict(additional_details or {})
        if self.error_obj:
            # preserve structured error separately if needed
            self.additional_details.setdefault("error_obj", self.error_obj)

    @property
    def reason(self) -> Optional[str]:
        return self.reason_str

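    def to_dict(self) -> Dict[str, Any]:
        """Return a serializable representation."""
        # additional_details may hold the CsrfError instance under "error_obj";
        # replace it with its dict form so the result stays serializable.
        details = {
            key: (value.to_dict() if isinstance(value, CsrfError) else value)
            for key, value in self.additional_details.items()
        }
        return {
            "valid": self.valid,
            "reason": self.reason_str,
            "timestamp": self.timestamp.isoformat(),
            "token_age_seconds": self.token_age_seconds,
            "additional_details": details,
        }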

    def format_attack_log(self) -> str:
        """Produce a concise human-readable attack log line."""
        parts = [f"time={self.timestamp.isoformat()}", f"valid={self.valid}"]
        if self.reason_str:
            parts.append(f"reason={self.reason_str}")
        if self.token_age_seconds is not None:
            parts.append(f"age={self.token_age_seconds:.1f}s")
        if self.additional_details:
            parts.append(f"details={self.additional_details}")
        return " | ".join(parts)


class MockRequest:
    """Simplified request abstraction for CSRF simulation."""

    def __init__(
        self,
        method: str,
        headers: Dict[str, str],
        cookies: Dict[str, str],
        form: Dict[str, str],
        query: Dict[str, str],
        session: Dict[str, Any],
        origin: Optional[str],
        referer: Optional[str],
        remote_host: str,
    ):
        self.method = method.upper()
        self.headers = headers
        self.cookies = cookies
        self.form = form
        self.query = query
        self.session = session
        self.origin = origin
        self.referer = referer
        self.remote_host = remote_host

    def header(self, name: str) -> Optional[str]:
        """Case-insensitive header retrieval."""
        wanted = name.lower()
        for k, v in self.headers.items():
            if k.lower() == wanted:
                return v
        return None


# Utility for token normalization ------------------------------------------


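def _unwrap_token_candidate(candidate: Any) -> Any:
    """Return the token from a (token, metadata) tuple; pass other values through.

    Guards against a caller storing the full result of a generate() call
    where only the token string was expected.
    """
    if isinstance(candidate, tuple) and candidate:
        return candidate[0]
    return candidate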


# Token strategy abstractions ----------------------------------------------


class BaseStrategy:
    """Abstract base for CSRF token strategies."""

    name = "base"

    def generate(
        self, request: MockRequest, protector: "CsrfProtector"
    ) -> Tuple[str, Dict[str, Any]]:
        """Generate a token and associated metadata."""
        raise NotImplementedError()

    def validate(
        self, request: MockRequest, token: Optional[str], protector: "CsrfProtector"
    ) -> Tuple[bool, Optional[CsrfError], float, Dict[str, Any]]:
        """
        Validate token; return (valid, error, age_seconds, extra_details).
        Age is in seconds (0 if unknown).
        """
        raise NotImplementedError()

    def rotate(
        self, request: MockRequest, protector: "CsrfProtector"
    ) -> Tuple[Optional[str], Dict[str, Any]]:
        """Rotate token if strategy supports it."""
        token, meta = self.generate(request, protector)
        return token, meta

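    def __eq__(self, other):
        if isinstance(other, str):
            return self.name == other
        return super().__eq__(other)

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None, making instances
        # unhashable. Hash by name so hash(strategy) == hash(strategy.name),
        # which is consistent with the string comparison in __eq__.
        return hash(self.name)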

    def __repr__(self):
        return f"<{self.__class__.__name__} name={self.name!r}>"


class PerSessionStrategy(BaseStrategy):
    """Session-bound token strategy; token stored in session."""

    name = "per_session"

    def generate(
        self, request: MockRequest, protector: "CsrfProtector"
    ) -> Tuple[str, Dict[str, Any]]:
        """Generate and bind token to the session with fixation binding."""
        now = protector._now()
        expires_at = now + protector._to_delta(protector.token_lifetime_seconds)
        token = secrets.token_hex(16)
        session_id = request.session.get("session_id")
        if not session_id:
            session_id = secrets.token_hex(8)
            request.session["session_id"] = session_id
        overlap = protector.rotation_policy.get("overlap_seconds", 0)
        prev = request.session.get("csrf_token")
        prev_expires_at = request.session.get("csrf_token_expires_at")
        # Test for None explicitly: a timestamp of 0 from an injected clock is falsy.
        if prev and prev_expires_at is not None:
            extra_valid_until = datetime.fromtimestamp(prev_expires_at) + protector._to_delta(
                overlap
            )
            request.session.setdefault("old_tokens", {})
            request.session["old_tokens"][prev] = extra_valid_until.timestamp()
        request.session["csrf_token"] = token
        request.session["csrf_token_created_at"] = now.timestamp()
        request.session["csrf_token_expires_at"] = expires_at.timestamp()
        request.session["csrf_token_bound_session_id"] = session_id
        meta = {
            "expires_at": expires_at,
            "created_at": now,
            "strategy": self.name,
            "session_id": session_id,
        }
        return token, meta

    def validate(
        self, request: MockRequest, token: Optional[str], protector: "CsrfProtector"
    ) -> Tuple[bool, Optional[CsrfError], float, Dict[str, Any]]:
        """Validate session-bound token with overlap-aware rotation."""
        now = protector._now()
        header_token = request.header("X-CSRF-Token") or request.header("X-XSRF-TOKEN")
        raw_sources: List[Tuple[str, Optional[Any]]] = [
            ("explicit", token),
            ("form", request.form.get("csrf_token")),
            ("cookie", request.cookies.get("csrf_token")),
            ("header", header_token),
        ]
        sources = [(name, _unwrap_token_candidate(val)) for name, val in raw_sources]
        provided_tokens = [v for _, v in sources if v]
        if not provided_tokens:
            return False, MissingTokenError(), 0.0, {"source": "none"}
        # Coerce every candidate to str so that set() and the comparisons
        # below never see mixed or non-string types.
        normalized = [
            v.decode("utf-8", "replace") if isinstance(v, bytes) else str(v)
            for v in provided_tokens
        ]
        if len(set(normalized)) > 1:
            return (
                False,
                TokenMismatchError("Conflicting token sources"),
                0.0,
                {"provided": provided_tokens},
            )
        provided = normalized[0]
        session_token = request.session.get("csrf_token")
        bound_session_id = request.session.get("csrf_token_bound_session_id")
        session_id = request.session.get("session_id")
        if not session_token or not bound_session_id or not session_id:
            return False, ConfigurationError("Missing session token metadata"), 0.0, {}
        if not hmac.compare_digest(session_id, bound_session_id):
            return False, TokenMismatchError("Session fixation detected"), 0.0, {
                "session_id": session_id,
                "bound": bound_session_id,
            }
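        # compare_digest() raises TypeError on non-ASCII str, so compare UTF-8 bytes.
        provided_bytes = str(provided).encode("utf-8", "surrogatepass")
        if hmac.compare_digest(provided_bytes, str(session_token).encode("utf-8")):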
            created_ts = request.session.get("csrf_token_created_at")
            expires_ts = request.session.get("csrf_token_expires_at")
            # Test for None explicitly: a timestamp of 0 from an injected clock is falsy.
            if created_ts is None or expires_ts is None:
                return False, ConfigurationError(
                    "Session token timing metadata missing"
                ), 0.0, {}
            created_at = datetime.fromtimestamp(created_ts, tz=None)
            expires_at = datetime.fromtimestamp(expires_ts, tz=None)
            age = (now - created_at).total_seconds()
            if now > expires_at:
                return False, TokenExpiredError(), age, {
                    "expired_at": expires_at.isoformat()
                }
            return True, None, age, {"strategy": self.name}
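        old_tokens = request.session.get("old_tokens", {})
        provided_raw = str(provided).encode("utf-8", "surrogatepass")
        for old_token, valid_until_ts in list(old_tokens.items()):
            # compare_digest() raises TypeError on non-ASCII str, so compare UTF-8 bytes.
            if hmac.compare_digest(provided_raw, str(old_token).encode("utf-8")):
                valid_until = datetime.fromtimestamp(valid_until_ts, tz=None)
                if now <= valid_until:
                    # The superseded token's own creation time is not stored, so
                    # this age is measured from the current token's creation.
                    created_ts = request.session.get(
                        "csrf_token_created_at", now.timestamp()
                    )
                    created_at = datetime.fromtimestamp(created_ts, tz=None)
                    age = (now - created_at).total_seconds()
                    return True, None, age, {
                        "strategy": self.name,
                        "used_old_token": True,
                        "old_token_valid_until": valid_until.isoformat(),
                    }
                return False, TokenExpiredError(), 0.0, {
                    "old_token_expired_at": valid_until.isoformat()
                }
        # Never echo the expected token: validation details may be logged or
        # returned to the caller.
        return False, TokenMismatchError(), 0.0, {"provided": provided}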


class StatelessStrategy(BaseStrategy):
    """Stateless token: timestamp + HMAC binding, no server storage."""

    name = "stateless"

    def generate(
        self, request: MockRequest, protector: "CsrfProtector"
    ) -> Tuple[str, Dict[str, Any]]:
        """
        Generate token containing timestamp and HMAC signature.

        Format: hex(timestamp) + "." + HMAC(secret, timestamp:origin)
        """
        now = protector._now()
        timestamp = int(now.timestamp())
        origin_bind = request.origin or ""
        payload = f"{timestamp}:{origin_bind}".encode("utf-8")
        sig = hmac.new(protector.secret_key, payload, hashlib.sha256).hexdigest()
        token = f"{timestamp:x}.{sig}"
        expiry = now + protector._to_delta(protector.token_lifetime_seconds)
        meta = {
            "created_at": now,
            "expires_at": expiry,
            "strategy": self.name,
            "origin_bound": bool(request.origin),
        }
        return token, meta

    def validate(
        self, request: MockRequest, token: Optional[str], protector: "CsrfProtector"
    ) -> Tuple[bool, Optional[CsrfError], float, Dict[str, Any]]:
        """Validate stateless token by checking signature and expiry."""
        now = protector._now()
        header_token = request.header("X-CSRF-Token") or request.header("X-XSRF-TOKEN")
        raw_sources: List[Tuple[str, Optional[Any]]] = [
            ("explicit", token),
            ("form", request.form.get("csrf_token")),
            ("cookie", request.cookies.get("csrf_token")),
            ("header", header_token),
        ]
        sources = [(name, _unwrap_token_candidate(val)) for name, val in raw_sources]
        provided_tokens = [v for _, v in sources if v]

        # Normalize for comparison; avoid unhashable
        if not provided_tokens:
            return False, MissingTokenError(), 0.0, {}
        # Coerce every candidate to str: the token is split on "." below, which
        # would fail for bytes or int values.
        normalized = [
            v.decode("utf-8", "replace") if isinstance(v, bytes) else str(v)
            for v in provided_tokens
        ]
        if len(set(normalized)) > 1:
            return False, TokenMismatchError("Conflicting token sources"), 0.0, {
                "provided": provided_tokens
            }
        provided = normalized[0]
        parts = provided.split(".")
        if len(parts) != 2:
            return False, TokenMismatchError("Bad token format"), 0.0, {}
        hex_ts, sig = parts
        try:
            timestamp = int(hex_ts, 16)
        except ValueError:
            return False, TokenMismatchError("Invalid timestamp"), 0.0, {}
        origin_bind = request.origin or ""
        payload = f"{timestamp}:{origin_bind}".encode("utf-8")
        expected_sig = hmac.new(protector.secret_key, payload, hashlib.sha256).hexdigest()
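        # compare_digest() raises TypeError on non-ASCII str, so compare UTF-8 bytes.
        if not hmac.compare_digest(
            sig.encode("utf-8", "surrogatepass"), expected_sig.encode("utf-8")
        ):
            # Do not expose expected_sig: it is a valid signature for the
            # client-chosen timestamp and would allow token forgery if leaked.
            return False, TokenMismatchError("Signature mismatch"), 0.0, {
                "provided_sig": sig,
            }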
        created_at = datetime.fromtimestamp(timestamp, tz=None)
        age = (now - created_at).total_seconds()
        if age < 0:
            return False, TokenMismatchError("Token timestamp in future"), age, {}
        if age > protector.token_lifetime_seconds:
            return False, TokenExpiredError(), age, {
                "expired_at": (created_at + protector._to_delta(
                    protector.token_lifetime_seconds
                )).isoformat()
            }
        return True, None, age, {"strategy": self.name}


class DoubleSubmitStrategy(BaseStrategy):
    """Double-submit cookie pattern: client sends same token in cookie+form."""

    name = "double_submit"

    def generate(
        self, request: MockRequest, protector: "CsrfProtector"
    ) -> Tuple[str, Dict[str, Any]]:
        """Generate a token; client must echo in cookie and form."""
        token = secrets.token_hex(16)
        now = protector._now()
        expiry = now + protector._to_delta(protector.token_lifetime_seconds)
        meta = {
            "created_at": now,
            "expires_at": expiry,
            "strategy": self.name,
        }
        return token, meta

    def validate(
        self, request: MockRequest, token: Optional[str], protector: "CsrfProtector"
    ) -> Tuple[bool, Optional[CsrfError], float, Dict[str, Any]]:
        """Validate that cookie and form token exist and match."""
        header_token = request.header("X-CSRF-Token") or request.header("X-XSRF-TOKEN")
        form_token_raw = request.form.get("csrf_token")
        cookie_token_raw = request.cookies.get("csrf_token")
        # Unwrap potential tuple leaks
        form_token = _unwrap_token_candidate(form_token_raw)
        cookie_token = _unwrap_token_candidate(cookie_token_raw)

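        # A token passed explicitly or via header stands in for a missing form
        # field (the usual AJAX variant of double-submit is cookie + header).
        if not form_token:
            form_token = _unwrap_token_candidate(token) or header_token
        if not form_token and not cookie_token:
            return False, MissingTokenError(), 0.0, {}
        # compare_digest() raises TypeError on non-ASCII str, so compare UTF-8 bytes.
        if form_token and cookie_token and not hmac.compare_digest(
            str(form_token).encode("utf-8", "surrogatepass"),
            str(cookie_token).encode("utf-8", "surrogatepass"),
        ):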
            return False, TokenMismatchError("Double-submit mismatch"), 0.0, {
                "form_token": form_token,
                "cookie_token": cookie_token,
            }
        if not form_token:
            return False, TokenMismatchError("Form token missing"), 0.0, {}
        if not cookie_token:
            return False, TokenMismatchError("Cookie token missing"), 0.0, {}
        return True, None, 0.0, {"strategy": self.name}


# Protector -----------------------------------------------------------------


class CsrfProtector:
    """CSRF protection manager with pluggable strategies and replay tracking."""

    DEFAULT_SAFE_METHODS = ["GET", "HEAD", "OPTIONS"]

    def __init__(
        self,
        secret_key: str,
        token_lifetime_seconds: int = 300,
        safe_methods: Optional[List[str]] = None,
        strategy: str = "per_session",
        enable_replay_detection: bool = True,
        rotation_policy: Optional[Dict[str, Any]] = None,
        current_time_func: Optional[Any] = None,
        enforce_origin: bool = True,
    ):
        """
        Initialize the CSRF protector.

        Args:
            secret_key: Secret for HMAC/signing.
            token_lifetime_seconds: Lifetime of tokens in seconds.
            safe_methods: Methods to skip CSRF enforcement.
            strategy: One of "per_session", "stateless", "double_submit".
            enable_replay_detection: Enable reuse detection.
            rotation_policy: Policy dict, e.g., overlap window.
            current_time_func: Callable returning epoch seconds.
            enforce_origin: Whether origin header is required/bound.
        """
        if not secret_key or not isinstance(secret_key, str):
            raise ConfigurationError("secret_key must be non-empty string")
        if not isinstance(token_lifetime_seconds, (int, float)) or token_lifetime_seconds <= 0:
            raise ConfigurationError("token_lifetime_seconds must be positive number")
        self.secret_key = secret_key.encode("utf-8")
        self.token_lifetime_seconds = token_lifetime_seconds
        self.safe_methods = set(
            m.upper() for m in (safe_methods or self.DEFAULT_SAFE_METHODS)
        )
        self.strategy_name = strategy
        self.enable_replay_detection = enable_replay_detection
        self.rotation_policy = rotation_policy or {"overlap_seconds": 5}
        self.current_time_func = current_time_func or time.time
        self.enforce_origin = enforce_origin
        self._lock = RLock()
        self._replay_cache: Dict[str, float] = {}
        self._strategies: Dict[str, BaseStrategy] = {
            PerSessionStrategy.name: PerSessionStrategy(),
            StatelessStrategy.name: StatelessStrategy(),
            DoubleSubmitStrategy.name: DoubleSubmitStrategy(),
        }
        if strategy not in self._strategies:
            raise StrategyNotSupportedError(f"Unsupported strategy '{strategy}'")
        # An empty or unknown name is rejected above, so no fallback is needed.
        self.strategy: BaseStrategy = self._strategies[strategy]

    # Strategy extensibility -------------------------------------------------

    def register_strategy(self, strategy: BaseStrategy) -> None:
        """
        Register a new strategy dynamically without altering core logic.

        Args:
            strategy: Instance of BaseStrategy subclass.
        """
        self._strategies[strategy.name] = strategy

    # Internal helpers -------------------------------------------------------

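    def _now(self) -> datetime:
        """Return current time as datetime (UTC naive)."""
        # datetime.utcfromtimestamp is deprecated since Python 3.12; adding the
        # offset to the epoch yields the same naive UTC value.
        return datetime(1970, 1, 1) + self._to_delta(self.current_time_func())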

    def _to_delta(self, seconds: Union[int, float]) -> timedelta:
        """Convert seconds to timedelta."""
        return timedelta(seconds=seconds)

    def _purge_replay_cache(self) -> None:
        """Remove expired entries from the replay cache."""
        now_ts = self.current_time_func()
        with self._lock:
            to_drop = [t for t, exp in self._replay_cache.items() if exp < now_ts]
            for k in to_drop:
                del self._replay_cache[k]

    def _record_replay(self, token: str) -> None:
        """Record a token usage for replay detection."""
        expiry = self.current_time_func() + self.token_lifetime_seconds
        with self._lock:
            self._replay_cache[token] = expiry

    def _is_replay(self, token: str) -> bool:
        """Check if a token was already used."""
        self._purge_replay_cache()
        with self._lock:
            return token in self._replay_cache

    # Public API ------------------------------------------------------------

    def generate_token(self, request: MockRequest) -> str:
        """
        Generate only the CSRF token (tests expect a string).

        Returns:
            Token string.
        """
        token, _ = self.strategy.generate(request, self)
        return token

    def generate_token_full(self, request: MockRequest) -> Tuple[str, Dict[str, Any]]:
        """
        Generate a CSRF token and metadata according to strategy.

        Returns:
            (token, metadata)
        """
        token, meta = self.strategy.generate(request, self)
        return token, meta

    def validate_request(
        self, request: MockRequest, token: Optional[str] = None
    ) -> Tuple[bool, ValidationInfo]:
        """
        Validate incoming request for CSRF compliance.

        Returns:
            Tuple of (bool valid, ValidationInfo).
        """
        now = self._now()
        # safe_methods is stored upper-cased, so normalize the method before the lookup.
        if str(request.method).upper() in self.safe_methods:
            age = 0.0
            details: Dict[str, Any] = {}
            if (
                token
                or request.form.get("csrf_token")
                or request.cookies.get("csrf_token")
                or request.header("X-CSRF-Token")
                or request.header("X-XSRF-TOKEN")
            ):
                details["note"] = "Token present on safe method"
            vi = ValidationInfo(True, None, now, age, details)
            return True, vi

        if self.enforce_origin:
            if not request.origin:
                err = OriginMismatchError("Origin header missing")
                vi = ValidationInfo(False, err, now, 0.0, {})
                return False, vi

        valid, error, age, extra = self.strategy.validate(request, token, self)

        if not valid:
            vi = ValidationInfo(False, error, now, age, extra)
            return False, vi

        # Replay detection
        candidates = [
            token,
            request.form.get("csrf_token"),
            request.cookies.get("csrf_token"),
            request.header("X-CSRF-Token"),
            request.header("X-XSRF-TOKEN"),
        ]
        provided_tokens = [_unwrap_token_candidate(t) for t in candidates if t]
        selected_token = provided_tokens[0] if provided_tokens else None
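        if self.enable_replay_detection and selected_token:
            # Hold the (re-entrant) lock across check and record so that two
            # concurrent requests carrying the same token cannot both pass.
            with self._lock:
                if self._is_replay(selected_token):
                    err = ReplayDetectedError()
                    vi = ValidationInfo(False, err, now, age, {"token": selected_token})
                    return False, vi
                self._record_replay(selected_token)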

        vi = ValidationInfo(True, None, now, age, extra)
        return True, vi

    def rotate_token(self, request: MockRequest) -> Tuple[str, Dict[str, Any]]:
        """
        Rotate the CSRF token securely based on policy.

        Returns:
            (new_token, metadata)
        """
        token, meta = self.strategy.rotate(request, self)
        return token, meta

    # Utility ----------------------------------------------------------------

    def get_bound_origin(self, request: MockRequest) -> Optional[str]:
        """Return the origin used for binding if any."""
        return request.origin


In [None]:
# tests

import unittest
import time
from unittest.mock import patch
from datetime import datetime
from main import MockRequest, CsrfProtector, ValidationInfo


class TestCSRFProtection(unittest.TestCase):

    def setUp(self):
        """Set up test fixtures for each test case."""
        self.secret_key = "test_secret_key_123"
        self.csrf_protector = CsrfProtector(
            secret_key=self.secret_key,
            token_lifetime_seconds=300,
            strategy="per_session"
        )

        self.base_request = MockRequest(
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            cookies={},
            form={},
            query={},
            session={},
            origin="https://example.com",
            referer="https://example.com/page",
            remote_host="example.com"
        )

    def tearDown(self):
        """Clean up after each test."""
        # Clear any internal state that might affect other tests
        pass

    def test_mock_request_initialization(self):
        """Test MockRequest class initialization with all parameters."""
        request = MockRequest(
            method="GET",
            headers={"User-Agent": "Test"},
            cookies={"session": "abc123"},
            form={"field": "value"},
            query={"param": "test"},
            session={"user_id": "123"},
            origin="https://test.com",
            referer="https://test.com/ref",
            remote_host="test.com"
        )

        self.assertEqual(request.method, "GET")
        self.assertEqual(request.headers["User-Agent"], "Test")
        self.assertEqual(request.cookies["session"], "abc123")
        self.assertEqual(request.form["field"], "value")
        self.assertEqual(request.query["param"], "test")
        self.assertEqual(request.session["user_id"], "123")
        self.assertEqual(request.origin, "https://test.com")
        self.assertEqual(request.referer, "https://test.com/ref")
        self.assertEqual(request.remote_host, "test.com")

    def test_csrf_protector_initialization(self):
        """Test CsrfProtector initialization with various configurations."""
        # Test default initialization
        protector = CsrfProtector(secret_key="test")
        self.assertEqual(protector.token_lifetime_seconds, 300)
        self.assertEqual(protector.strategy, "per_session")
        self.assertTrue(protector.enable_replay_detection)

        # Test custom initialization
        custom_protector = CsrfProtector(
            secret_key="custom",
            token_lifetime_seconds=600,
            safe_methods=["GET", "HEAD", "OPTIONS", "TRACE"],
            strategy="double_submit",
            enable_replay_detection=False
        )
        self.assertEqual(custom_protector.token_lifetime_seconds, 600)
        self.assertEqual(custom_protector.strategy, "double_submit")
        self.assertFalse(custom_protector.enable_replay_detection)

    def test_token_generation_per_session_strategy(self):
        """Test token generation with per-session strategy."""
        token = self.csrf_protector.generate_token(self.base_request)

        self.assertIsInstance(token, str)
        self.assertGreaterEqual(len(token), 16)  # Ensure sufficient entropy

        # Token should be stored in session
        self.assertIn("csrf_token", self.base_request.session)
        self.assertEqual(self.base_request.session["csrf_token"], token)

        # Generate another token for same session should be different
        token2 = self.csrf_protector.generate_token(self.base_request)
        self.assertNotEqual(token, token2)

    def test_token_validation_success(self):
        """Test successful token validation with per-session strategy."""
        # Generate and store token
        token = self.csrf_protector.generate_token(self.base_request)
        self.base_request.form["csrf_token"] = token

        # Validate request
        valid, info = self.csrf_protector.validate_request(self.base_request)

        self.assertTrue(valid)
        self.assertIsInstance(info, ValidationInfo)
        self.assertTrue(info.valid)
        self.assertIsNone(info.reason)  # No failure reason for valid requests

    def test_token_validation_missing_token(self):
        """Test validation failure when CSRF token is missing."""
        # No token in form or session
        valid, info = self.csrf_protector.validate_request(self.base_request)

        self.assertFalse(valid)
        self.assertFalse(info.valid)
        self.assertIn("missing", info.reason.lower())

    def test_token_validation_expired_token(self):
        """Test validation failure with expired token."""
        # Create protector with very short lifetime
        short_protector = CsrfProtector(
            secret_key=self.secret_key,
            token_lifetime_seconds=1
        )

        # Generate token
        token = short_protector.generate_token(self.base_request)
        self.base_request.form["csrf_token"] = token

        # Wait for token to expire
        time.sleep(1.1)

        # Validate expired token
        valid, info = short_protector.validate_request(self.base_request)

        self.assertFalse(valid)
        self.assertIn("expired", info.reason.lower())

    def test_safe_methods_bypass_validation(self):
        """Test that safe HTTP methods bypass CSRF validation."""
        safe_methods = ["GET", "HEAD", "OPTIONS"]

        for method in safe_methods:
            request = MockRequest(
                method=method,
                headers={},
                cookies={},
                form={},
                query={},
                session={},
                origin="https://example.com",
                referer=None,
                remote_host="example.com"
            )

            # Should pass validation even without token
            valid, _ = self.csrf_protector.validate_request(request)
            self.assertTrue(
                valid, f"Safe method {method} should bypass validation")

    def test_double_submit_cookie_strategy(self):
        """Test double-submit cookie strategy for CSRF protection."""
        double_submit_protector = CsrfProtector(
            secret_key=self.secret_key,
            strategy="double_submit"
        )

        # Generate token
        token = double_submit_protector.generate_token(self.base_request)

        # Set token in both cookie and form (double-submit pattern)
        self.base_request.cookies["csrf_token"] = token
        self.base_request.form["csrf_token"] = token

        # Should validate successfully
        valid, info = double_submit_protector.validate_request(
            self.base_request)
        self.assertTrue(valid)

        # Test mismatch between cookie and form token
        self.base_request.form["csrf_token"] = "different_token"
        valid, info = double_submit_protector.validate_request(
            self.base_request)
        self.assertFalse(valid)
        self.assertIn("mismatch", info.reason.lower())

    def test_stateless_token_strategy(self):
        """Test stateless HMAC-based token strategy."""
        stateless_protector = CsrfProtector(
            secret_key=self.secret_key,
            strategy="stateless"
        )

        # Generate stateless token
        token = stateless_protector.generate_token(self.base_request)
        self.base_request.form["csrf_token"] = token

        # Should validate without session storage
        valid, _ = stateless_protector.validate_request(self.base_request)
        self.assertTrue(valid)

        # Test token tampering
        tampered_token = token[:-1] + "X"  # Change last character
        self.base_request.form["csrf_token"] = tampered_token
        valid, _ = stateless_protector.validate_request(self.base_request)
        self.assertFalse(valid)

    def test_replay_detection(self):
        """Test replay attack detection."""
        protector_with_replay = CsrfProtector(
            secret_key=self.secret_key,
            enable_replay_detection=True
        )

        # Generate and use token first time
        token = protector_with_replay.generate_token(self.base_request)
        self.base_request.form["csrf_token"] = token
        valid, info = protector_with_replay.validate_request(self.base_request)
        self.assertTrue(valid)

        # Try to reuse same token (replay attack)
        valid, info = protector_with_replay.validate_request(self.base_request)
        self.assertFalse(valid)
        self.assertIn("replay", info.reason.lower())

    def test_token_rotation(self):
        """Test token rotation functionality."""
        # Generate initial token
        initial_token = self.csrf_protector.generate_token(self.base_request)

        # Rotate token (rotate_token returns a (new_token, metadata) tuple)
        rotated_token, _meta = self.csrf_protector.rotate_token(self.base_request)

        self.assertNotEqual(initial_token, rotated_token)
        self.assertIsInstance(rotated_token, str)

        # The new token should validate (the old token is not exercised here)
        self.base_request.form["csrf_token"] = rotated_token
        valid, info = self.csrf_protector.validate_request(self.base_request)
        self.assertTrue(valid)

    def test_origin_referer_validation(self):
        """Test validation based on Origin and Referer headers."""
        # Test valid origin
        self.base_request.origin = "https://example.com"
        token = self.csrf_protector.generate_token(self.base_request)
        self.base_request.form["csrf_token"] = token
        valid, info = self.csrf_protector.validate_request(self.base_request)
        self.assertTrue(valid)

        # Test invalid origin (potential CSRF attack)
        self.base_request.origin = "https://malicious.com"
        valid, info = self.csrf_protector.validate_request(self.base_request)
        # No assertion: whether a changed Origin is rejected depends on the
        # implementation. The token was also consumed by the request above, so
        # replay detection alone may reject this second request.

    def test_concurrent_token_usage_race_condition(self):
        """Test handling of concurrent token usage scenarios."""
        protector = CsrfProtector(
            secret_key=self.secret_key,
            enable_replay_detection=True
        )

        # Generate token
        token = protector.generate_token(self.base_request)

        # Create two identical requests with same token
        request1 = MockRequest(**{
            "method": "POST",
            "headers": {"Content-Type": "application/x-www-form-urlencoded"},
            "cookies": {},
            "form": {"csrf_token": token},
            "query": {},
            "session": self.base_request.session.copy(),
            "origin": "https://example.com",
            "referer": "https://example.com/page",
            "remote_host": "example.com"
        })

        request2 = MockRequest(**{
            "method": "POST",
            "headers": {"Content-Type": "application/x-www-form-urlencoded"},
            "cookies": {},
            "form": {"csrf_token": token},
            "query": {},
            "session": self.base_request.session.copy(),
            "origin": "https://example.com",
            "referer": "https://example.com/page",
            "remote_host": "example.com"
        })

        # First request should succeed
        valid1, _ = protector.validate_request(request1)
        self.assertTrue(valid1)

        # Second request with same token should fail (replay detection)
        valid2, _ = protector.validate_request(request2)
        self.assertFalse(valid2)

    def test_token_tampering_detection(self):
        """Test detection of tampered tokens."""
        stateless_protector = CsrfProtector(
            secret_key=self.secret_key,
            strategy="stateless"
        )

        # Generate valid token
        token = stateless_protector.generate_token(self.base_request)

        # Test various tampering scenarios
        tampering_cases = [
            token + "extra",  # Append data
            token[1:],        # Remove first character
            token[:-1],       # Remove last character
            token[:10] + "X" + token[11:],  # Modify middle
            "completely_different_token"     # Completely different
        ]

        for tampered_token in tampering_cases:
            self.base_request.form["csrf_token"] = tampered_token
            valid, _ = stateless_protector.validate_request(
                self.base_request)
            self.assertFalse(
                valid, f"Tampered token should be rejected: {tampered_token}")

    def test_session_binding_validation(self):
        """Test that tokens are properly bound to sessions."""
        # Generate token for first session
        session1 = {"user_id": "user1", "session_id": "sess1"}
        request1 = MockRequest(
            method="POST",
            headers={},
            cookies={},
            form={},
            query={},
            session=session1,
            origin="https://example.com",
            referer="https://example.com",
            remote_host="example.com"
        )

        token = self.csrf_protector.generate_token(request1)

        # Try to use token with different session
        session2 = {"user_id": "user2", "session_id": "sess2"}
        request2 = MockRequest(
            method="POST",
            headers={},
            cookies={},
            form={"csrf_token": token},
            query={},
            session=session2,
            origin="https://example.com",
            referer="https://example.com",
            remote_host="example.com"
        )

        # Should fail due to session mismatch
        valid, _ = self.csrf_protector.validate_request(request2)
        self.assertFalse(valid)

    def test_validation_info_structure(self):
        """Test that ValidationInfo contains expected fields and structure."""
        token = self.csrf_protector.generate_token(self.base_request)
        self.base_request.form["csrf_token"] = token

        _, info = self.csrf_protector.validate_request(self.base_request)

        # Check ValidationInfo structure
        self.assertIsInstance(info, ValidationInfo)
        self.assertTrue(hasattr(info, 'valid'))
        self.assertTrue(hasattr(info, 'reason'))
        self.assertTrue(hasattr(info, 'timestamp'))
        self.assertTrue(hasattr(info, 'token_age_seconds'))

        # For valid requests
        self.assertTrue(info.valid)
        self.assertIsNone(info.reason)
        self.assertIsInstance(info.timestamp, datetime)
        self.assertIsInstance(info.token_age_seconds, (int, float))

    def test_configuration_validation(self):
        """Test validation of configuration parameters during initialization."""
        # Test invalid strategy
        with self.assertRaises(Exception):
            CsrfProtector(secret_key="test", strategy="invalid_strategy")

        # Test non-positive token lifetime
        with self.assertRaises(Exception):
            CsrfProtector(secret_key="test", token_lifetime_seconds=0)

        with self.assertRaises(Exception):
            CsrfProtector(secret_key="test", token_lifetime_seconds=-10)

        # Test empty secret key
        with self.assertRaises(Exception):
            CsrfProtector(secret_key="")

    def test_constant_time_comparison(self):
        """Check accept/reject behavior for tokens that differ in one character."""
        # Timing properties cannot be asserted reliably in a unit test, so this
        # only checks behavior; the implementation is expected to compare tokens
        # with hmac.compare_digest or a similar constant-time function.

        protector = CsrfProtector(
            secret_key=self.secret_key, strategy="stateless")

        # Generate a valid token
        token = protector.generate_token(self.base_request)
        self.base_request.form["csrf_token"] = token

        # Validate correct token
        valid, _ = protector.validate_request(self.base_request)
        self.assertTrue(valid)

        wrong_token = token[:-1] + ("A" if token[-1] != "A" else "B")
        self.base_request.form["csrf_token"] = wrong_token
        valid, _ = protector.validate_request(self.base_request)
        self.assertFalse(valid)

    @patch('time.time')
    def test_time_injection_for_deterministic_testing(self, mock_time):
        """Test that time can be injected for deterministic testing."""
        # Mock current time
        mock_time.return_value = 1000000.0

        protector = CsrfProtector(
            secret_key=self.secret_key,
            token_lifetime_seconds=300
        )

        # Generate token at mocked time
        token = protector.generate_token(self.base_request)
        self.base_request.form["csrf_token"] = token

        # Validate at same time - should work
        valid, info = protector.validate_request(self.base_request)
        self.assertTrue(valid)

        # Move time forward beyond token lifetime
        mock_time.return_value = 1000000.0 + 301  # 301 seconds later

        # Should now be expired
        valid, info = protector.validate_request(self.base_request)
        self.assertFalse(valid)
        self.assertIn("expired", info.reason.lower())

    def test_memory_bounded_replay_tracking(self):
        """Test that replay tracking memory usage is bounded."""
        protector = CsrfProtector(
            secret_key=self.secret_key,
            enable_replay_detection=True,
            token_lifetime_seconds=1  # Short lifetime for quick cleanup
        )

        # Generate and validate multiple tokens
        tokens = []
        for i in range(10):
            request = MockRequest(
                method="POST",
                headers={},
                cookies={},
                form={},
                query={},
                session={"id": f"session_{i}"},
                origin="https://example.com",
                referer="https://example.com",
                remote_host="example.com"
            )
            token = protector.generate_token(request)
            request.form["csrf_token"] = token
            valid, _ = protector.validate_request(request)
            self.assertTrue(valid)
            tokens.append(token)

        # Wait for tokens to expire; expired entries are purged on the next
        # replay check, which keeps the cache bounded.
        time.sleep(1.1)
        new_token = protector.generate_token(self.base_request)
        self.base_request.form["csrf_token"] = new_token
        valid, _ = protector.validate_request(self.base_request)
        self.assertTrue(valid)


if __name__ == '__main__':
    unittest.main()
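
Several tests above wait for expiry with `time.sleep`. Since `CsrfProtector` accepts a `current_time_func`, a controllable clock could replace the sleep and make those tests faster and deterministic. The sketch below shows the pattern in isolation; `FakeClock` and `is_expired` are hypothetical names, and `is_expired` only stands in for the module's expiry logic.

```python
class FakeClock:
    """Callable clock whose time moves only when the test advances it."""

    def __init__(self, start: float = 1_000_000.0) -> None:
        self.now = start

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


def is_expired(issued_at: float, lifetime_seconds: float, now: float) -> bool:
    """Stand-in expiry check: expired once the age exceeds the lifetime."""
    return (now - issued_at) > lifetime_seconds


clock = FakeClock()
issued_at = clock()
fresh = is_expired(issued_at, 1, clock())   # checked at issue time
clock.advance(1.1)                          # no real waiting involved
stale = is_expired(issued_at, 1, clock())   # checked after the lifetime

# With the module under test, the same clock would be passed as
# CsrfProtector(secret_key=..., current_time_func=clock).
```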


# Model Breaking Proof

#### Model Breaking Task URL: https://rlhf-v3.turing.com/prompt/4f4db246-3e81-4432-966e-57b47abe61b5?origin=https%3A%2F%2Flabeling-ms.turing.com&redirect_url=https%3A%2F%2Flabeling-ms.turing.com%2Fconversations%2F83153%2Fview

#### Model code:

```python
import hmac
import hashlib
import secrets
import time
from datetime import datetime, timedelta

class MockRequest:
    def __init__(self, method, headers, cookies, form, query, session, origin, referer, remote_host):
        self.method = method
        self.headers = headers
        self.cookies = cookies
        self.form = form
        self.query = query
        self.session = session
        self.origin = origin # <Issue>: Cross-origin attack vectors are not mitigated because origin/referer are ignored. Only defined and not used.
        self.referer = referer
        self.remote_host = remote_host

class ValidationInfo:
    def __init__(self, valid, reason, timestamp, token_age_seconds, additional_details=None):
        self.valid = valid
        self.reason = reason
        self.timestamp = timestamp
        self.token_age_seconds = token_age_seconds
        self.additional_details = additional_details or {}

class CsrfProtector:
    def __init__(self, secret_key, token_lifetime_seconds=300, safe_methods=None,
                 strategy="per_session", enable_replay_detection=True,
                 rotation_policy=None, current_time_func=None):
        self.secret_key = secret_key.encode('utf-8')
        self.token_lifetime_seconds = token_lifetime_seconds # <Issue>: Negative or zero lifetimes, malformed safe_methods, or invalid rotation_policy would be accepted silently, leading to undefined or insecure behavior.
        self.safe_methods = set(safe_methods or ['GET', 'HEAD', 'OPTIONS'])
        self.strategy = strategy
        self.enable_replay_detection = enable_replay_detection
        self.rotation_policy = rotation_policy
        self.current_time_func = current_time_func or time.time
        self.replay_cache = {} # <Issue>: The flag and cache for replay exist but there’s no logic to track token use or detect reuse, so replay protection is absent.

        if strategy not in ["per_session", "stateless", "double_submit"]:
            raise ValueError("Unsupported CSRF strategy")

    def generate_token(self, request: MockRequest) -> str:
        if self.strategy == "per_session":
            session_id = self.get_session_id(request)
            token = secrets.token_hex(32)
            request.session['csrf_token'] = token
            return token
        elif self.strategy == "stateless":
            timestamp = int(self.current_time_func())
            data = f"{timestamp}".encode('utf-8')
            token = hmac.new(self.secret_key, data, hashlib.sha256).hexdigest()
            return token
        elif self.strategy == "double_submit":
            token = secrets.token_hex(32)
            return token
    
    def validate_request(self, request: MockRequest, token=None) -> (bool, ValidationInfo):
        timestamp = self.current_time_func()
        
        if request.method in self.safe_methods:
            return True, ValidationInfo(True, "Safe method", timestamp, 0)
        
        # Get token from request based on strategy and default positions
        if self.strategy == "per_session":
            token = token or request.form.get("csrf_token") or request.cookies.get("csrf_token")
            session_token = request.session.get('csrf_token')
            if not token or token != session_token:
                return False, ValidationInfo(False, "Token mismatch or missing", timestamp, 0)
        
        elif self.strategy == "stateless":
            provided_token = token or request.form.get("csrf_token") or request.cookies.get("csrf_token")
            if not provided_token:
                return False, ValidationInfo(False, "Token missing", timestamp, 0)
            try:
                int_timestamp = int(provided_token[:10], 16)
                if timestamp - int_timestamp > self.token_lifetime_seconds:
                    return False, ValidationInfo(False, "Token expired", timestamp, timestamp - int_timestamp)
            except (ValueError, TypeError):
                return False, ValidationInfo(False, "Invalid stateless token format", timestamp, 0)
        
        elif self.strategy == "double_submit":
            form_token = request.form.get("csrf_token")
            cookie_token = request.cookies.get("csrf_token")
            if not form_token or not cookie_token or form_token != cookie_token:
                return False, ValidationInfo(False, "Double-submit token mismatch", timestamp, 0)

        return True, ValidationInfo(True, "Token valid", timestamp, 0)

    def rotate_token(self, request: MockRequest) -> str:
        # Example implementation of token rotation for "per_session" strategy
        if self.strategy == "per_session":
            new_token = self.generate_token(request)
            request.session['csrf_token'] = new_token
            return new_token
        # For other strategies, implement rotation logic as required.
        return None

    def get_session_id(self, request: MockRequest):
        # Define session retrieval logic, simulating server-side session retrieval.
        # Assume the session being keyed by some unique identifier such as user ID.
        return request.session.get('user_id', 'default-session-id')
```
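
In the model code above, the stateless token is an HMAC over the timestamp alone, and validation parses the first characters of that digest as a timestamp without ever checking the signature. For contrast, here is a minimal sketch of a stateless token that carries its issue time in the clear and signs it together with a nonce and a session identifier. The token layout and the names `make_token` and `check_token` are assumptions made for illustration, not the reference solution's format.

```python
import hashlib
import hmac
import secrets
import time
from typing import Callable, Tuple


def make_token(secret: bytes, session_id: str,
               now: Callable[[], float] = time.time) -> str:
    """Build 'issued.nonce.signature'; the signature also covers session_id."""
    issued = str(int(now()))
    nonce = secrets.token_hex(16)
    payload = f"{issued}.{nonce}.{session_id}".encode("utf-8")
    sig = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    return f"{issued}.{nonce}.{sig}"


def check_token(secret: bytes, session_id: str, token: str,
                lifetime_seconds: float,
                now: Callable[[], float] = time.time) -> Tuple[bool, str]:
    """Verify the signature first, then the age. Returns (valid, reason)."""
    parts = token.split(".")
    if len(parts) != 3:
        return False, "malformed"
    issued, nonce, sig = parts
    payload = f"{issued}.{nonce}.{session_id}".encode("utf-8")
    expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    # Compare as bytes in constant time; bytes also tolerate non-ASCII input.
    if not hmac.compare_digest(expected.encode("utf-8"), sig.encode("utf-8")):
        return False, "signature mismatch"
    # The timestamp is trusted only after the signature has been verified.
    if now() - int(issued) > lifetime_seconds:
        return False, "expired"
    return True, "ok"


# Usage with an injected clock.
clock = [1000.0]
tick = lambda: clock[0]
secret = b"demo-secret"
tok = make_token(secret, "sess1", now=tick)
ok_result = check_token(secret, "sess1", tok, 300, now=tick)
other_session = check_token(secret, "sess2", tok, 300, now=tick)
clock[0] += 301
expired_result = check_token(secret, "sess1", tok, 300, now=tick)
```

Signing the session identifier binds the token to one session without server-side storage, and verifying the signature before reading the timestamp means a forged timestamp is never interpreted.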