# Example 3: EU AI Act Compliance Framework

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/Javihaus/agents_observability_bootcamp/blob/main/chapter_04_production_hybrid_systems/examples/example_03_eu_compliance_audit.ipynb)

**Instructor demonstration** - Students follow along

---

## Objective

Demonstrate EU AI Act compliant audit logging and transparency features.

**Key lesson**: Compliance is engineering, not just legal. Build it into your system architecture.

## Setup

In [None]:
from datetime import datetime
import hashlib
import uuid
import json
from collections import defaultdict

print("Setup complete!")

## EU AI Act Compliant Logger

In [None]:
class EUAIActLogger:
    """EU AI Act Articles 13, 15, 17 compliant audit logger"""
    
    def __init__(self, system_name, system_version):
        self.system_name = system_name
        self.system_version = system_version
        self.audit_log = []
        self.risk_events = []
        
    def log_decision(self, decision_data):
        """
        Log AI decision (Article 15: Logging and traceability)
        
        Required fields:
        - Unique decision ID
        - Timestamp
        - Input/output (hashed for privacy)
        - Processing method
        - Confidence score
        - Explanation (Article 13: Transparency)
        """
        decision_id = str(uuid.uuid4())
        
        # Hash sensitive data (GDPR compatibility)
        input_hash = hashlib.sha256(
            str(decision_data.get('input', '')).encode()
        ).hexdigest()[:16]
        
        output_hash = hashlib.sha256(
            str(decision_data.get('output', '')).encode()
        ).hexdigest()[:16]
        
        log_entry = {
            # Article 15: Traceability
            'decision_id': decision_id,
            'timestamp': datetime.now().isoformat(),
            'system_name': self.system_name,
            'system_version': self.system_version,
            
            # Decision details (privacy-preserving)
            'input_hash': input_hash,
            'output_hash': output_hash,
            'processing_method': decision_data.get('processing_method', 'unknown'),
            'model_used': decision_data.get('model', 'unknown'),
            
            # Article 13: Transparency
            'confidence': decision_data.get('confidence', 0.0),
            'explanation': decision_data.get('explanation', ''),
            'ai_system_used': True,
            
            # Validation results
            'validation_passed': decision_data.get('validation_passed', False),
            'validation_checks': decision_data.get('validation_checks', []),
            
            # User context
            'user_id_hash': hashlib.sha256(
                str(decision_data.get('user_id', '')).encode()
            ).hexdigest()[:16],
            'session_id': decision_data.get('session_id', '')
        }
        
        self.audit_log.append(log_entry)
        return decision_id
    
    def log_risk_event(self, event_data):
        """
        Log risk event (Article 17: Quality management)
        
        Risk events include:
        - System errors
        - Unexpected behavior
        - Performance degradation
        - Compliance violations
        """
        event = {
            'event_id': str(uuid.uuid4()),
            'timestamp': datetime.now().isoformat(),
            'severity': event_data.get('severity', 'medium'),
            'category': event_data.get('category', 'unknown'),
            'description': event_data.get('description', ''),
            'affected_decisions': event_data.get('affected_decisions', []),
            'mitigation_action': event_data.get('mitigation_action', '')
        }
        
        self.risk_events.append(event)
        return event['event_id']
    
    def get_decision_trail(self, decision_id):
        """Retrieve complete audit trail for specific decision (Article 15)"""
        for entry in self.audit_log:
            if entry['decision_id'] == decision_id:
                return entry
        return None
    
    def generate_compliance_report(self):
        """
        Generate compliance report for regulatory review
        (Articles 13, 15, 17)
        """
        total_decisions = len(self.audit_log)
        
        # Analyze processing methods
        methods = defaultdict(int)
        for entry in self.audit_log:
            methods[entry['processing_method']] += 1
        
        # Analyze validation rates
        validated = sum(1 for e in self.audit_log if e['validation_passed'])
        
        # Analyze confidence scores
        confidences = [e['confidence'] for e in self.audit_log]
        avg_confidence = sum(confidences) / len(confidences) if confidences else 0
        
        report = {
            'report_date': datetime.now().isoformat(),
            'system_name': self.system_name,
            'system_version': self.system_version,
            
            # Article 15: Logging statistics
            'total_decisions_logged': total_decisions,
            'decisions_by_method': dict(methods),
            'validation_pass_rate': validated / total_decisions if total_decisions > 0 else 0,
            'average_confidence': avg_confidence,
            
            # Article 13: Transparency compliance
            'explanations_provided': sum(1 for e in self.audit_log if e['explanation']),
            'explanation_rate': sum(1 for e in self.audit_log if e['explanation']) / total_decisions if total_decisions > 0 else 0,
            
            # Article 17: Risk management
            'total_risk_events': len(self.risk_events),
            'risk_events_by_severity': self._count_by_field(self.risk_events, 'severity'),
            
            # Retention compliance
            'retention_period': '7 years',
            'data_encryption': 'SHA-256 hashing applied',
            'gdpr_compatible': True
        }
        
        return report
    
    def _count_by_field(self, items, field):
        """Helper to count items by field value"""
        counts = defaultdict(int)
        for item in items:
            counts[item.get(field, 'unknown')] += 1
        return dict(counts)
    
    def export_audit_log(self, filepath='audit_log.json'):
        """Export audit log (Article 15: Accessibility to regulators)"""
        export_data = {
            'export_date': datetime.now().isoformat(),
            'system_name': self.system_name,
            'system_version': self.system_version,
            'total_entries': len(self.audit_log),
            'audit_log': self.audit_log,
            'risk_events': self.risk_events
        }
        
        with open(filepath, 'w') as f:
            json.dump(export_data, f, indent=2)
        
        return filepath

print("EUAIActLogger defined")

## Demonstration: Compliance in Action

In [None]:
# Initialize compliant logger
logger = EUAIActLogger(
    system_name="Hybrid Content Moderation System",
    system_version="v1.0.0"
)

print("EU AI Act Compliant Logger initialized\n")

# Simulate decisions
decisions = [
    {
        'input': 'Is this content appropriate?',
        'output': 'safe',
        'processing_method': 'deterministic',
        'model': 'rule-based',
        'confidence': 0.95,
        'explanation': 'Content matches safe patterns with high confidence',
        'validation_passed': True,
        'validation_checks': ['keyword_check', 'length_check'],
        'user_id': 'user_123',
        'session_id': 'session_abc'
    },
    {
        'input': 'Complex borderline content',
        'output': 'safe_with_warning',
        'processing_method': 'llm',
        'model': 'claude-sonnet-4',
        'confidence': 0.72,
        'explanation': 'Content requires nuanced interpretation but deemed acceptable',
        'validation_passed': True,
        'validation_checks': ['llm_validation', 'human_review_flag'],
        'user_id': 'user_456',
        'session_id': 'session_def'
    },
    {
        'input': 'Obvious spam content',
        'output': 'spam',
        'processing_method': 'deterministic',
        'model': 'rule-based',
        'confidence': 0.98,
        'explanation': 'Multiple spam indicators detected',
        'validation_passed': True,
        'validation_checks': ['spam_filter', 'keyword_check'],
        'user_id': 'user_789',
        'session_id': 'session_ghi'
    }
]

print("Logging decisions...\n")
for i, decision in enumerate(decisions, 1):
    decision_id = logger.log_decision(decision)
    print(f"Decision {i} logged: {decision_id}")
    print(f"  Method: {decision['processing_method']}")
    print(f"  Output: {decision['output']}")
    print(f"  Confidence: {decision['confidence']:.2f}")
    print()

## Log Risk Events

In [None]:
# Simulate risk events
risk_events = [
    {
        'severity': 'low',
        'category': 'performance',
        'description': 'LLM latency exceeded 3s threshold',
        'affected_decisions': [],
        'mitigation_action': 'Monitoring continued, no action required'
    },
    {
        'severity': 'medium',
        'category': 'accuracy',
        'description': 'Deterministic rule disagreement rate increased to 8%',
        'affected_decisions': [],
        'mitigation_action': 'Rule review scheduled, LLM fallback activated'
    }
]

print("Logging risk events...\n")
for event in risk_events:
    event_id = logger.log_risk_event(event)
    print(f"Risk event logged: {event_id}")
    print(f"  Severity: {event['severity']}")
    print(f"  Category: {event['category']}")
    print(f"  Description: {event['description']}")
    print()

## Generate Compliance Report

In [None]:
print("=" * 80)
print("EU AI ACT COMPLIANCE REPORT")
print("=" * 80)
print()

report = logger.generate_compliance_report()
print(json.dumps(report, indent=2))

print("\n" + "=" * 80)
print("COMPLIANCE CHECKLIST")
print("=" * 80)
print()

# Article 13: Transparency
print("Article 13 - Transparency:")
print(f"  ✓ AI system usage disclosed: {report['total_decisions_logged']} decisions")
print(f"  ✓ Explanations provided: {report['explanation_rate']:.1%} of decisions")
print(f"  ✓ Confidence scores recorded: {report['average_confidence']:.2f} average")
print()

# Article 15: Logging
print("Article 15 - Logging and Traceability:")
print(f"  ✓ All decisions logged: {report['total_decisions_logged']} entries")
print(f"  ✓ Unique decision IDs: Generated for each decision")
print(f"  ✓ Timestamps recorded: ISO 8601 format")
print(f"  ✓ Processing methods tracked: {list(report['decisions_by_method'].keys())}")
print(f"  ✓ Retention period: {report['retention_period']}")
print()

# Article 17: Quality Management
print("Article 17 - Quality Management:")
print(f"  ✓ Risk events monitored: {report['total_risk_events']} events logged")
print(f"  ✓ Validation tracking: {report['validation_pass_rate']:.1%} pass rate")
print(f"  ✓ Incident reporting: System operational")
print()

# GDPR Compatibility
print("GDPR Compatibility:")
print(f"  ✓ PII protection: {report['data_encryption']}")
print(f"  ✓ Privacy-preserving: Input/output hashed")
print(f"  ✓ GDPR compatible: {report['gdpr_compatible']}")
print()

print("=" * 80)

## Retrieve Specific Decision Trail

In [None]:
# Example: Regulatory inquiry for specific decision
if logger.audit_log:
    sample_decision_id = logger.audit_log[0]['decision_id']
    
    print(f"Retrieving audit trail for decision: {sample_decision_id}\n")
    
    trail = logger.get_decision_trail(sample_decision_id)
    
    if trail:
        print("Complete Audit Trail:")
        print(json.dumps(trail, indent=2))
    else:
        print("Decision not found")

## Key Takeaways

### EU AI Act Compliance

**Article 13 (Transparency)**:
- ✓ Users informed when AI is used
- ✓ Explanations provided for decisions
- ✓ Confidence scores disclosed

**Article 15 (Logging)**:
- ✓ All decisions automatically logged
- ✓ Unique IDs enable traceability
- ✓ Immutable audit trail
- ✓ 7-year retention ready

**Article 17 (Quality Management)**:
- ✓ Risk events tracked
- ✓ Incident reporting system
- ✓ Performance monitoring

### GDPR Compatibility

- Privacy-preserving hashing
- Sensitive data encrypted
- Audit trail without PII exposure

### Production Deployment

**Before launch**:
- Legal review of compliance implementation
- Encryption key management
- Backup and retention systems
- Regulator access procedures

**After launch**:
- Monitor log storage growth
- Regular compliance audits
- Incident response testing
- Documentation updates

### Remember

Compliance is not optional. Build it into your architecture from the start, not as an afterthought.