"""
governance_overlay.py

Reference implementation for:
- Decentralized governance overlay (no raw medical data on-ledger)
- Trust (hash commitments + provenance)
- Audit (append-only ledger events)
- Consent (machine-readable policy + enforcement checks)
- Reproducibility (manifests to reconstruct decisions)
- Optional decentralized compute concept for training/experiments on de-ID/synthetic

This uses a local "append-only hash-chained ledger" to mimic blockchain properties.
Swap LocalLedger with a real ledger client (Ratio1, Hyperledger, Ethereum, etc.) later.
"""

In [1]:
import os, shutil, time


# Timestamp captured once at start-up (local time, one-second resolution) so
# that everything produced by this run shares a single directory name.
STAMP = time.strftime("%Y%m%d_%H%M%S")

# Root directory for this run's outputs (ledger, policy/manifest stores, demo data).
ARCHIVE_DIR = "results_" + STAMP

# Create the run directory eagerly; an already-existing directory is not an error.
os.makedirs(ARCHIVE_DIR, exist_ok=True)


In [2]:
from __future__ import annotations
import os
import json
import time
import uuid
import hashlib
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional, Tuple


# -----------------------------
# 1) Utilities: stable hashing
# -----------------------------

def stable_json_dumps(obj: Any) -> str:
    """
    Serialize an object to a canonical JSON string.

    The output is intended as hashing input: dictionary keys are emitted in
    sorted order and the most compact separators are used, so two equal
    objects always serialize to the same text. Non-ASCII characters are
    written as-is rather than as ``\\uXXXX`` escapes.

    Args:
        obj: Any JSON-serializable Python object

    Returns:
        Canonical JSON text (sorted keys, no insignificant whitespace)
    """
    canonical_options = {
        "sort_keys": True,
        "separators": (",", ":"),
        "ensure_ascii": False,
    }
    return json.dumps(obj, **canonical_options)


def sha256_bytes(data: bytes) -> str:
    """
    Return the SHA-256 digest of a byte string as lowercase hex.

    Args:
        data: Raw bytes to hash

    Returns:
        64-character hexadecimal digest
    """
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()


def sha256_text(text: str) -> str:
    """
    Return the SHA-256 digest of a string.

    The text is encoded as UTF-8 and the resulting bytes are hashed with
    ``sha256_bytes``.

    Args:
        text: String to hash

    Returns:
        64-character hexadecimal digest
    """
    encoded = text.encode("utf-8")
    return sha256_bytes(encoded)


def sha256_file(path: str, chunk_size: int = 1024 * 1024) -> str:
    """
    Compute the SHA-256 digest of a file by streaming it in chunks.

    The file is never held in memory as a whole, which keeps the memory
    footprint bounded by ``chunk_size`` even for very large inputs such as
    DICOM series or whole-slide images.

    Args:
        path: Filesystem path of the file to hash
        chunk_size: Maximum number of bytes read per iteration (default 1MB)

    Returns:
        64-character hexadecimal SHA-256 digest of the file contents
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # iter() with a sentinel keeps calling read() until it yields b"" (EOF).
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()


def now_epoch_ms() -> int:
    """
    Return the current wall-clock time as integer milliseconds since the Unix epoch.

    Millisecond resolution reduces timestamp collisions between audit events
    that are written in quick succession.

    Returns:
        Current time in whole milliseconds (fractional part truncated)
    """
    seconds = time.time()
    return int(seconds * 1000)



In [3]:

# ---------------------------------------
# 2) Consent policy model + enforcement
# ---------------------------------------

@dataclass(frozen=True)
class ConsentPolicy:
    """
    Machine-readable record of what a data subject has authorized.

    It is the software counterpart of documents such as signed consent
    forms, IRB approvals, data use agreements or GDPR/HIPAA authorizations.
    A policy answers three questions: which roles may act, for which
    purposes, and during which time window - plus whether the consent has
    been withdrawn.

    Attributes:
        policy_id: Unique identifier of this policy
        subject_id: Identifier of the patient (may be a pseudonym)
        allowed_purposes: Purposes the data may be used for (e.g., ["clinical_care", "research"])
        allowed_roles: Roles that may use the data (e.g., ["dermatologist", "ml_engineer"])
        valid_from_ms: First instant (epoch milliseconds) at which the policy applies
        valid_until_ms: Last instant at which the policy applies; None means it never expires
        revocable: Whether the subject may withdraw this consent
        revoked: Whether the consent has been withdrawn
        version: Policy version number (bumped whenever the policy is updated)
    """
    policy_id: str
    subject_id: str
    allowed_purposes: List[str]
    allowed_roles: List[str]
    valid_from_ms: int
    valid_until_ms: Optional[int] = None
    revocable: bool = True
    revoked: bool = False
    version: int = 1

    def is_valid_at(self, t_ms: int) -> bool:
        """
        Tell whether the policy is in force at a given instant.

        The policy is in force when the instant lies inside the validity
        window and the consent has not been revoked. Both window bounds are
        inclusive: ``valid_from_ms`` and ``valid_until_ms`` themselves count
        as valid.

        Args:
            t_ms: Instant to test (milliseconds since epoch)

        Returns:
            True if the policy is in force at ``t_ms``, False otherwise
        """
        if t_ms < self.valid_from_ms:
            return False
        expiry = self.valid_until_ms
        expired = expiry is not None and t_ms > expiry
        return not (expired or self.revoked)

    def allows(self, purpose: str, role: str, t_ms: int) -> bool:
        """
        Decide whether a concrete access request is covered by this policy.

        All three conditions must hold: the policy is in force at ``t_ms``,
        the purpose is listed in ``allowed_purposes`` and the role is listed
        in ``allowed_roles``.

        Args:
            purpose: Intended use of the data (e.g., "clinical_care")
            role: Role of the requesting actor (e.g., "dermatologist")
            t_ms: Time of the request (milliseconds since epoch)

        Returns:
            True if the request is authorized, False otherwise
        """
        if not self.is_valid_at(t_ms):
            return False
        purpose_ok = purpose in self.allowed_purposes
        role_ok = role in self.allowed_roles
        return purpose_ok and role_ok




In [4]:
# ---------------------------------------
# 3) Manifests: preprocessing, training, inference, reproducibility
# ---------------------------------------

@dataclass(frozen=True)
class PipelineManifest:
    """
    Immutable description of a data-processing pipeline.

    The manifest is the "recipe" behind a processing step. Recording it makes
    it possible to re-run the same pipeline later, to audit what was done to
    the data, and to check that only approved pipelines were used. Typical
    subjects are image preprocessing, inference pipelines and training
    workflows.

    Attributes:
        pipeline_name: Human-readable pipeline identifier
        pipeline_version: Version string (e.g., "1.2.0")
        container_image: Docker/OCI image reference, if any
        code_commit: Git commit hash of the source code, if known
        parameters: Algorithm parameters (thresholds, sizes, etc.)
        dependencies: Library name -> version (torch, numpy, etc.)
    """
    pipeline_name: str
    pipeline_version: str
    container_image: Optional[str]
    code_commit: Optional[str]
    parameters: Dict[str, Any]
    dependencies: Dict[str, str]

    def fingerprint(self) -> str:
        """
        Return the SHA-256 fingerprint of this manifest.

        The fingerprint is computed over the canonical JSON form of all
        fields, so equal manifests share a fingerprint and any change to a
        field yields a different one.

        Returns:
            64-character hexadecimal SHA-256 hash
        """
        canonical = stable_json_dumps(asdict(self))
        return sha256_text(canonical)


@dataclass(frozen=True)
class ModelSpec:
    """
    Identifies a specific AI/ML model version.

    Critical for clinical AI governance:
    - Know which model version made each diagnosis
    - Track model updates and retraining
    - Link to validation/approval documentation
    - Enable model rollback if issues found

    Attributes:
        model_name: Model identifier (e.g., "lesion_classifier")
        model_version: Version string (e.g., "0.9.3")
        weights_hash: SHA-256 of model weights file (proves exact weights used),
            or None when no weights hash is available
        framework: ML framework (pytorch, tensorflow, etc.)
        extra: Optional additional metadata (calibration method, architecture
            notes, etc.); None when nothing extra is recorded
    """
    model_name: str
    model_version: str
    weights_hash: Optional[str]
    framework: str
    # The default is None, so the annotation must be Optional. The default is
    # deliberately NOT changed to an empty dict: that would serialize as {}
    # instead of null and silently change the fingerprint of existing specs.
    extra: Optional[Dict[str, Any]] = None

    def fingerprint(self) -> str:
        """
        Generate cryptographic fingerprint of this model specification.

        The fingerprint is the SHA-256 of the canonical JSON form of all
        fields (including ``extra``).

        Returns:
            64-character hexadecimal SHA-256 hash
        """
        return sha256_text(stable_json_dumps(asdict(self)))


@dataclass(frozen=True)
class ReproducibilityManifest:
    """
    Self-contained "receipt" describing one AI decision.

    The manifest bundles every reference needed to audit what happened,
    reproduce the result, confirm that consent covered the run, and trace
    the inputs and outputs back to their storage locations.

    It holds hashes and pointers only - never raw patient data. The data
    itself stays in its secure store (PACS, EHR, cloud); the manifest merely
    refers to it.

    Attributes:
        event_id: Unique identifier of the inference/processing event
        created_at_ms: Time of execution (epoch milliseconds)
        input_artifact_hash: SHA-256 of the input data
        input_artifact_uri: Location of the input data (s3://, etc.)
        output_artifact_hashes: Output name -> SHA-256 of that output
        output_artifact_uris: Output name -> location of that output
        consent_policy_id: Consent policy that authorized the run
        consent_policy_hash: Hash of the consent policy content (tamper detection)
        pipeline_manifest_hash: Fingerprint of the pipeline that ran
        model_spec_hash: Fingerprint of the model that was used
        actor_id: Who ran it (user ID)
        actor_role: Role of the actor (dermatologist, engineer, etc.)
        purpose: Why it ran (clinical_care, research, etc.)
    """
    event_id: str
    created_at_ms: int
    input_artifact_hash: str
    input_artifact_uri: str
    output_artifact_hashes: Dict[str, str]
    output_artifact_uris: Dict[str, str]
    consent_policy_id: str
    consent_policy_hash: str
    pipeline_manifest_hash: str
    model_spec_hash: str
    actor_id: str
    actor_role: str
    purpose: str

    def fingerprint(self) -> str:
        """
        Return the SHA-256 fingerprint of this manifest.

        Computed over the canonical JSON form of all fields.

        Returns:
            64-character hexadecimal SHA-256 hash
        """
        canonical = stable_json_dumps(asdict(self))
        return sha256_text(canonical)




In [5]:
# ---------------------------------------
# 4) Ledger: append-only, hash-chained
# ---------------------------------------

@dataclass
class LedgerEntry:
    """
    One record ("block") of the governance ledger.

    An entry carries the event metadata, the event payload and the two
    hashes that link it into the chain. Because every entry embeds the hash
    of its predecessor, altering an entry invalidates the hashes of all
    entries that follow it.

    Attributes:
        entry_id: Unique UUID of the entry
        timestamp_ms: Creation time (epoch milliseconds)
        event_type: Event category (e.g., "CONSENT_CREATED", "INFERENCE_EXECUTED")
        payload: Event-specific data - hashes, decisions, audit data; must not contain raw PHI
        prev_hash: entry_hash of the preceding entry (the chain link)
        entry_hash: Hash of this entry, derived from prev_hash and the entry content
    """
    # Identity and timing
    entry_id: str
    timestamp_ms: int
    # What happened
    event_type: str
    payload: Dict[str, Any]
    # Hash-chain links
    prev_hash: str
    entry_hash: str


class LocalLedger:
    """
    Append-only ledger with cryptographic hash chaining.

    This is a local implementation mimicking blockchain properties:
    1. Append-only: The API only adds entries, never modifies/deletes
    2. Hash-chained: Each entry links to previous via cryptographic hash
    3. Tamper-evident: Modifying or reordering entries breaks chain integrity
    4. Auditable: Complete history of all governance events

    In production, replace with:
    - Private blockchain (Hyperledger Fabric)
    - Public blockchain (Ethereum, with privacy layers)
    - Distributed ledger (Ratio1, Corda)
    - Centralized but signed audit log (AWS QLDB)

    Storage format: JSON Lines (one JSON object per line).
    The first line is a genesis marker ({"genesis": true, ...}); every other
    line is a complete LedgerEntry that can be parsed independently.
    """

    def __init__(self, ledger_path: str):
        """
        Initialize or open existing ledger.

        If the ledger file doesn't exist, it is created with a genesis line.
        The genesis line is only a marker (it has no hashes); the first real
        entry appended afterwards uses prev_hash = "0" * 64.

        Args:
            ledger_path: Filesystem path to ledger file
        """
        self.ledger_path = ledger_path
        # dirname is "" for a bare file name; fall back to the current directory
        os.makedirs(os.path.dirname(ledger_path) or ".", exist_ok=True)
        if not os.path.exists(ledger_path):
            # Genesis marker: first line of a brand-new ledger
            self._write_raw({"genesis": True, "created_at_ms": now_epoch_ms()})

    def _read_all_raw(self) -> List[Dict[str, Any]]:
        """
        Read all entries from ledger file.

        Blank lines are ignored.

        Returns:
            List of parsed JSON objects, one per non-blank ledger line

        Raises:
            json.JSONDecodeError: If a non-blank line is not valid JSON
        """
        with open(self.ledger_path, "r", encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

    def _write_raw(self, obj: Dict[str, Any]) -> None:
        """
        Append a raw JSON object to ledger file.

        Uses append mode so existing lines are never overwritten.
        Each object is written on its own line (JSON Lines format).

        Args:
            obj: Dictionary to serialize and append
        """
        with open(self.ledger_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(obj, ensure_ascii=False) + "\n")

    def last_hash(self) -> str:
        """
        Get the hash of the most recent ledger entry.

        This is needed to compute the next entry's prev_hash,
        which creates the hash chain linking.

        Returns:
            Hash of last entry, or "0"*64 if ledger has no entries yet
            (only the genesis line, or an empty file)
        """
        rows = self._read_all_raw()
        # Find last actual LedgerEntry; genesis line has no entry_hash
        for row in reversed(rows):
            if "entry_hash" in row:
                return row["entry_hash"]
        return "0" * 64

    def append(self, event_type: str, payload: Dict[str, Any]) -> LedgerEntry:
        """
        Append a new governance event to the ledger.

        Process:
        1. Get previous entry's hash
        2. Normalize the payload to its JSON form
        3. Create entry with prev_hash link
        4. Compute entry_hash = SHA256(prev_hash + entry_content)
        5. Write to disk (append-only)

        The hash computation is deterministic and tamper-evident:
        - Changing payload breaks entry_hash
        - Changing prev_hash breaks chain
        - Reordering entries breaks chain

        Args:
            event_type: Category of event (e.g., "CONSENT_CREATED")
            payload: Event-specific data (NO PHI - only hashes/pointers);
                must be JSON-serializable

        Returns:
            Complete LedgerEntry object with computed hashes. Its payload is
            the JSON-normalized copy of the argument (e.g. tuples become
            lists, non-string keys become strings).

        Raises:
            TypeError: If the payload is not JSON-serializable (nothing is
                written in that case)
        """
        prev = self.last_hash()
        entry_id = str(uuid.uuid4())
        ts = now_epoch_ms()

        # Hash exactly what will be read back from disk. Without this round
        # trip, a payload with non-string keys (e.g. {2: ..., 10: ...}) is
        # hashed with numeric key order, but verification re-hashes the
        # string-keyed version read from the file, whose keys sort differently,
        # and would report a legitimate entry as tampered.
        payload = json.loads(stable_json_dumps(payload))

        # Body contains all fields except entry_hash
        body = {
            "entry_id": entry_id,
            "timestamp_ms": ts,
            "event_type": event_type,
            "payload": payload,
            "prev_hash": prev,
        }
        # Hash is computed from prev_hash + body
        entry_hash = sha256_text(prev + stable_json_dumps(body))
        entry = LedgerEntry(
            entry_id=entry_id,
            timestamp_ms=ts,
            event_type=event_type,
            payload=payload,
            prev_hash=prev,
            entry_hash=entry_hash
        )

        self._write_raw(asdict(entry))
        return entry

    def verify_integrity(self) -> Tuple[bool, Optional[str]]:
        """
        Verify cryptographic integrity of entire ledger.

        Walks through every entry and verifies:
        1. The row is well-formed (valid JSON object with all entry fields)
        2. prev_hash matches previous entry's entry_hash
        3. entry_hash is correctly computed from prev_hash + content

        Only the very first row may be a genesis marker. A genesis flag on
        any other row is reported as an error, because skipping such rows
        would let a forged or modified entry escape verification simply by
        carrying "genesis": true.

        Returns:
            (success: bool, error_message: Optional[str])
            - (True, None) if ledger is intact
            - (False, "error description") if tampering or corruption detected
        """
        try:
            rows = self._read_all_raw()
        except json.JSONDecodeError as exc:
            # A line that no longer parses is corruption, not a crash condition
            return False, f"Ledger contains a line that is not valid JSON: {exc}"

        required_fields = (
            "entry_id",
            "timestamp_ms",
            "event_type",
            "payload",
            "prev_hash",
            "entry_hash",
        )
        prev = "0" * 64

        for position, row in enumerate(rows):
            if not isinstance(row, dict):
                return False, f"Malformed ledger row at position {position}"

            if row.get("genesis"):
                if position == 0 and "entry_hash" not in row:
                    continue
                return False, f"Unexpected genesis marker at position {position}"

            missing = [name for name in required_fields if name not in row]
            if missing:
                return False, f"Malformed entry at position {position}: missing {', '.join(missing)}"

            expected_body = {
                "entry_id": row["entry_id"],
                "timestamp_ms": row["timestamp_ms"],
                "event_type": row["event_type"],
                "payload": row["payload"],
                "prev_hash": row["prev_hash"],
            }
            expected_hash = sha256_text(prev + stable_json_dumps(expected_body))
            if row["prev_hash"] != prev:
                return False, f"Broken prev_hash link at entry {row['entry_id']}"
            if row["entry_hash"] != expected_hash:
                return False, f"Hash mismatch at entry {row['entry_id']}"
            prev = row["entry_hash"]

        return True, None




In [6]:
# ---------------------------------------
# 5) Governance Overlay Orchestrator
# ---------------------------------------

class GovernanceOverlay:
    """
    High-level API orchestrating consent, audit, and reproducibility.

    This is the main interface for clinical AI governance. It provides:
    1. Consent management (store, verify, revoke policies)
    2. Access auditing (log all data access events)
    3. Inference recording (create reproducibility manifests)
    4. Training logging (record model training experiments)

    Architecture:
    - Full documents (consent policies, manifests) stored OFF-ledger as files
    - Only hashes, pointers and small metadata stored ON-ledger
    - Ledger is tamper-evident but doesn't contain PHI

    This separation is crucial for:
    - Privacy: PHI never on distributed ledger
    - Scalability: Large files don't bloat ledger
    - Flexibility: Can change storage without touching ledger
    """

    def __init__(self, ledger: LocalLedger, policy_store_dir: str = f"{ARCHIVE_DIR}/policy_store", manifest_store_dir: str = f"{ARCHIVE_DIR}/manifest_store"):
        """
        Initialize governance system.

        Both store directories are created if they do not exist yet.

        Args:
            ledger: Append-only ledger for immutable audit trail
            policy_store_dir: Directory for off-ledger consent policies
            manifest_store_dir: Directory for off-ledger manifests
        """
        self.ledger = ledger
        self.policy_store_dir = policy_store_dir
        self.manifest_store_dir = manifest_store_dir
        os.makedirs(policy_store_dir, exist_ok=True)
        os.makedirs(manifest_store_dir, exist_ok=True)

    # ---------- Consent handling ----------

    def _policy_path(self, policy_id: str) -> str:
        """Get filesystem path for a consent policy file."""
        # NOTE(review): policy_id is used as a file name without sanitizing;
        # callers are expected to pass trusted identifiers.
        return os.path.join(self.policy_store_dir, f"{policy_id}.json")

    def store_consent_policy(self, policy: ConsentPolicy) -> Dict[str, str]:
        """
        Store a consent policy and record its hash on-ledger.

        Two-tier storage:
        1. OFF-ledger: Full policy JSON in local file (may contain pseudonymous IDs)
        2. ON-ledger: Policy hash + metadata (subject_id, version, validity
           window, revocable flag) in a CONSENT_CREATED event

        This enables:
        - Privacy: Full policy content not on public/distributed ledger
        - Verification: Can prove policy hasn't been tampered with
        - Audit: Immutable record of consent creation

        An existing policy file with the same policy_id is overwritten.

        Args:
            policy: ConsentPolicy object to store

        Returns:
            Dictionary with policy_id and policy_hash
        """
        policy_dict = asdict(policy)
        policy_json = stable_json_dumps(policy_dict)
        policy_hash = sha256_text(policy_json)

        # Off-ledger store (secure local/cloud storage)
        with open(self._policy_path(policy.policy_id), "w", encoding="utf-8") as f:
            f.write(policy_json)

        # On-ledger record (only hash + non-PHI metadata)
        self.ledger.append(
            event_type="CONSENT_CREATED",
            payload={
                "policy_id": policy.policy_id,
                "policy_hash": policy_hash,
                "subject_id": policy.subject_id,  # May be pseudonymous
                "version": policy.version,
                "valid_from_ms": policy.valid_from_ms,
                "valid_until_ms": policy.valid_until_ms,
                "revocable": policy.revocable,
            }
        )
        return {"policy_id": policy.policy_id, "policy_hash": policy_hash}

    def revoke_consent_policy(self, policy_id: str, actor_id: str) -> None:
        """
        Revoke a consent policy (patient withdraws authorization).

        Process:
        1. Load existing policy
        2. Check if revocable
        3. Mark as revoked and increment version
        4. Save updated policy off-ledger
        5. Record revocation event on-ledger (CONSENT_REVOKED)

        After revocation, future access checks will fail.

        Args:
            policy_id: ID of policy to revoke
            actor_id: Who requested revocation (patient, admin, etc.)

        Raises:
            FileNotFoundError: If policy doesn't exist
            ValueError: If policy is not revocable
        """
        path = self._policy_path(policy_id)
        if not os.path.exists(path):
            raise FileNotFoundError(f"Unknown policy_id: {policy_id}")

        with open(path, "r", encoding="utf-8") as f:
            d = json.loads(f.read())

        # A stored policy without the flag is treated as revocable
        if not d.get("revocable", True):
            raise ValueError("Policy is not revocable")

        d["revoked"] = True
        d["version"] = int(d.get("version", 1)) + 1

        policy_json = stable_json_dumps(d)
        new_hash = sha256_text(policy_json)

        with open(path, "w", encoding="utf-8") as f:
            f.write(policy_json)

        self.ledger.append(
            event_type="CONSENT_REVOKED",
            payload={"policy_id": policy_id, "new_policy_hash": new_hash, "actor_id": actor_id}
        )

    def load_consent_policy(self, policy_id: str) -> ConsentPolicy:
        """
        Load a consent policy from off-ledger storage.

        Args:
            policy_id: ID of policy to load

        Returns:
            ConsentPolicy object

        Raises:
            FileNotFoundError: If no policy file exists for policy_id
        """
        path = self._policy_path(policy_id)
        with open(path, "r", encoding="utf-8") as f:
            d = json.loads(f.read())
        return ConsentPolicy(**d)

    def check_consent(self, policy_id: str, purpose: str, actor_role: str, t_ms: Optional[int] = None) -> bool:
        """
        Check if consent policy permits a specific access request.

        This is the authorization gateway. Before any data access,
        this function verifies:
        1. Policy exists
        2. Policy is temporally valid
        3. Purpose is authorized
        4. Role is authorized

        The check fails closed: an unknown policy_id yields False rather than
        an exception, so callers such as record_inference treat it as a
        denial (and audit it) instead of crashing.

        Args:
            policy_id: Which consent policy to check
            purpose: Intended use (e.g., "clinical_care")
            actor_role: Requesting role (e.g., "dermatologist")
            t_ms: Check at this timestamp (default: now)

        Returns:
            True if access authorized, False otherwise
        """
        t_ms = now_epoch_ms() if t_ms is None else t_ms
        try:
            policy = self.load_consent_policy(policy_id)
        except FileNotFoundError:
            # No stored policy means no consent was given
            return False
        return policy.allows(purpose=purpose, role=actor_role, t_ms=t_ms)

    # ---------- Manifests ----------

    def _manifest_path(self, manifest_id: str) -> str:
        """Get filesystem path for a manifest file."""
        return os.path.join(self.manifest_store_dir, f"{manifest_id}.json")

    def store_manifest_off_ledger(self, manifest: Any, manifest_id: str) -> str:
        """
        Store any manifest off-ledger and return its hash.

        Manifests (pipeline configs, reproducibility receipts) are stored
        off-ledger because they can be large. Only their hashes go on-ledger.
        This method writes the file only; it does not touch the ledger.

        Args:
            manifest: Manifest object; must be a dataclass instance, since it
                is serialized with dataclasses.asdict()
            manifest_id: Unique identifier for this manifest (used as file name)

        Returns:
            SHA-256 hash of manifest content (canonical JSON)
        """
        manifest_json = stable_json_dumps(asdict(manifest))
        manifest_hash = sha256_text(manifest_json)
        with open(self._manifest_path(manifest_id), "w", encoding="utf-8") as f:
            f.write(manifest_json)
        return manifest_hash

    # ---------- Audit / provenance ----------

    def record_data_access(self, actor_id: str, actor_role: str, purpose: str, artifact_uri: str, artifact_hash: str) -> None:
        """
        Log a data access event to the audit ledger.

        Creates immutable record of:
        - Who accessed data
        - Their role and purpose
        - What data was accessed (via URI + hash)
        - When access occurred (the ledger entry's own timestamp)

        This supports:
        - Audit trails for compliance (HIPAA, GDPR)
        - Breach investigation (who accessed what when)
        - Usage analytics (how often is data accessed)

        Note that this method only records the access; it performs no
        consent check of its own.

        Args:
            actor_id: User/system identifier
            actor_role: Their role (dermatologist, engineer, etc.)
            purpose: Why they accessed (clinical_care, research, etc.)
            artifact_uri: Pointer to data (s3://, file://, etc.)
            artifact_hash: SHA-256 of data (proves exact version accessed)
        """
        self.ledger.append(
            event_type="DATA_ACCESSED",
            payload={
                "actor_id": actor_id,
                "actor_role": actor_role,
                "purpose": purpose,
                "artifact_uri": artifact_uri,
                "artifact_hash": artifact_hash,
            }
        )

    # ---------- Inference "receipt" ----------

    def record_inference(
        self,
        actor_id: str,
        actor_role: str,
        purpose: str,
        consent_policy_id: str,
        input_artifact_uri: str,
        input_artifact_hash: str,
        output_artifacts: Dict[str, Tuple[str, str]],
        pipeline_manifest: PipelineManifest,
        model_spec: ModelSpec,
    ) -> ReproducibilityManifest:
        """
        Record an AI inference run with full reproducibility.

        This is the core function for clinical AI governance. It creates
        a complete audit trail for each AI decision:

        1. Consent check: Verify authorization before recording
        2. Manifest creation: Document exactly what ran (model, pipeline, params)
        3. Ledger recording: Write immutable audit event
        4. Reproducibility: Enable exact reconstruction of decision later

        If consent check fails (including an unknown consent_policy_id),
        logs CONSENT_DENIED event and raises error.

        Process flow:
        1. Check consent authorization
        2. Hash consent policy content
        3. Compute pipeline + model fingerprints
        4. Create reproducibility manifest with all references
        5. Store manifest off-ledger
        6. Write INFERENCE_EXECUTED event on-ledger (hashes only)

        Args:
            actor_id: Who ran inference (user ID)
            actor_role: Their role (dermatologist, ml_engineer, etc.)
            purpose: Why inference ran (clinical_care, research, etc.)
            consent_policy_id: Which consent authorizes this
            input_artifact_uri: Pointer to input data (s3://, etc.)
            input_artifact_hash: SHA-256 of input (proves exact input)
            output_artifacts: Dict mapping output names to (uri, hash) tuples
            pipeline_manifest: Description of processing pipeline
            model_spec: Description of AI model used

        Returns:
            ReproducibilityManifest containing all information needed
            to reconstruct this inference later

        Raises:
            PermissionError: If consent check fails
        """
        t_ms = now_epoch_ms()

        # Authorization check BEFORE anything is recorded as executed
        if not self.check_consent(consent_policy_id, purpose, actor_role, t_ms=t_ms):
            # Log denial for audit trail
            self.ledger.append(
                event_type="CONSENT_DENIED",
                payload={
                    "policy_id": consent_policy_id,
                    "actor_id": actor_id,
                    "actor_role": actor_role,
                    "purpose": purpose,
                    "timestamp_ms": t_ms,
                }
            )
            raise PermissionError("Consent check failed")

        # Hash consent policy content (for tamper detection)
        policy = self.load_consent_policy(consent_policy_id)
        policy_hash = sha256_text(stable_json_dumps(asdict(policy)))

        # Fingerprint the pipeline and model; only these hashes are recorded,
        # the pipeline/model manifests themselves are not stored here
        pipeline_hash = pipeline_manifest.fingerprint()
        model_hash = model_spec.fingerprint()

        # Split the {name: (uri, hash)} mapping into two parallel mappings
        out_uris = {k: v[0] for k, v in output_artifacts.items()}
        out_hashes = {k: v[1] for k, v in output_artifacts.items()}

        # Create reproducibility manifest
        event_id = str(uuid.uuid4())
        repro = ReproducibilityManifest(
            event_id=event_id,
            created_at_ms=t_ms,
            input_artifact_hash=input_artifact_hash,
            input_artifact_uri=input_artifact_uri,
            output_artifact_hashes=out_hashes,
            output_artifact_uris=out_uris,
            consent_policy_id=consent_policy_id,
            consent_policy_hash=policy_hash,
            pipeline_manifest_hash=pipeline_hash,
            model_spec_hash=model_hash,
            actor_id=actor_id,
            actor_role=actor_role,
            purpose=purpose
        )

        # Store full reproducibility manifest off-ledger
        repro_hash = self.store_manifest_off_ledger(repro, manifest_id=f"repro_{event_id}")

        # Write ledger event (only references + hashes, NO PHI)
        self.ledger.append(
            event_type="INFERENCE_EXECUTED",
            payload={
                "event_id": event_id,
                "actor_id": actor_id,
                "actor_role": actor_role,
                "purpose": purpose,
                "consent_policy_id": consent_policy_id,
                "consent_policy_hash": policy_hash,
                "input_artifact_uri": input_artifact_uri,
                "input_artifact_hash": input_artifact_hash,
                "output_artifact_uris": out_uris,
                "output_artifact_hashes": out_hashes,
                "pipeline_manifest_hash": pipeline_hash,
                "model_spec_hash": model_hash,
                "repro_manifest_hash": repro_hash,
            }
        )

        return repro

    # ---------- Training / experiments on de-ID or synthetic data ----------

    def record_training_experiment(
        self,
        actor_id: str,
        actor_role: str,
        purpose: str,
        dataset_uri: str,
        dataset_hash: str,
        dataset_type: str,
        training_manifest: PipelineManifest,
        model_spec_before: ModelSpec,
        model_spec_after: ModelSpec,
        compute_backend: str,
    ) -> str:
        """
        Log a model training/experiment run.

        For research/development, training typically uses:
        - De-identified data (PHI removed/pseudonymized)
        - Synthetic data (generated, no real patients)
        - Decentralized compute (privacy-preserving training)

        This function records:
        - What dataset was used (must be de-ID or synthetic)
        - Fingerprint of the training configuration (hyperparameters, etc.)
        - Fingerprints of the model before and after training
        - Where training ran (local, cloud, federated, etc.)

        Creates immutable audit trail of all model development by writing a
        TRAINING_EXPERIMENT_EXECUTED event. No consent policy is consulted
        here; the dataset_type restriction is the only gate.

        Args:
            actor_id: Who ran training
            actor_role: Their role (ml_engineer, researcher, etc.)
            purpose: Why training ran (usually "research")
            dataset_uri: Pointer to training dataset
            dataset_hash: SHA-256 of dataset
            dataset_type: Must be "deidentified" or "synthetic"
            training_manifest: Training pipeline configuration
            model_spec_before: Model version before training
            model_spec_after: Model version after training
            compute_backend: Where training ran ("ratio1", "aws", "local", etc.)

        Returns:
            Event ID (UUID) of training record

        Raises:
            ValueError: If dataset_type is not "deidentified" or "synthetic"
        """
        if dataset_type not in {"deidentified", "synthetic"}:
            raise ValueError("dataset_type must be 'deidentified' or 'synthetic'")

        t_ms = now_epoch_ms()
        event_id = str(uuid.uuid4())

        train_hash = training_manifest.fingerprint()
        model_before_hash = model_spec_before.fingerprint()
        model_after_hash = model_spec_after.fingerprint()

        payload = {
            "event_id": event_id,
            "timestamp_ms": t_ms,
            "actor_id": actor_id,
            "actor_role": actor_role,
            "purpose": purpose,
            "dataset_uri": dataset_uri,
            "dataset_hash": dataset_hash,
            "dataset_type": dataset_type,
            "training_manifest_hash": train_hash,
            "model_before_hash": model_before_hash,
            "model_after_hash": model_after_hash,
            "compute_backend": compute_backend,
        }

        self.ledger.append(event_type="TRAINING_EXPERIMENT_EXECUTED", payload=payload)
        return event_id





## 🎯 Priority Recommendations

For **immediate impact**, I'd suggest implementing:

1. **Zero-Knowledge Proofs** (#1) - Cutting edge for privacy
2. **Differential Privacy** (#2) - Industry standard for medical AI
3. **Federated Learning** (#3) - Hot topic in multi-institutional research
4. **Explainability Provenance** (#10) - Critical for clinical acceptance

These would make your system truly state-of-the-art for 2025 clinical AI governance!



In [7]:

# ---------------------------------------
# 6) Example usage (run this as a script)
# ---------------------------------------

def demo():
    """
    Walk through the whole governance workflow once.

    Ledger-affecting steps, in the order they are recorded:
    1. Build a LocalLedger under ARCHIVE_DIR and wrap it in a GovernanceOverlay
    2. Store consent policy "consent_001"
    3. Record a data access for a dummy input artifact
    4. Record an inference and print the reproducibility manifest it returns
    5. Record a training experiment on a synthetic dataset
    6. Verify ledger integrity and print the outcome

    The files written under ARCHIVE_DIR/data exist only so that real hashes
    can be computed; the s3:// URIs are example pointers.
    """
    chain = LocalLedger(f"{ARCHIVE_DIR}/ledger/clinical_governance_ledger.jsonl")
    governance = GovernanceOverlay(chain)

    # --- Step 2: consent policy for a pseudonymous subject ---
    consent = ConsentPolicy(
        policy_id="consent_001",
        subject_id="patient_pseudo_123",  # pseudonymous
        allowed_purposes=["clinical_care", "research"],
        allowed_roles=["dermatologist", "ml_engineer"],
        valid_from_ms=now_epoch_ms() - 1000,
        valid_until_ms=None,
        revocable=True,
        revoked=False,
        version=1,
    )
    governance.store_consent_policy(consent)

    # --- Step 3: input artifact ---
    # A real deployment would keep the image in cloud storage; here a local
    # file of random bytes stands in for it and is created only once.
    os.makedirs(f"{ARCHIVE_DIR}/data", exist_ok=True)
    image_file = f"{ARCHIVE_DIR}/data/example_image.bin"
    if not os.path.exists(image_file):
        with open(image_file, "wb") as fh:
            fh.write(os.urandom(4096))  # dummy bytes

    image_uri = "s3://noetiv-bucket/patient_pseudo_123/visit1/image1.jpg"  # example pointer
    image_digest = sha256_file(image_file)

    governance.record_data_access(
        actor_id="user_42",
        actor_role="dermatologist",
        purpose="clinical_care",
        artifact_uri=image_uri,
        artifact_hash=image_digest,
    )

    # --- Step 4: describe what ran (pipeline + model) ---
    inference_pipeline = PipelineManifest(
        pipeline_name="dermoscopy_inference",
        pipeline_version="1.0.0",
        container_image="registry/noetiv/dermoscopy:1.0.0",
        code_commit="abc123def",
        parameters={"img_size": 512, "threshold": 0.35, "xai": True},
        dependencies={"python": "3.11", "torch": "2.4.0", "numpy": "2.0.1"},
    )
    current_model = ModelSpec(
        model_name="lesion_classifier",
        model_version="0.9.3",
        weights_hash="deadbeef...",  # replace with real hash/digest
        framework="pytorch",
        extra={"calibration": "temperature_scaling_v2"},
    )

    # Output artifact: a small JSON report written locally and hashed.
    report_file = f"{ARCHIVE_DIR}/data/report.json"
    with open(report_file, "w", encoding="utf-8") as fh:
        fh.write(stable_json_dumps({"prediction": "benign", "confidence": 0.83}))
    report_uri = "s3://noetiv-bucket/patient_pseudo_123/visit1/report.json"
    report_digest = sha256_file(report_file)

    manifest = governance.record_inference(
        actor_id="user_42",
        actor_role="dermatologist",
        purpose="clinical_care",
        consent_policy_id="consent_001",
        input_artifact_uri=image_uri,
        input_artifact_hash=image_digest,
        output_artifacts={"report_json": (report_uri, report_digest)},
        pipeline_manifest=inference_pipeline,
        model_spec=current_model,
    )

    print("Reproducibility manifest created:")
    print(stable_json_dumps(asdict(manifest)))

    # --- Step 5: training experiment on synthetic data ---
    training_pipeline = PipelineManifest(
        pipeline_name="train_lesion_classifier",
        pipeline_version="0.1.0",
        container_image="registry/noetiv/train:0.1.0",
        code_commit="train789",
        parameters={"epochs": 10, "lr": 1e-4, "batch_size": 16},
        dependencies={"python": "3.11", "torch": "2.4.0"},
    )
    retrained_model = ModelSpec(
        model_name="lesion_classifier",
        model_version="0.9.4",
        weights_hash="feedface...",  # replace with real hash/digest
        framework="pytorch",
        extra={"notes": "trained on synthetic dataset v2"},
    )

    # Random bytes stand in for the synthetic dataset; only its hash is recorded.
    synthetic_file = f"{ARCHIVE_DIR}/data/synth_dataset.bin"
    with open(synthetic_file, "wb") as fh:
        fh.write(os.urandom(8192))

    training_event_id = governance.record_training_experiment(
        actor_id="user_99",
        actor_role="ml_engineer",
        purpose="research",
        dataset_uri="s3://noetiv-research/synthetic/derm_v2/dataset.tar",
        dataset_hash=sha256_file(synthetic_file),
        dataset_type="synthetic",
        training_manifest=training_pipeline,
        model_spec_before=current_model,
        model_spec_after=retrained_model,
        compute_backend="ratio1",  # conceptually
    )

    print("Training experiment recorded:", training_event_id)

    # --- Step 6: auditor-style integrity check ---
    is_intact, problem = chain.verify_integrity()
    print("Ledger integrity OK?", is_intact, problem)



In [8]:

# Run the demo unconditionally (notebook-style cell). The script-style
# entry-point guard is kept below, disabled, for reference.
demo()
# if __name__ == "__main__":
#     demo()

Reproducibility manifest created:
{"actor_id":"user_42","actor_role":"dermatologist","consent_policy_hash":"51df09aee42907709f55997615e18e9690d5a2374007aad880fad8b970db0c26","consent_policy_id":"consent_001","created_at_ms":1766177554112,"event_id":"8e70bd7a-fee2-435c-9527-2cf6103bb5f4","input_artifact_hash":"800cb786fe3d72734d24e848df4894ff6b626687ad147b55a95d12df3a10d837","input_artifact_uri":"s3://noetiv-bucket/patient_pseudo_123/visit1/image1.jpg","model_spec_hash":"0cdab167e8a42f210da84c0963f80928624db9cce89a7c026aa081806a3ee427","output_artifact_hashes":{"report_json":"99e6f5f9bbe470bb28bc864fa9de01f08f070dc8841536aa579e4b98599a7dc5"},"output_artifact_uris":{"report_json":"s3://noetiv-bucket/patient_pseudo_123/visit1/report.json"},"pipeline_manifest_hash":"9d9f973085559f55e7ab1a93444dde20a80f3ffa330bd7212dd894e481d7c939","purpose":"clinical_care"}
Training experiment recorded: a74279d9-8eec-4631-a1ef-582df39ee059
Ledger integrity OK? True None


In [9]:
import random
import copy
from typing import Dict, Any

# ---------------------------------------
# Defaults used by simulations
# ---------------------------------------

# Shared pipeline description passed as `pipeline_manifest` by the simulation
# helpers below; no container image is associated with it (container_image=None).
DEFAULT_PIPELINE = PipelineManifest(
    pipeline_name="simulated_dermoscopy_pipeline",
    pipeline_version="sim-1.0",
    container_image=None,
    code_commit="sim_commit",
    parameters={"img_size": 512, "threshold": 0.5},
    dependencies={"python": "3.11"}
)

# Shared model description passed as `model_spec` by the simulation helpers.
# weights_hash is a placeholder of 64 zero characters, not a real digest.
DEFAULT_MODEL = ModelSpec(
    model_name="simulated_model",
    model_version="0.1",
    weights_hash="0" * 64,
    framework="pytorch",
    extra={"note": "simulation"}
)

def simulate_workload(overlay, n_runs=200, tamper_rate=0.1, revoke_at=120) -> Dict[str, Any]:
    """
    Drive a stream of randomized inference attempts through the overlay.

    Every step picks a random role (dermatologist, ml_engineer, guest) and a
    random purpose (clinical_care, research) and tries to record an inference
    against policy "consent_001". When the step index equals `revoke_at`,
    the policy is revoked first, on a best-effort basis.

    Outcomes are classified as:
    - executed: `record_inference` returned a reproducibility manifest
    - blocked:  `record_inference` raised PermissionError
    - any other exception is swallowed and counted in neither bucket

    Args:
        overlay: GovernanceOverlay instance to test
        n_runs: Number of inference attempts to simulate
        tamper_rate: Accepted for interface compatibility; not used here
        revoke_at: Step index at which consent is revoked

    Returns:
        Dictionary with executed/blocked counts, the ledger integrity result
        (flag and error) and the number of collected manifests.
        "blocked" and "denied_events" always carry the same value.
    """
    candidate_roles = ("dermatologist", "ml_engineer", "guest")
    candidate_purposes = ("clinical_care", "research")

    # One manifest is collected per successful inference, so its length
    # doubles as the "executed" counter.
    manifests = []
    denied = 0

    for i in range(n_runs):
        if i == revoke_at:
            try:
                overlay.revoke_consent_policy("consent_001", actor_id="admin")
            except Exception:
                # Best-effort: a failed revocation must not stop the simulation.
                pass

        # Role is drawn before purpose; the order matters for seeded runs.
        chosen_role = random.choice(candidate_roles)
        chosen_purpose = random.choice(candidate_purposes)

        try:
            receipt = overlay.record_inference(
                actor_id=f"user_{i}",
                actor_role=chosen_role,
                purpose=chosen_purpose,
                consent_policy_id="consent_001",
                input_artifact_uri=f"s3://bucket/patient/visit{i}/img.jpg",
                input_artifact_hash=f"{i:064x}",  # simulated hash
                output_artifacts={
                    "report_json": (f"s3://bucket/patient/visit{i}/report.json", f"{(i+1):064x}")
                },
                pipeline_manifest=DEFAULT_PIPELINE,
                model_spec=DEFAULT_MODEL,
            )
        except PermissionError:
            denied += 1
        except Exception:
            # Other errors are ignored and not counted.
            pass
        else:
            manifests.append(receipt)

    # Integrity check (only meaningful for ledger variants)
    intact, problem = overlay.ledger.verify_integrity()

    return {
        "executed": len(manifests),
        "blocked": denied,
        "denied_events": denied,
        "ledger_integrity_ok": intact,
        "ledger_integrity_error": problem,
        "n_repro_manifests": len(manifests),
    }


def tamper_ledger_file(path: str, flip_probability: float = 0.2) -> bool:
    """
    Intentionally tamper with a ledger file to test integrity detection.

    Simulates an attacker or accidental corruption:
    - Randomly selects one line that contains an "entry_hash" field
    - Modifies that entry's payload (adds a flag or overwrites the purpose)
    - Rewrites the whole file with the modified line in place

    The stored entry_hash is left untouched, so a hash-chain check such as
    verify_integrity() is expected to flag the modified entry.

    Args:
        path: Path to the ledger file (one JSON object per line)
        flip_probability: Probability of adding a "tampered" flag; otherwise
            the payload's "purpose" is overwritten with "tampered_purpose"

    Returns:
        True if an entry was modified, False if the file holds no line with
        an "entry_hash" field (the file is then left unchanged)
    """
    # Context managers guarantee the handles are closed even if parsing fails.
    with open(path, "r", encoding="utf-8") as f:
        lines = f.read().splitlines()

    # Substring match keeps lines without an entry hash out of the candidate set.
    candidates = [idx for idx, ln in enumerate(lines) if '"entry_hash"' in ln]
    if not candidates:
        return False

    # RNG draw order (choice, then random) is kept so seeded runs are stable.
    idx = random.choice(candidates)
    obj = json.loads(lines[idx])

    # An entry without a payload gets an empty one instead of raising KeyError.
    payload = obj.setdefault("payload", {})
    if random.random() < flip_probability:
        payload["tampered"] = True
    else:
        payload["purpose"] = "tampered_purpose"

    lines[idx] = json.dumps(obj, ensure_ascii=False)
    with open(path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")
    return True

In [10]:
# ---------------------------------------
# Off-ledger reproducibility manifest verification
# ---------------------------------------

def list_inference_events(ledger_path: str):
    """
    Summarize every INFERENCE_EXECUTED entry found in a ledger file.

    The ledger is read as JSON lines; blank lines are skipped and entries of
    any other event type are ignored. Result order follows file order.

    Args:
        ledger_path: Path to ledger file

    Returns:
        List of dictionaries, one per inference event, with the keys
        event_id and repro_manifest_hash (taken from the entry's payload)
        and entry_id and timestamp_ms (taken from the entry itself).
        Missing fields are reported as None.
    """
    def _summarize(entry):
        # event_id / manifest hash live in the payload, the rest on the entry.
        body = entry.get("payload", {})
        return {
            "event_id": body.get("event_id"),
            "repro_manifest_hash": body.get("repro_manifest_hash"),
            "entry_id": entry.get("entry_id"),
            "timestamp_ms": entry.get("timestamp_ms"),
        }

    with open(ledger_path, "r", encoding="utf-8") as handle:
        # Lazy parsing keeps the line-by-line behavior of a plain loop.
        parsed = (json.loads(raw) for raw in handle if raw.strip())
        return [
            _summarize(entry)
            for entry in parsed
            if entry.get("event_type") == "INFERENCE_EXECUTED"
        ]


def get_inference_event(ledger_path: str, event_id: str):
    """
    Look up a single INFERENCE_EXECUTED entry by its payload event ID.

    The ledger is scanned in file order and the scan stops at the first
    match; blank lines and entries of other event types are ignored.

    Args:
        ledger_path: Path to ledger file
        event_id: UUID of inference event to find

    Returns:
        Full ledger entry dictionary, or None if not found
    """
    with open(ledger_path, "r", encoding="utf-8") as handle:
        entries = (json.loads(raw) for raw in handle if raw.strip())
        # next() with a default returns the first hit or None without
        # reading the remainder of the file.
        return next(
            (
                entry
                for entry in entries
                if entry.get("event_type") == "INFERENCE_EXECUTED"
                and entry.get("payload", {}).get("event_id") == event_id
            ),
            None,
        )


def verify_repro_manifest_against_ledger(
    ledger_path: str,
    manifest_store_dir: str,
    event_id: str
):
    """
    Verify that an off-ledger reproducibility manifest matches its on-ledger hash.

    Process:
    1. Find the INFERENCE_EXECUTED entry for `event_id` in the ledger
    2. Read the manifest hash recorded in that entry's payload
    3. Load `repro_<event_id>.json` from the manifest store
    4. Re-serialize it deterministically (sorted keys, compact separators)
       and hash it with SHA-256
    5. Compare the local hash with the ledger hash

    A mismatch means the manifest file no longer matches what was committed
    to the ledger.

    Args:
        ledger_path: Path to ledger file
        manifest_store_dir: Directory with manifest files
        event_id: UUID of inference event to verify

    Returns:
        Dictionary that always contains:
        - ok: True if verification passed
        Every failure additionally contains:
        - reason: Human-readable explanation of the failure
        Once both hashes could be computed (match or mismatch) it contains:
        - event_id: The event that was checked
        - ledger_repro_hash: Hash stored on ledger
        - local_repro_hash: Hash computed from local file
        - manifest_path: Path to manifest file
    """
    entry = get_inference_event(ledger_path, event_id)
    if entry is None:
        return {"ok": False, "reason": "No INFERENCE_EXECUTED event found"}

    # Tolerate an entry whose payload is missing or null instead of raising.
    ledger_hash = (entry.get("payload") or {}).get("repro_manifest_hash")
    if not ledger_hash:
        return {"ok": False, "reason": "No repro_manifest_hash in ledger"}

    manifest_path = os.path.join(manifest_store_dir, f"repro_{event_id}.json")
    if not os.path.exists(manifest_path):
        return {"ok": False, "reason": "Manifest file missing"}

    with open(manifest_path, "r", encoding="utf-8") as f:
        manifest_obj = json.load(f)

    # Canonical serialization: formatting differences in the stored file
    # (whitespace, key order) must not affect the hash.
    canonical = json.dumps(
        manifest_obj,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=False
    )
    local_hash = hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    hashes_match = local_hash == ledger_hash
    result = {
        "ok": hashes_match,
        "event_id": event_id,
        "ledger_repro_hash": ledger_hash,
        "local_repro_hash": local_hash,
        "manifest_path": manifest_path,
    }
    if not hashes_match:
        # Every failure path reports a reason, as the contract promises.
        result["reason"] = "Manifest hash does not match ledger"
    return result


def tamper_repro_manifest(manifest_store_dir: str, event_id: str):
    """
    Deliberately corrupt one stored reproducibility manifest (test helper).

    Loads `repro_<event_id>.json` from the manifest store, sets
    "tampered" to True and "purpose" to "tampered_purpose", and writes the
    result back as compact JSON with sorted keys.

    The modified content is meant to no longer match the hash recorded on
    the ledger, so verify_repro_manifest_against_ledger() should report it.

    Args:
        manifest_store_dir: Directory containing manifests
        event_id: UUID of event whose manifest to corrupt

    Raises:
        FileNotFoundError: If manifest doesn't exist
    """
    target = os.path.join(manifest_store_dir, f"repro_{event_id}.json")
    if not os.path.exists(target):
        raise FileNotFoundError(target)

    with open(target, "r", encoding="utf-8") as src:
        manifest = json.load(src)

    # Intentional tampering
    manifest["tampered"] = True
    manifest["purpose"] = "tampered_purpose"

    serialized = json.dumps(
        manifest, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    with open(target, "w", encoding="utf-8") as dst:
        dst.write(serialized)

In [11]:
def run_ablation():
    """
    Run an ablation study of the governance system components.

    Steps:
    1. Baseline: run simulate_workload on a fresh ledger (no tampering)
    2. Verify: ledger integrity is reported as part of the baseline results
    3. Tamper: corrupt one ledger entry on disk
    4. Re-verify: check integrity again (expected to fail)

    This exercises:
    - Consent enforcement (some requests are blocked)
    - Integrity checking (tampering is detected)

    Prints the baseline statistics, then the ledger integrity result after
    tampering.
    """
    # Fresh ledger each run. The f-prefix is required: without it the ledger
    # is written under a directory literally named "{ARCHIVE_DIR}" instead of
    # the timestamped archive directory.
    ledger_path = f"{ARCHIVE_DIR}/ledger/ablation_ledger.jsonl"
    if os.path.exists(ledger_path):
        os.remove(ledger_path)

    ledger = LocalLedger(ledger_path)
    overlay = GovernanceOverlay(ledger)

    # Create consent policy; simulate_workload addresses it as "consent_001".
    policy = ConsentPolicy(
        policy_id="consent_001",
        subject_id="patient_sim",
        allowed_purposes=["clinical_care", "research"],
        allowed_roles=["dermatologist", "ml_engineer"],
        valid_from_ms=now_epoch_ms() - 1000,
        valid_until_ms=None,
        revocable=True,
        revoked=False,
        version=1
    )
    overlay.store_consent_policy(policy)

    # --- Run simulation ---
    results = simulate_workload(
        overlay,
        n_runs=200,
        tamper_rate=0.1,
        revoke_at=120
    )

    print("\n=== Ablation results (no tampering) ===")
    for k, v in results.items():
        print(f"{k}: {v}")

    # --- Tamper and re-verify ---
    tampered = tamper_ledger_file(ledger_path)
    ok, err = ledger.verify_integrity()

    print("\n=== After tampering ===")
    if not tampered:
        # Without this note an "OK" result below would be misread as a
        # missed detection when nothing was actually modified.
        print("Note: no ledger entry could be tampered with")
    print("Ledger integrity OK?", ok)
    print("Error:", err)


# if __name__ == "__main__":

# Executed unconditionally (notebook-style cell); the script-style guard
# above is left disabled.
run_ablation()


=== Ablation results (no tampering) ===
executed: 77
blocked: 123
denied_events: 123
ledger_integrity_ok: True
ledger_integrity_error: None
n_repro_manifests: 77

=== After tampering ===
Ledger integrity OK? False
Error: Hash mismatch at entry 22f3a690-2e54-444c-9661-d5bcdf75e4cc


In [12]:
"""
Archive results for later analysis.

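Reports the timestamped archive directory (created at the top of the file).
The copy steps below are currently commented out; when enabled they copy: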
- Manifest store (off-ledger reproducibility receipts)
- Ledger (on-ledger audit trail)
- Policy store (consent policies)

This enables:
- Historical analysis (compare governance over time)
- Compliance documentation (preserve audit evidence)
- Reproducibility (reconstruct past system state)
"""



# The copy steps are disabled, so this cell only prints the archive path.
# NOTE(review): presumably outputs are already written under ARCHIVE_DIR
# directly, which would make the copies redundant; confirm before deleting.
# shutil.copytree("./manifest_store", f"{ARCHIVE_DIR}/manifest_store")
# shutil.copytree("./ledger", f"{ARCHIVE_DIR}/ledger")
# shutil.copytree("./policy_store", f"{ARCHIVE_DIR}/policy_store")

print("Archived results to:", ARCHIVE_DIR)

Archived results to: results_20251219_225234


In [13]:
import random, time, os, json
from dataclasses import asdict

def random_policy(i, t0_ms):
    """
    Build one randomized consent policy for the simulation.

    A policy kind is drawn with fixed weights:
    - clinical_only (0.45): purpose "clinical_care" only
    - clinical_research (0.35): "clinical_care" and "research"
    - expires_fast (0.15): both purposes, expiring 10-60 s after t0_ms
    - non_revocable (0.05): "clinical_care" only, revocable=False

    All kinds allow the roles "dermatologist" and "ml_engineer", become
    valid one second before t0_ms, and start out not revoked at version 1.

    Args:
        i: Policy index (used for unique IDs)
        t0_ms: Base timestamp for policy validity

    Returns:
        ConsentPolicy with the attributes described above
    """
    kinds = ["clinical_only", "clinical_research", "expires_fast", "non_revocable"]
    (kind,) = random.choices(kinds, weights=[0.45, 0.35, 0.15, 0.05], k=1)

    # Only the two dual-use kinds extend consent to research.
    if kind in ("clinical_research", "expires_fast"):
        purposes = ["clinical_care", "research"]
    else:
        purposes = ["clinical_care"]

    # The expiry offset is drawn only for expiring policies, so the RNG
    # stream is untouched for every other kind.
    expiry_ms = (
        t0_ms + random.randint(10_000, 60_000)  # expires within 10-60s
        if kind == "expires_fast"
        else None
    )

    return ConsentPolicy(
        policy_id=f"consent_{i:04d}",
        subject_id=f"patient_{i:04d}",
        allowed_purposes=purposes,
        allowed_roles=["dermatologist", "ml_engineer"],
        valid_from_ms=t0_ms - 1000,
        valid_until_ms=expiry_ms,
        revocable=kind != "non_revocable",
        revoked=False,
        version=1,
    )

def generate_actor():
    """
    Draw a random (role, purpose) pair for one simulated access attempt.

    Roles and their weights: dermatologist 0.55, ml_engineer 0.18,
    guest 0.12, nurse 0.10, admin 0.05.
    Purposes and their weights: clinical_care 0.75, research 0.25.

    The role is drawn before the purpose, which matters for seeded runs.

    Returns:
        Tuple of (role: str, purpose: str)
    """
    role_weights = {
        "dermatologist": 0.55,
        "ml_engineer": 0.18,
        "guest": 0.12,
        "nurse": 0.10,
        "admin": 0.05,
    }
    purpose_weights = {"clinical_care": 0.75, "research": 0.25}

    # Dicts preserve insertion order, so names and weights stay aligned.
    (picked_role,) = random.choices(
        list(role_weights), weights=list(role_weights.values()), k=1
    )
    (picked_purpose,) = random.choices(
        list(purpose_weights), weights=list(purpose_weights.values()), k=1
    )
    return picked_role, picked_purpose

def run_scenario(
    overlay,
    n_patients: int = 200,
    n_runs: int = 1000,
    revoke_prob: float = 0.02,
    tamper_after: bool = False,
    seed: int = 7,
) -> Dict[str, Any]:
    """
    Run comprehensive governance scenario with multiple patients.

    This is the main simulation function for testing governance at scale.
    It models a clinical AI deployment:

    1. Many patients (each with their own consent policy)
    2. Many inference runs (clinical + research uses)
    3. Dynamic consent (policies can be revoked mid-simulation)
    4. Mixed authorization (authorized + unauthorized attempts)
    5. Optional tampering (test integrity detection)

    Tracks detailed metrics:
    - Execution counts (successful vs blocked)
    - Unauthorized access attempts and blocks
    - Post-revocation executions (should not happen)
    - Governance overhead (time per operation)
    - Audit completeness (expected vs actual ledger entries)
    - Ledger integrity (before/after tampering)

    Notes:
    - The global `random` generator is re-seeded with `seed`, which affects
      any other code sharing it.
    - Only PermissionError from record_inference is counted as "blocked";
      any other exception propagates to the caller.

    Args:
        overlay: GovernanceOverlay instance
        n_patients: Number of unique patients/policies to create
        n_runs: Number of inference attempts to simulate
        revoke_prob: Probability of revoking consent at each step
        tamper_after: Whether to tamper with ledger after simulation
        seed: Random seed for reproducibility

    Returns:
        Dictionary with comprehensive statistics about execution,
        consent enforcement, audit completeness, and integrity
    """
    random.seed(seed)
    t0 = now_epoch_ms()

    # Create policies for many patients
    policy_ids = []
    for i in range(n_patients):
        p = random_policy(i, t0)
        overlay.store_consent_policy(p)
        policy_ids.append(p.policy_id)

    # Stats
    stats = {
        "executed": 0,
        "blocked": 0,
        "attempted_unauthorized": 0,
        "blocked_unauthorized": 0,
        "post_revoke_exec": 0,
        "events_expected": 0,
        "events_actual": 0,
        "governance_time_ms": 0,
    }

    # Keep track of revoked policies (ground truth).
    # A policy is only added after revoke_consent_policy returns without raising.
    revoked = set()

    for i in range(n_runs):
        # The order of the random draws below determines the seeded sequence.
        policy_id = random.choice(policy_ids)
        actor_role, purpose = generate_actor()
        actor_id = f"{actor_role}_{random.randint(1,5000)}"

        # Random revocation events
        if random.random() < revoke_prob:
            try:
                overlay.revoke_consent_policy(policy_id, actor_id="admin_sim")
                revoked.add(policy_id)
            except Exception:
                # Best-effort: a failed revocation is ignored and the policy
                # is not recorded as revoked.
                pass

        # Simulated artifacts
        input_uri = f"s3://bucket/{policy_id}/visit{i}/img.jpg"
        input_hash = f"{random.getrandbits(256):064x}"
        out_uri = f"s3://bucket/{policy_id}/visit{i}/report.json"
        out_hash = f"{random.getrandbits(256):064x}"

        # Define "unauthorized" attempts (by policy rules)
        # We'll count guest/nurse as usually unauthorized
        # NOTE(review): generate_actor can also return "admin", which is not
        # in the allowed_roles set by random_policy but is not counted here; confirm intent.
        unauthorized_attempt = actor_role in {"guest", "nurse"}
        if unauthorized_attempt:
            stats["attempted_unauthorized"] += 1

        # Time the governance call whether it succeeds or is denied.
        start = time.perf_counter()
        try:
            overlay.record_inference(
                actor_id=actor_id,
                actor_role=actor_role,
                purpose=purpose,
                consent_policy_id=policy_id,
                input_artifact_uri=input_uri,
                input_artifact_hash=input_hash,
                output_artifacts={"report_json": (out_uri, out_hash)},
                pipeline_manifest=DEFAULT_PIPELINE,
                model_spec=DEFAULT_MODEL,
            )
            stats["executed"] += 1

            if policy_id in revoked:
                # if it still executed after revocation, that's a drift incident
                stats["post_revoke_exec"] += 1

        except PermissionError:
            stats["blocked"] += 1
            if unauthorized_attempt:
                stats["blocked_unauthorized"] += 1
        finally:
            # perf_counter is in seconds; convert to milliseconds.
            stats["governance_time_ms"] += (time.perf_counter() - start) * 1000

        # Audit completeness expectation:
        # For each run we expect at least an INFERENCE_EXECUTED or CONSENT_DENIED event
        stats["events_expected"] += 1

    # Count actual entries (excluding genesis)
    # This counts every row carrying an "entry_hash", not only inference or
    # denial events, so it can exceed events_expected.
    rows = overlay.ledger._read_all_raw()
    stats["events_actual"] = sum(1 for r in rows if "entry_hash" in r)

    # Integrity check before optional tampering
    ok_before, err_before = overlay.ledger.verify_integrity()

    if tamper_after:
        tamper_ledger_file(overlay.ledger.ledger_path)
        ok_after, err_after = overlay.ledger.verify_integrity()
    else:
        # Without tampering the "after" result simply mirrors the "before" one.
        ok_after, err_after = ok_before, err_before

    stats.update({
        "ledger_integrity_ok_before": ok_before,
        "ledger_integrity_err_before": err_before,
        "ledger_integrity_ok_after": ok_after,
        "ledger_integrity_err_after": err_after,
        # max(1, ...) guards against division by zero when nothing was attempted.
        "unauthorized_block_rate": (stats["blocked_unauthorized"] / max(1, stats["attempted_unauthorized"])),
        "avg_governance_ms_per_run": (stats["governance_time_ms"] / max(1, n_runs)),
        "ledger_entries": stats["events_actual"],
    })
    return stats

In [14]:
"""
Run multiple governance scenarios with different parameters.

Each scenario tests different aspects:
- Small scale (50 patients, 500 runs) - baseline functionality
- Medium scale (200 patients, 2000 runs) - realistic load with revocations
- Large scale (500-1000 patients, 5000-20000 runs) - stress test
- Consent stress (revoke_prob 0.20-0.30) - heavy revocation load
- Tampering test (with tamper_after=True) - integrity detection

Results show how governance system behaves under various conditions.
"""

# Each dict is passed to run_scenario as keyword arguments. The seed also
# names the ledger file, so seeds must be unique across scenarios; a repeated
# seed would delete the ledger of the earlier scenario.
scenarios = [
  # Baselines (steady-state)
  {"n_patients": 50,  "n_runs": 500,   "revoke_prob": 0.00, "tamper_after": False, "seed": 1},
  {"n_patients": 200, "n_runs": 2000,  "revoke_prob": 0.02, "tamper_after": False, "seed": 2},
  {"n_patients": 500, "n_runs": 5000,  "revoke_prob": 0.05, "tamper_after": False, "seed": 3},

  # Consent stress
  {"n_patients": 200, "n_runs": 2000,  "revoke_prob": 0.20, "tamper_after": False, "seed": 5},
  {"n_patients": 500, "n_runs": 5000,  "revoke_prob": 0.30, "tamper_after": False, "seed": 6},

  # Performance stress (scale)
  {"n_patients": 1000,"n_runs": 20000, "revoke_prob": 0.05, "tamper_after": False, "seed": 7},

  # Integrity stress (repeat tamper, multiple seeds)
  {"n_patients": 200, "n_runs": 2000,  "revoke_prob": 0.02, "tamper_after": True,  "seed": 4},
  {"n_patients": 200, "n_runs": 2000,  "revoke_prob": 0.02, "tamper_after": True,  "seed": 8},
  {"n_patients": 500, "n_runs": 5000,  "revoke_prob": 0.05, "tamper_after": True,  "seed": 9},
]
# Module-level list of per-scenario stats dicts.
results = []
for s in scenarios:
    # fresh ledger each scenario (important)
    ledger_path = f"{ARCHIVE_DIR}/ledger/scenario_seed_{s['seed']}.jsonl"
    if os.path.exists(ledger_path):
        os.remove(ledger_path)

    ledger = LocalLedger(ledger_path)
    overlay = GovernanceOverlay(ledger)

    res = run_scenario(overlay, **s)
    # Stored as a string so the scenario parameters stay readable in the output.
    res["scenario"] = str(s)
    res["ledger_path"] = ledger_path  # helpful for traceability
    results.append(res)
# Bare expression: displayed as the cell output when run in a notebook.
results

[{'executed': 321,
  'blocked': 179,
  'attempted_unauthorized': 115,
  'blocked_unauthorized': 115,
  'post_revoke_exec': 0,
  'events_expected': 500,
  'events_actual': 550,
  'governance_time_ms': 626.4125380018868,
  'ledger_integrity_ok_before': True,
  'ledger_integrity_err_before': None,
  'ledger_integrity_ok_after': True,
  'ledger_integrity_err_after': None,
  'unauthorized_block_rate': 1.0,
  'avg_governance_ms_per_run': 1.2528250760037736,
  'ledger_entries': 550,
  'scenario': "{'n_patients': 50, 'n_runs': 500, 'revoke_prob': 0.0, 'tamper_after': False, 'seed': 1}",
  'ledger_path': 'results_20251219_225234/ledger/scenario_seed_1.jsonl'},
 {'executed': 1140,
  'blocked': 860,
  'attempted_unauthorized': 411,
  'blocked_unauthorized': 411,
  'post_revoke_exec': 0,
  'events_expected': 2000,
  'events_actual': 2236,
  'governance_time_ms': 8774.79608199849,
  'ledger_integrity_ok_before': True,
  'ledger_integrity_err_before': None,
  'ledger_integrity_ok_after': True,
  'le

In [15]:
# results = run_scenario(...)

import json

# Persist the module-level `results` list under ARCHIVE_DIR as indented JSON.
with open(f'{ARCHIVE_DIR}/scenario_metrics.json', "w") as f:
    json.dump(results, f, indent=2)

print("Saved scenario_metrics.json")

Saved scenario_metrics.json


In [16]:
import random

def batch_offledger_tamper_test(
    overlay,
    sample_size=200,
    tamper_fraction=0.25,
    seed=1
):
    """
    Test off-ledger manifest integrity detection at scale.

    This function validates the two-tier security model:
    1. Ledger is tamper-evident (via hash chain)
    2. Manifests are tamper-evident (via hash comparison with ledger)

    Process:
    1. Sample random inference events from ledger
    2. Verify all sampled manifests (baseline - should all pass)
    3. Tamper with a fraction of the sampled manifests
    4. Re-verify every sampled manifest
    5. Count: detected tampering, missed tampering, false alarms

    Notes:
    - The tampered manifests are NOT restored by this function; they stay
      modified in the manifest store after the call returns.
    - The module-level random generator is seeded with ``seed`` for the
      duration of the call and put back into its previous state afterwards,
      so calling this function does not disturb random numbers drawn by
      unrelated code.

    Args:
        overlay: GovernanceOverlay instance with populated ledger; only
            ``overlay.ledger.ledger_path`` and ``overlay.manifest_store_dir``
            are read here
        sample_size: How many events to test (max); must be >= 0
        tamper_fraction: Proportion of sampled manifests to corrupt
            (0.0 to 1.0); the count is rounded down
        seed: Random seed for reproducibility

    Returns:
        Dictionary with:
        - total_inference_events: inference events available in the ledger
        - sample_size: number of events actually sampled
        - tampered: number of manifests tampered with
        - baseline_ok_before: manifests that verified OK before tampering
        - detected_tamper / missed_tamper / false_alarms: detection counts
        - tamper_detection_rate: detected / tampered (0.0 if none tampered)
        - false_alarm_rate: false alarms / untouched manifests

    Raises:
        ValueError: If tamper_fraction is outside [0.0, 1.0], sample_size is
            negative, or the ledger contains no inference events.
    """
    # Validate up front so bad arguments fail with a clear message instead of
    # an opaque error from random.sample, and before any manifest is modified.
    if not 0.0 <= tamper_fraction <= 1.0:
        raise ValueError(
            f"tamper_fraction must be between 0.0 and 1.0, got {tamper_fraction!r}"
        )
    if sample_size < 0:
        raise ValueError(f"sample_size must be non-negative, got {sample_size!r}")

    # Seed the shared generator for a reproducible selection, but remember its
    # state so the reseeding does not leak out of this function.
    saved_rng_state = random.getstate()
    random.seed(seed)
    try:
        ledger_path = overlay.ledger.ledger_path
        manifest_dir = overlay.manifest_store_dir

        events = list_inference_events(ledger_path)
        if not events:
            raise ValueError("No inference events found")

        sample = random.sample(events, k=min(sample_size, len(events)))
        tamper_n = int(len(sample) * tamper_fraction)

        # Event ids whose manifests will be corrupted in the second pass.
        to_tamper = {e["event_id"] for e in random.sample(sample, k=tamper_n)}

        # Pass 1: baseline verification of the untouched manifests.
        baseline_ok = 0
        for e in sample:
            res = verify_repro_manifest_against_ledger(
                ledger_path, manifest_dir, e["event_id"]
            )
            baseline_ok += int(res["ok"])

        detected = 0
        missed = 0
        false_alarm = 0

        # Pass 2: tamper with the selected manifests, then re-verify all.
        for e in sample:
            eid = e["event_id"]
            if eid in to_tamper:
                tamper_repro_manifest(manifest_dir, eid)

            res = verify_repro_manifest_against_ledger(
                ledger_path, manifest_dir, eid
            )

            if eid in to_tamper:
                if res["ok"] is False:
                    detected += 1
                else:
                    missed += 1
            else:
                if res["ok"] is False:
                    false_alarm += 1
    finally:
        random.setstate(saved_rng_state)

    return {
        "total_inference_events": len(events),
        "sample_size": len(sample),
        "tampered": tamper_n,
        "baseline_ok_before": baseline_ok,
        "detected_tamper": detected,
        "missed_tamper": missed,
        "false_alarms": false_alarm,
        # max(1, ...) avoids division by zero when nothing was tampered with
        # (or when every sampled manifest was tampered with).
        "tamper_detection_rate": detected / max(1, tamper_n),
        "false_alarm_rate": false_alarm / max(1, (len(sample) - tamper_n)),
    }

In [17]:
"""
Quick test of manifest verification (single event).

Demonstrates:
1. List all inference events from ledger
2. Pick one event
3. Verify its manifest (should pass initially)
4. Tamper with manifest
5. Re-verify (should now fail)

This is a sanity check that verification logic works correctly.
"""

ledger_path = overlay.ledger.ledger_path
manifest_dir = overlay.manifest_store_dir

events = list_inference_events(ledger_path)
print("Number of inference events:", len(events))

# Pick one event
event_id = events[0]["event_id"]

# Verify before tampering
print("Before tampering:")
print(verify_repro_manifest_against_ledger(ledger_path, manifest_dir, event_id))

# Tamper
tamper_repro_manifest(manifest_dir, event_id)

# Verify after tampering
print("After tampering:")
print(verify_repro_manifest_against_ledger(ledger_path, manifest_dir, event_id))

Number of inference events: 2285
Before tampering:
{'ok': True, 'event_id': '3b85e42e-8f77-4dfb-9a95-c29a243667e1', 'ledger_repro_hash': '5714572298645178e7e7e5e8033ee6fe277bc6f85f0f2928527fc22443e731e1', 'local_repro_hash': '5714572298645178e7e7e5e8033ee6fe277bc6f85f0f2928527fc22443e731e1', 'manifest_path': 'results_20251219_225234/manifest_store/repro_3b85e42e-8f77-4dfb-9a95-c29a243667e1.json'}
After tampering:
{'ok': False, 'event_id': '3b85e42e-8f77-4dfb-9a95-c29a243667e1', 'ledger_repro_hash': '5714572298645178e7e7e5e8033ee6fe277bc6f85f0f2928527fc22443e731e1', 'local_repro_hash': '0ff082f88cf1d0908d7dd0b9d8cfff16e81ab7a32743c2bb1decb18bd54001fc', 'manifest_path': 'results_20251219_225234/manifest_store/repro_3b85e42e-8f77-4dfb-9a95-c29a243667e1.json'}


- In off-ledger tampering experiments, modifying the stored reproducibility manifest caused the locally computed hash to diverge from the immutable ledger commitment, resulting in deterministic detection of artifact corruption while leaving ledger integrity unaffected.


In [18]:
"""
Execute batch tampering test.

Runs comprehensive test of off-ledger manifest integrity:
- Tests 200 manifests
- Tampers with 25% of them
- Should detect nearly all tampering
- Should have no false alarms

Results show effectiveness of hash-based verification.
"""

result = batch_offledger_tamper_test(
    overlay,
    sample_size=200,
    tamper_fraction=0.25,
    seed=42
)

result

{'total_inference_events': 2285,
 'sample_size': 200,
 'tampered': 50,
 'baseline_ok_before': 200,
 'detected_tamper': 50,
 'missed_tamper': 0,
 'false_alarms': 0,
 'tamper_detection_rate': 1.0,
 'false_alarm_rate': 0.0}

In [19]:
"""
Save tamper detection metrics to JSON file.

Preserves test results in archived directory for:
- Documentation (prove governance system works)
- Compliance (show validation testing)
- Analysis (track metrics over time)
"""

with open(f'{ARCHIVE_DIR}/tamper_metrics.json', "w") as f:
    json.dump(result, f, indent=2)

print("Saved tamper_metrics.json")

Saved tamper_metrics.json


In [None]:
"""
Final integrity check of ledger.

Verifies that the hash chain is intact.
Should print: (True, None) if no tampering occurred.
"""

# for s in scenarios:
#     ledger = LocalLedger(ledger_path)
#     overlay = GovernanceOverlay(ledger)
#     res = run_scenario(overlay, **s)
#     print(res)

for s in scenarios:
    ledger_path = f"{ARCHIVE_DIR}/ledger/scenario_seed_{s['seed']}.jsonl"

    if os.path.exists(ledger_path):
        os.remove(ledger_path)

    ledger = LocalLedger(ledger_path)
    overlay = GovernanceOverlay(ledger)

    res = run_scenario(overlay, **s)
    print(res)


{'executed': 321, 'blocked': 179, 'attempted_unauthorized': 115, 'blocked_unauthorized': 115, 'post_revoke_exec': 0, 'events_expected': 500, 'events_actual': 550, 'governance_time_ms': 608.4007529993869, 'ledger_integrity_ok_before': True, 'ledger_integrity_err_before': None, 'ledger_integrity_ok_after': True, 'ledger_integrity_err_after': None, 'unauthorized_block_rate': 1.0, 'avg_governance_ms_per_run': 1.2168015059987738, 'ledger_entries': 550}
{'executed': 1140, 'blocked': 860, 'attempted_unauthorized': 411, 'blocked_unauthorized': 411, 'post_revoke_exec': 0, 'events_expected': 2000, 'events_actual': 2236, 'governance_time_ms': 8917.088181003237, 'ledger_integrity_ok_before': True, 'ledger_integrity_err_before': None, 'ledger_integrity_ok_after': True, 'ledger_integrity_err_after': None, 'unauthorized_block_rate': 1.0, 'avg_governance_ms_per_run': 4.4585440905016185, 'ledger_entries': 2236}
{'executed': 2383, 'blocked': 2617, 'attempted_unauthorized': 1097, 'blocked_unauthorized': 

In [21]:
# Re-open the ledger file recorded for each scenario result and report
# whether its integrity check passes.
from pprint import pprint  # not used in this cell

print("=== Ledger Integrity Validation (All Scenarios) ===")

for res in results:
    # Path stored alongside the metrics when the scenario was run.
    ledger_path = res["ledger_path"]
    ledger = LocalLedger(ledger_path)

    # verify_integrity() yields an (ok, error) pair.
    ok, err = ledger.verify_integrity()

    print(f"\nScenario: {res['scenario']}")
    print(f"Ledger path: {ledger_path}")
    print("Integrity OK?", ok)
    # Only print the error line when an error value was returned.
    if err:
        print("Error:", err)

=== Ledger Integrity Validation (All Scenarios) ===

Scenario: {'n_patients': 50, 'n_runs': 500, 'revoke_prob': 0.0, 'tamper_after': False, 'seed': 1}
Ledger path: results_20251219_225234/ledger/scenario_seed_1.jsonl
Integrity OK? True

Scenario: {'n_patients': 200, 'n_runs': 2000, 'revoke_prob': 0.02, 'tamper_after': False, 'seed': 2}
Ledger path: results_20251219_225234/ledger/scenario_seed_2.jsonl
Integrity OK? True

Scenario: {'n_patients': 500, 'n_runs': 5000, 'revoke_prob': 0.05, 'tamper_after': False, 'seed': 3}
Ledger path: results_20251219_225234/ledger/scenario_seed_3.jsonl
Integrity OK? True

Scenario: {'n_patients': 200, 'n_runs': 2000, 'revoke_prob': 0.2, 'tamper_after': False, 'seed': 5}
Ledger path: results_20251219_225234/ledger/scenario_seed_5.jsonl
Integrity OK? True

Scenario: {'n_patients': 500, 'n_runs': 5000, 'revoke_prob': 0.3, 'tamper_after': False, 'seed': 6}
Ledger path: results_20251219_225234/ledger/scenario_seed_6.jsonl
Integrity OK? True

Scenario: {'n_pat