# Chapter 5: User Management
## Balancing Security and Usability

### Course: TCPRG4005 - Secure Programming

---

## Learning Objectives

1. **Design** secure user creation and management systems
2. **Implement** role-based access control (RBAC)
3. **Understand** separation of duties principles
4. **Build** audit trails for user actions
5. **Avoid** common user management pitfalls

---

## Key Challenge

> **Balance security with usability**

Users will circumvent security if it's too restrictive, often creating worse vulnerabilities than the original problem.

In [None]:
# Setup for user management examples
import json
import time
from datetime import datetime, timedelta
from enum import Enum

print("👥 Chapter 5: User Management")
print("Building secure and usable user systems")
print("=" * 45)

# Example 1: User Lifecycle Management

Users need to be created, activated, deactivated, and eventually archived. Let's build a secure system.

In [None]:
# User lifecycle management system
class UserStatus(Enum):
    PENDING = "pending"
    ACTIVE = "active"
    LOCKED = "locked"
    DISABLED = "disabled"
    ARCHIVED = "archived"

class User:
    def __init__(self, username, email, created_by):
        self.username = username
        self.email = email
        self.status = UserStatus.PENDING
        self.roles = set()
        self.created_at = datetime.now()
        self.created_by = created_by
        self.last_login = None
        self.failed_logins = 0
        self.audit_log = []
        
    def log_action(self, action, performed_by, details=""):
        """Log all user management actions"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'action': action,
            'performed_by': performed_by,
            'details': details
        }
        self.audit_log.append(log_entry)
    
    def activate(self, admin_user):
        """Activate a pending user"""
        if self.status != UserStatus.PENDING:
            raise ValueError(f"Cannot activate user in {self.status.value} status")
        
        self.status = UserStatus.ACTIVE
        self.log_action("USER_ACTIVATED", admin_user, "User account activated")
        
    def lock_account(self, reason, performed_by):
        """Lock account (temporarily)"""
        if self.status == UserStatus.ACTIVE:
            self.status = UserStatus.LOCKED
            self.log_action("ACCOUNT_LOCKED", performed_by, reason)
        
    def disable_account(self, reason, performed_by):
        """Disable account (remove all roles)"""
        self.status = UserStatus.DISABLED
        old_roles = self.roles.copy()
        self.roles.clear()
        self.log_action("ACCOUNT_DISABLED", performed_by, 
                       f"Roles removed: {old_roles}. Reason: {reason}")

# Demonstrate user lifecycle
print("User Lifecycle Management")
print("-" * 25)

# Create new user
user = User("jsmith", "john.smith@company.com", "admin")
print(f"✅ User created: {user.username} (Status: {user.status.value})")

# Activate user
user.activate("admin")
print(f"✅ User activated: Status now {user.status.value}")

# Add some roles
user.roles.add("employee")
user.roles.add("timesheet_user")
user.log_action("ROLES_ASSIGNED", "admin", "Initial role assignment")

# Lock account due to suspicious activity
user.lock_account("Multiple failed login attempts", "security_system")
print(f"⚠️  Account locked: Status now {user.status.value}")

# Show audit trail
print(f"\n📋 Audit Trail for {user.username}:")
for entry in user.audit_log:
    print(f"  {entry['timestamp'][:19]} | {entry['action']} by {entry['performed_by']}")
    if entry['details']:
        print(f"    Details: {entry['details']}")

# Example 2: Role-Based Access Control (RBAC)

Instead of managing permissions per user, group permissions into roles for easier management.

In [None]:
# Role-Based Access Control system
class Permission:
    def __init__(self, name, description):
        self.name = name
        self.description = description
    
    def __str__(self):
        return self.name
    
    def __hash__(self):
        return hash(self.name)
    
    def __eq__(self, other):
        return self.name == other.name

class Role:
    def __init__(self, name, description):
        self.name = name
        self.description = description
        self.permissions = set()
    
    def add_permission(self, permission):
        self.permissions.add(permission)
    
    def has_permission(self, permission_name):
        return any(p.name == permission_name for p in self.permissions)

class RBACSystem:
    def __init__(self):
        self.users = {}
        self.roles = {}
        self.permissions = {}
    
    def create_permission(self, name, description):
        """Create a new permission"""
        permission = Permission(name, description)
        self.permissions[name] = permission
        return permission
    
    def create_role(self, name, description):
        """Create a new role"""
        role = Role(name, description)
        self.roles[name] = role
        return role
    
    def assign_role_to_user(self, username, role_name, assigned_by):
        """Assign role to user with audit trail"""
        if username in self.users and role_name in self.roles:
            user = self.users[username]
            user.roles.add(role_name)
            user.log_action("ROLE_ASSIGNED", assigned_by, f"Role: {role_name}")
            return True
        return False
    
    def check_permission(self, username, permission_name):
        """Check if user has specific permission"""
        if username not in self.users:
            return False
        
        user = self.users[username]
        for role_name in user.roles:
            if role_name in self.roles:
                role = self.roles[role_name]
                if role.has_permission(permission_name):
                    return True
        return False

# Set up RBAC system
rbac = RBACSystem()

# Create permissions
perms = [
    rbac.create_permission("read_reports", "View financial reports"),
    rbac.create_permission("create_reports", "Create new reports"),
    rbac.create_permission("delete_reports", "Delete reports"),
    rbac.create_permission("manage_users", "Create/modify users"),
    rbac.create_permission("view_audit_logs", "Access audit logs")
]

# Create roles
analyst_role = rbac.create_role("analyst", "Financial analyst")
analyst_role.add_permission(perms[0])  # read_reports
analyst_role.add_permission(perms[1])  # create_reports

manager_role = rbac.create_role("manager", "Department manager")
manager_role.add_permission(perms[0])  # read_reports
manager_role.add_permission(perms[1])  # create_reports
manager_role.add_permission(perms[2])  # delete_reports

admin_role = rbac.create_role("admin", "System administrator")
for perm in perms:
    admin_role.add_permission(perm)

# Add our user to the system
rbac.users["jsmith"] = user

print("\nRole-Based Access Control")
print("-" * 30)

# Test permissions before role assignment
print(f"Can jsmith read reports? {rbac.check_permission('jsmith', 'read_reports')}")

# Assign analyst role
rbac.assign_role_to_user("jsmith", "analyst", "admin")
print(f"✅ Assigned analyst role to jsmith")

# Test permissions after role assignment
print(f"Can jsmith read reports? {rbac.check_permission('jsmith', 'read_reports')}")
print(f"Can jsmith delete reports? {rbac.check_permission('jsmith', 'delete_reports')}")
print(f"Can jsmith manage users? {rbac.check_permission('jsmith', 'manage_users')}")

print(f"\njsmith's current roles: {user.roles}")

# Example 3: Separation of Duties

Critical operations should require multiple people to prevent fraud and accidents.

In [None]:
# Separation of duties implementation
class WorkflowStep:
    def __init__(self, step_name, required_role, description):
        self.step_name = step_name
        self.required_role = required_role
        self.description = description
        self.completed = False
        self.completed_by = None
        self.completed_at = None
    
    def complete(self, user, rbac_system):
        """Complete this workflow step"""
        if not rbac_system.check_permission(user.username, self.required_role):
            raise PermissionError(f"User lacks required role: {self.required_role}")
        
        self.completed = True
        self.completed_by = user.username
        self.completed_at = datetime.now()

class SeparationWorkflow:
    def __init__(self, name, description):
        self.name = name
        self.description = description
        self.steps = []
        self.created_at = datetime.now()
    
    def add_step(self, step):
        """Add a step to the workflow"""
        self.steps.append(step)
    
    def can_complete_step(self, step_index, user, rbac_system):
        """Check if user can complete the specified step"""
        if step_index >= len(self.steps):
            return False
        
        step = self.steps[step_index]
        
        # Check if step is already completed
        if step.completed:
            return False
        
        # Check if user has required permission
        if not rbac_system.check_permission(user.username, step.required_role):
            return False
        
        # Check separation of duties - same user can't complete consecutive steps
        if step_index > 0:
            previous_step = self.steps[step_index - 1]
            if previous_step.completed_by == user.username:
                return False
        
        return True
    
    def complete_step(self, step_index, user, rbac_system):
        """Complete a workflow step"""
        if not self.can_complete_step(step_index, user, rbac_system):
            raise PermissionError("Cannot complete this step")
        
        self.steps[step_index].complete(user, rbac_system)
    
    def is_complete(self):
        """Check if all steps are completed"""
        return all(step.completed for step in self.steps)

# Create a financial transfer workflow
transfer_workflow = SeparationWorkflow(
    "High Value Transfer", 
    "Transfer over $10,000 requires approval from two different roles"
)

# Add steps that require different roles
transfer_workflow.add_step(WorkflowStep("initiate", "create_reports", "Initiate transfer request"))
transfer_workflow.add_step(WorkflowStep("approve", "delete_reports", "Approve transfer"))
transfer_workflow.add_step(WorkflowStep("execute", "manage_users", "Execute transfer"))

# Create a second user with manager role
manager_user = User("mwilson", "mary.wilson@company.com", "admin")
manager_user.status = UserStatus.ACTIVE
rbac.users["mwilson"] = manager_user
rbac.assign_role_to_user("mwilson", "manager", "admin")

# Create admin user
admin_user = User("admin", "admin@company.com", "system")
admin_user.status = UserStatus.ACTIVE
rbac.users["admin"] = admin_user
rbac.assign_role_to_user("admin", "admin", "system")

print("\nSeparation of Duties Workflow")
print("-" * 35)

# Step 1: Analyst initiates transfer
try:
    transfer_workflow.complete_step(0, user, rbac)  # jsmith (analyst)
    print("✅ Step 1 completed by analyst (jsmith)")
except PermissionError as e:
    print(f"❌ Step 1 failed: {e}")

# Step 2: Same user tries to approve (should fail)
try:
    transfer_workflow.complete_step(1, user, rbac)  # jsmith tries again
    print("✅ Step 2 completed by same user")
except PermissionError as e:
    print("❌ Step 2 blocked: Same user cannot complete consecutive steps")

# Step 2: Manager approves (should succeed)
try:
    transfer_workflow.complete_step(1, manager_user, rbac)  # mwilson (manager)
    print("✅ Step 2 completed by manager (mwilson)")
except PermissionError as e:
    print(f"❌ Step 2 failed: {e}")

# Step 3: Admin executes (should succeed)
try:
    transfer_workflow.complete_step(2, admin_user, rbac)  # admin
    print("✅ Step 3 completed by admin")
except PermissionError as e:
    print(f"❌ Step 3 failed: {e}")

print(f"\nWorkflow complete: {transfer_workflow.is_complete()}")

# Show workflow audit trail
print("\n📋 Workflow Audit Trail:")
for i, step in enumerate(transfer_workflow.steps):
    if step.completed:
        print(f"  Step {i+1}: {step.step_name} by {step.completed_by} at {step.completed_at.strftime('%H:%M:%S')}")

# Example 4: Avoiding Group Accounts

Individual accountability requires individual accounts, even if it costs more.

In [None]:
# Group account problems demonstration
class SecurityIncident:
    def __init__(self, description, timestamp=None):
        self.description = description
        self.timestamp = timestamp or datetime.now()
        self.affected_users = []
        self.resolution = None

def analyze_group_account_risks():
    """Demonstrate problems with shared/group accounts"""
    
    incidents = [
        SecurityIncident("Unauthorized data access detected"),
        SecurityIncident("Password leaked on public forum"),
        SecurityIncident("Suspicious file deletion activity"),
        SecurityIncident("Failed audit due to untraceable actions")
    ]
    
    print("Group Account Security Risks")
    print("-" * 30)
    
    print("🔴 Scenario: 5 employees share 1 'analyst' account")
    print("\nProblems with group accounts:")
    
    problems = [
        "Cannot identify which person performed each action",
        "Password must be shared with all 5 people",
        "If one person leaves, password must be changed",
        "If account is compromised, all 5 people are suspects",
        "Audit trails are meaningless without individual identity",
        "Cannot implement proper access controls per person"
    ]
    
    for i, problem in enumerate(problems, 1):
        print(f"  {i}. {problem}")
    
    print(f"\n💰 Cost Analysis:")
    print(f"  Group account cost: $100/month")
    print(f"  Individual accounts: $25/month × 5 = $125/month")
    print(f"  Extra cost: $25/month")
    print(f"  Cost of security incident: $50,000+")
    print(f"  Risk reduction: Invaluable")
    
    print(f"\n✅ Solution: Individual accounts for accountability")
    print(f"  • Each person has their own credentials")
    print(f"  • Actions are traceable to individuals")
    print(f"  • Role-based permissions can be customized")
    print(f"  • Password breaches affect only one account")

analyze_group_account_risks()

# Show individual account benefits
print(f"\n📊 Individual Account Benefits:")
individual_users = ["jsmith", "mwilson", "admin"]
for username in individual_users:
    if username in rbac.users:
        user = rbac.users[username]
        print(f"  {username}: {len(user.audit_log)} logged actions")

# Example 5: User Account Monitoring

Monitor user behavior for security anomalies and policy violations.

In [None]:
# User behavior monitoring system
class UserBehaviorMonitor:
    def __init__(self):
        self.login_patterns = {}
        self.activity_patterns = {}
        self.alerts = []
    
    def record_login(self, username, timestamp=None, ip_address="127.0.0.1"):
        """Record user login for pattern analysis"""
        timestamp = timestamp or datetime.now()
        
        if username not in self.login_patterns:
            self.login_patterns[username] = []
        
        self.login_patterns[username].append({
            'timestamp': timestamp,
            'ip_address': ip_address,
            'hour': timestamp.hour
        })
        
        # Check for anomalies
        self._check_login_anomalies(username)
    
    def _check_login_anomalies(self, username):
        """Check for suspicious login patterns"""
        logins = self.login_patterns[username]
        
        if len(logins) < 2:
            return
        
        # Check for logins from multiple IPs in short time
        recent_logins = [l for l in logins if 
                        (datetime.now() - l['timestamp']).seconds < 3600]  # Last hour
        
        unique_ips = set(l['ip_address'] for l in recent_logins)
        if len(unique_ips) > 2:
            self.alerts.append(f"🚨 {username}: Multiple IP addresses in last hour")
        
        # Check for unusual time patterns
        recent_hours = [l['hour'] for l in logins[-10:]]  # Last 10 logins
        if any(hour < 6 or hour > 22 for hour in recent_hours[-3:]):  # Last 3 logins
            usual_hours = [h for h in recent_hours[:-3]]
            if usual_hours and all(6 <= h <= 22 for h in usual_hours):
                self.alerts.append(f"⚠️  {username}: Unusual login time detected")
    
    def get_user_summary(self, username):
        """Get summary of user behavior"""
        if username not in self.login_patterns:
            return "No login data available"
        
        logins = self.login_patterns[username]
        unique_ips = set(l['ip_address'] for l in logins)
        login_hours = [l['hour'] for l in logins]
        
        return {
            'total_logins': len(logins),
            'unique_ip_addresses': len(unique_ips),
            'common_login_hours': list(set(login_hours)),
            'last_login': logins[-1]['timestamp'].strftime('%Y-%m-%d %H:%M:%S')
        }

# Demonstrate behavior monitoring
monitor = UserBehaviorMonitor()

print("User Behavior Monitoring")
print("-" * 25)

# Simulate normal login pattern for jsmith
base_time = datetime.now().replace(hour=9, minute=0, second=0)
for i in range(5):
    login_time = base_time + timedelta(days=i, hours=i*2)
    monitor.record_login("jsmith", login_time, "192.168.1.100")

print("✅ Recorded normal login pattern for jsmith")

# Simulate suspicious activity
monitor.record_login("jsmith", datetime.now().replace(hour=2), "10.0.0.50")  # Unusual time
monitor.record_login("jsmith", datetime.now(), "203.45.67.89")  # Different IP
monitor.record_login("jsmith", datetime.now(), "93.184.216.34")  # Another IP

print("🔍 Recorded suspicious login attempts")

# Check alerts
print(f"\n🚨 Security Alerts ({len(monitor.alerts)}):")
for alert in monitor.alerts:
    print(f"  {alert}")

# Show user summary
summary = monitor.get_user_summary("jsmith")
print(f"\n📊 User Summary for jsmith:")
for key, value in summary.items():
    print(f"  {key}: {value}")

# Chapter 5 Summary

## Key Principles:

✅ **User Lifecycle Management**:
- Structured creation, activation, deactivation process
- Complete audit trails for accountability
- Lock vs disable vs archive strategies

✅ **Role-Based Access Control**:
- Group permissions into roles for easier management
- Assign roles to users, not individual permissions
- Regular role review and cleanup

✅ **Separation of Duties**:
- Critical operations require multiple approvers
- Prevent single person from completing full workflow
- Different roles for different steps

✅ **Individual Accountability**:
- Avoid shared/group accounts despite cost
- Every action must be traceable to an individual
- Individual accounts enable proper access controls

✅ **Behavior Monitoring**:
- Track login patterns and anomalies
- Alert on suspicious behavior
- Regular review of user activity

## Best Practices:

- **Make security transparent** to users when possible
- **Plan for user departures** and role changes
- **Regular access reviews** and role cleanup
- **Monitor and alert** on policy violations
- **Document all user management** decisions

> **Remember**: If security is too hard to use, users will find ways around it!

In [3]:
# Chapter 5: User Management Fundamentals - Setup and Imports
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Set, Optional, Tuple, Any
from collections import defaultdict
from abc import ABC, abstractmethod
import time
import uuid
import json
import datetime

print("🔧 Setting up Chapter 5: User Management Fundamentals")
print("📚 Loading necessary libraries and classes...")
print("✅ Setup complete - ready for demonstrations!")

🔧 Setting up Chapter 5: User Management Fundamentals
📚 Loading necessary libraries and classes...
✅ Setup complete - ready for demonstrations!


In [4]:
# Secure User Lifecycle Management System
class UserStatus(Enum):
    PENDING_APPROVAL = "pending_approval"
    ACTIVE = "active"
    INACTIVE = "inactive"
    SUSPENDED = "suspended"
    TERMINATED = "terminated"

class ActionType(Enum):
    USER_CREATION_REQUEST = "user_creation_request"
    USER_CREATION_APPROVAL = "user_creation_approval"
    USER_DEACTIVATION = "user_deactivation"
    USER_SUSPENSION = "user_suspension"
    ROLE_ASSIGNMENT = "role_assignment"
    ROLE_REMOVAL = "role_removal"

@dataclass
class AuditLogEntry:
    timestamp: str
    action_type: ActionType
    actor: str
    target_user: str
    details: Dict[str, Any]
    request_id: Optional[str] = None

@dataclass
class UserCreationRequest:
    request_id: str
    requester: str
    user_data: Dict[str, Any]
    justification: str
    timestamp: str
    status: str = "pending"
    approver: Optional[str] = None

class SecureUserLifecycleManager:
    def __init__(self):
        self.users: Dict[str, Dict] = {}
        self.creation_requests: Dict[str, UserCreationRequest] = {}
        self.audit_log: List[AuditLogEntry] = []
        self.approved_roles = ["HR_Manager", "Security_Admin", "Department_Manager"]
        
        # Initialize with some baseline users
        self._initialize_system()
    
    def _initialize_system(self):
        """Initialize system with baseline users"""
        # HR Manager
        self.users['alice.smith'] = {
            'username': 'alice.smith',
            'email': 'alice.smith@company.com',
            'full_name': 'Alice Smith',
            'department': 'Human Resources',
            'status': UserStatus.ACTIVE,
            'roles': ['HR_Manager'],
            'created_date': datetime.datetime.now().isoformat(),
            'last_activity': datetime.datetime.now().isoformat()
        }
        
        # Security Admin
        self.users['bob.jones'] = {
            'username': 'bob.jones',
            'email': 'bob.jones@company.com',
            'full_name': 'Bob Jones',
            'department': 'Information Security',
            'status': UserStatus.ACTIVE,
            'roles': ['Security_Admin'],
            'created_date': datetime.datetime.now().isoformat(),
            'last_activity': datetime.datetime.now().isoformat()
        }
    
    def request_user_creation(self, requester: str, user_data: Dict[str, Any], justification: str) -> str:
        """Request creation of a new user account"""
        if requester not in self.users:
            raise ValueError(f"Requester {requester} not found in system")
        
        requester_roles = self.users[requester]['roles']
        if not any(role in self.approved_roles for role in requester_roles):
            raise ValueError(f"Requester {requester} lacks permission to request user creation")
        
        request_id = str(uuid.uuid4())[:8]
        request = UserCreationRequest(
            request_id=request_id,
            requester=requester,
            user_data=user_data,
            justification=justification,
            timestamp=datetime.datetime.now().isoformat()
        )
        
        self.creation_requests[request_id] = request
        
        # Log the request
        self.audit_log.append(AuditLogEntry(
            timestamp=datetime.datetime.now().isoformat(),
            action_type=ActionType.USER_CREATION_REQUEST,
            actor=requester,
            target_user=user_data['username'],
            details={
                'request_id': request_id,
                'justification': justification,
                'user_data': user_data
            },
            request_id=request_id
        ))
        
        print(f"✅ User creation request submitted (Request ID: {request_id})")
        return request_id
    
    def approve_user_creation(self, approver: str, request_id: str) -> Dict[str, Any]:
        """Approve a user creation request"""
        if approver not in self.users:
            raise ValueError(f"Approver {approver} not found in system")
        
        if request_id not in self.creation_requests:
            raise ValueError(f"Request {request_id} not found")
        
        request = self.creation_requests[request_id]
        
        # Separation of duties: prevent self-approval
        if request.requester == approver:
            raise ValueError("Users cannot approve their own requests (separation of duties)")
        
        approver_roles = self.users[approver]['roles']
        if not any(role in self.approved_roles for role in approver_roles):
            raise ValueError(f"Approver {approver} lacks permission to approve user creation")
        
        # Create the user
        username = request.user_data['username']
        new_user = {
            'username': username,
            'email': request.user_data.get('email'),
            'full_name': request.user_data.get('full_name'),
            'department': request.user_data.get('department'),
            'status': UserStatus.ACTIVE,
            'roles': ['Standard_User'],  # Default role
            'created_date': datetime.datetime.now().isoformat(),
            'last_activity': datetime.datetime.now().isoformat(),
            'created_by': request.requester,
            'approved_by': approver
        }
        
        self.users[username] = new_user
        
        # Update request status
        request.status = "approved"
        request.approver = approver
        
        # Log the approval
        self.audit_log.append(AuditLogEntry(
            timestamp=datetime.datetime.now().isoformat(),
            action_type=ActionType.USER_CREATION_APPROVAL,
            actor=approver,
            target_user=username,
            details={
                'request_id': request_id,
                'original_requester': request.requester
            },
            request_id=request_id
        ))
        
        print(f"✅ User {username} created and activated")
        return new_user
    
    def list_users(self) -> Dict[str, Dict]:
        """List all users in the system"""
        return self.users
    
    def get_audit_summary(self) -> Dict[str, Any]:
        """Get summary of audit activities"""
        event_types = defaultdict(int)
        users_affected = set()
        
        for entry in self.audit_log:
            event_types[entry.action_type.value] += 1
            users_affected.add(entry.target_user)
        
        return {
            'total_events': len(self.audit_log),
            'event_types': dict(event_types),
            'users_affected': len(users_affected)
        }

# Initialize the lifecycle manager
lifecycle_manager = SecureUserLifecycleManager()
print("🔄 SecureUserLifecycleManager initialized with baseline users")

🔄 SecureUserLifecycleManager initialized with baseline users


In [5]:
# Demonstration: User Creation Workflow
print(f"\n" + "="*40)
print("DEMO: Secure User Creation Workflow")
print("="*40)

# Step 1: HR Manager requests new user creation
print(f"\n📝 Step 1: HR Manager requests new employee account")
try:
    new_user_data = {
        'username': 'john.doe',
        'email': 'john.doe@company.com',
        'full_name': 'John Doe',
        'department': 'Engineering'
    }
    
    request_id = lifecycle_manager.request_user_creation(
        requester='alice.smith',  # HR Manager
        user_data=new_user_data,
        justification='New engineering hire - starting next Monday'
    )
    
except Exception as e:
    print(f"❌ Error: {e}")

# Step 2: Security Admin approves the request
print(f"\n✅ Step 2: Security Admin approves the request")
try:
    new_user = lifecycle_manager.approve_user_creation(
        approver='bob.jones',  # Security Admin
        request_id=request_id
    )
    
except Exception as e:
    print(f"❌ Error: {e}")

# Step 3: Show the created user
print(f"\n👤 User Creation Summary:")
users = lifecycle_manager.list_users()
if 'john.doe' in users:
    user_info = users['john.doe']
    print(f"   Username: john.doe")
    print(f"   Status: {user_info['status']}")
    print(f"   Roles: {user_info['roles']}")
    print(f"   Department: {user_info['department']}")

# Demonstrate separation of duties - prevent self-approval
print(f"\n" + "="*40)
print("DEMO: Separation of Duties Protection")
print("="*40)

print(f"\n🚫 Attempting self-approval (should fail)...")
try:
    # HR manager tries to approve their own request
    bad_request_id = lifecycle_manager.request_user_creation(
        requester='alice.smith',
        user_data={
            'username': 'jane.doe',
            'email': 'jane.doe@company.com', 
            'full_name': 'Jane Doe',
            'department': 'HR'
        },
        justification='New HR assistant'
    )
    
    # Try to self-approve (this should fail)
    lifecycle_manager.approve_user_creation(
        approver='alice.smith',  # Same person!
        request_id=bad_request_id
    )
    
except Exception as e:
    print(f"✅ Security control worked: {e}")

# Show audit trail
print(f"\n📊 Audit Summary:")
audit_summary = lifecycle_manager.get_audit_summary()
print(f"   Total events: {audit_summary['total_events']}")
print(f"   Event types: {audit_summary['event_types']}")
print(f"   Users affected: {audit_summary['users_affected']}")


DEMO: Secure User Creation Workflow

📝 Step 1: HR Manager requests new employee account
✅ User creation request submitted (Request ID: c0e0bf0f)

✅ Step 2: Security Admin approves the request
✅ User john.doe created and activated

👤 User Creation Summary:
   Username: john.doe
   Status: UserStatus.ACTIVE
   Roles: ['Standard_User']
   Department: Engineering

DEMO: Separation of Duties Protection

🚫 Attempting self-approval (should fail)...
✅ User creation request submitted (Request ID: 798c08bb)
✅ Security control worked: Users cannot approve their own requests (separation of duties)

📊 Audit Summary:
   Total events: 3
   Event types: {'user_creation_request': 2, 'user_creation_approval': 1}
   Users affected: 2


### 1.2 Role-Based Access Control (RBAC) System
Implementing a comprehensive RBAC system with hierarchical roles and dynamic permission management.

In [6]:
# Role-Based Access Control System with Workflow Analysis
from abc import ABC, abstractmethod
from collections import defaultdict
import time

class Permission:
    """Represents a specific permission"""
    def __init__(self, name: str, description: str, category: str = "general"):
        self.name = name
        self.description = description
        self.category = category
    
    def __str__(self):
        return f"{self.name} ({self.category})"
    
    def __hash__(self):
        return hash(self.name)
    
    def __eq__(self, other):
        return isinstance(other, Permission) and self.name == other.name

class Role:
    """Represents a role with permissions and hierarchy"""
    def __init__(self, name: str, description: str, parent_role=None):
        self.name = name
        self.description = description
        self.parent_role = parent_role
        self.permissions: Set[Permission] = set()
        self.child_roles: List['Role'] = []
        
        if parent_role:
            parent_role.child_roles.append(self)
    
    def add_permission(self, permission: Permission):
        """Add permission to this role"""
        self.permissions.add(permission)
    
    def get_all_permissions(self) -> Set[Permission]:
        """Get all permissions including inherited from parent roles"""
        all_permissions = self.permissions.copy()
        
        # Inherit from parent
        if self.parent_role:
            all_permissions.update(self.parent_role.get_all_permissions())
        
        return all_permissions
    
    def has_permission(self, permission: Permission) -> bool:
        """Check if role has specific permission"""
        return permission in self.get_all_permissions()

class WorkflowAnalyzer:
    """
    Analyzes business workflows to identify required roles and permissions
    """
    
    def __init__(self):
        self.workflows = {}
        self.derived_roles = {}
        self.permission_requirements = defaultdict(set)
    
    def define_workflow(self, workflow_name: str, steps: List[Dict]):
        """
        Define a business workflow with steps and permission requirements
        """
        self.workflows[workflow_name] = steps
        
        # Analyze workflow to derive roles
        for step in steps:
            role_name = step.get('role')
            permissions = step.get('permissions', [])
            
            if role_name:
                self.permission_requirements[role_name].update(permissions)
        
        print(f"✅ Workflow '{workflow_name}' defined with {len(steps)} steps")
    
    def get_workflow_roles(self, workflow_name: str) -> Dict[str, Set[str]]:
        """Get roles and their permissions for a specific workflow"""
        if workflow_name not in self.workflows:
            return {}
        
        workflow_roles = defaultdict(set)
        for step in self.workflows[workflow_name]:
            role = step.get('role')
            permissions = step.get('permissions', [])
            if role:
                workflow_roles[role].update(permissions)
        
        return dict(workflow_roles)
    
    def analyze_role_overlap(self) -> Dict:
        """Analyze permission overlap between roles"""
        overlap_analysis = {}
        roles = list(self.permission_requirements.keys())
        
        for i, role1 in enumerate(roles):
            for role2 in roles[i+1:]:
                perms1 = self.permission_requirements[role1]
                perms2 = self.permission_requirements[role2]
                overlap = perms1.intersection(perms2)
                
                if overlap:
                    overlap_analysis[f"{role1}-{role2}"] = {
                        'shared_permissions': list(overlap),
                        'role1_unique': list(perms1 - perms2),
                        'role2_unique': list(perms2 - perms1)
                    }
        
        return overlap_analysis

class ComprehensiveRBACSystem:
    """
    Advanced RBAC system with workflow integration and audit capabilities
    """
    
    def __init__(self):
        self.permissions: Dict[str, Permission] = {}
        self.roles: Dict[str, Role] = {}
        self.user_roles: Dict[str, Set[str]] = defaultdict(set)
        self.workflow_analyzer = WorkflowAnalyzer()
        self.access_log: List[Dict] = []
        
        # Setup initial permissions and roles
        self._setup_base_permissions()
        self._setup_role_hierarchy()
        self._setup_business_workflows()
    
    def _setup_base_permissions(self):
        """Define base system permissions"""
        base_permissions = [
            # User management
            Permission("user.create", "Create new users", "user_management"),
            Permission("user.delete", "Delete users", "user_management"),
            Permission("user.modify", "Modify user details", "user_management"),
            Permission("user.view", "View user information", "user_management"),
            
            # Role management  
            Permission("role.assign", "Assign roles to users", "role_management"),
            Permission("role.create", "Create new roles", "role_management"),
            Permission("role.modify", "Modify existing roles", "role_management"),
            
            # Business operations
            Permission("purchase.request", "Create purchase requests", "business"),
            Permission("purchase.approve_small", "Approve purchases under $1000", "business"),
            Permission("purchase.approve_large", "Approve purchases over $1000", "business"),
            Permission("inventory.view", "View inventory", "business"),
            Permission("inventory.modify", "Modify inventory", "business"),
            
            # Financial operations
            Permission("finance.view_reports", "View financial reports", "finance"),
            Permission("finance.process_payments", "Process payments", "finance"),
            Permission("finance.audit", "Perform financial audits", "finance"),
            
            # System administration
            Permission("system.configure", "Configure system settings", "admin"),
            Permission("system.backup", "Perform system backups", "admin"),
            Permission("system.logs", "View system logs", "admin"),
            
            # Security operations
            Permission("security.audit", "Perform security audits", "security"),
            Permission("security.monitor", "Monitor security events", "security"),
            Permission("security.incident_response", "Respond to security incidents", "security")
        ]
        
        for perm in base_permissions:
            self.permissions[perm.name] = perm
        
        print(f"✅ Initialized {len(base_permissions)} base permissions")
    
    def _setup_role_hierarchy(self):
        """Create hierarchical role structure"""
        # Base employee role
        employee = Role("employee", "Basic employee permissions")
        employee.add_permission(self.permissions["user.view"])
        employee.add_permission(self.permissions["inventory.view"])
        
        # Developer role (inherits from employee)
        developer = Role("developer", "Software developer permissions", employee)
        developer.add_permission(self.permissions["system.logs"])
        
        # Senior developer role (inherits from developer)
        senior_dev = Role("senior_developer", "Senior developer permissions", developer)
        senior_dev.add_permission(self.permissions["system.configure"])
        
        # Manager role (inherits from employee)
        manager = Role("manager", "Management permissions", employee)
        manager.add_permission(self.permissions["purchase.approve_small"])
        manager.add_permission(self.permissions["user.modify"])
        
        # Department head role (inherits from manager)
        dept_head = Role("department_head", "Department head permissions", manager)
        dept_head.add_permission(self.permissions["purchase.approve_large"])
        dept_head.add_permission(self.permissions["role.assign"])
        
        # HR role (inherits from employee)
        hr = Role("hr_specialist", "HR specialist permissions", employee)
        hr.add_permission(self.permissions["user.create"])
        hr.add_permission(self.permissions["user.modify"])
        
        # Security admin role (inherits from employee)
        security_admin = Role("security_admin", "Security administrator permissions", employee)
        security_admin.add_permission(self.permissions["user.create"])
        security_admin.add_permission(self.permissions["user.delete"])
        security_admin.add_permission(self.permissions["role.assign"])
        security_admin.add_permission(self.permissions["security.audit"])
        security_admin.add_permission(self.permissions["security.monitor"])
        security_admin.add_permission(self.permissions["security.incident_response"])
        
        # Finance role (inherits from employee)
        finance = Role("finance_specialist", "Finance specialist permissions", employee)
        finance.add_permission(self.permissions["finance.view_reports"])
        finance.add_permission(self.permissions["finance.process_payments"])
        
        # Store roles
        roles = [employee, developer, senior_dev, manager, dept_head, hr, security_admin, finance]
        for role in roles:
            self.roles[role.name] = role
        
        print(f"✅ Created {len(roles)} roles with hierarchical inheritance")
    
    def _setup_business_workflows(self):
        """Define business workflows to validate role design"""
        # Purchase approval workflow
        purchase_workflow = [
            {
                'step': 'Create Request',
                'role': 'employee',
                'permissions': ['purchase.request']
            },
            {
                'step': 'Manager Review',
                'role': 'manager', 
                'permissions': ['purchase.approve_small']
            },
            {
                'step': 'Department Head Approval',
                'role': 'department_head',
                'permissions': ['purchase.approve_large']
            },
            {
                'step': 'Finance Processing',
                'role': 'finance_specialist',
                'permissions': ['finance.process_payments']
            }
        ]
        
        # User onboarding workflow
        onboarding_workflow = [
            {
                'step': 'HR Request',
                'role': 'hr_specialist',
                'permissions': ['user.create']
            },
            {
                'step': 'Security Approval',
                'role': 'security_admin',
                'permissions': ['user.create', 'role.assign']
            },
            {
                'step': 'Manager Role Assignment',
                'role': 'department_head',
                'permissions': ['role.assign']
            }
        ]
        
        self.workflow_analyzer.define_workflow('purchase_approval', purchase_workflow)
        self.workflow_analyzer.define_workflow('user_onboarding', onboarding_workflow)
        
        print(f"✅ Defined business workflows for role validation")
    
    def assign_role(self, username: str, role_name: str, assigned_by: str):
        """Assign role to user with audit trail"""
        if role_name not in self.roles:
            raise ValueError(f"Role {role_name} does not exist")
        
        # Check if assigner has permission
        if not self.check_permission(assigned_by, "role.assign"):
            raise PermissionError(f"User {assigned_by} cannot assign roles")
        
        self.user_roles[username].add(role_name)
        
        # Audit log
        self.access_log.append({
            'timestamp': datetime.datetime.now().isoformat(),
            'action': 'role_assigned',
            'actor': assigned_by,
            'target_user': username,
            'role': role_name
        })
        
        print(f"✅ Role '{role_name}' assigned to {username} by {assigned_by}")
    
    def check_permission(self, username: str, permission_name: str) -> bool:
        """Check if user has specific permission"""
        if permission_name not in self.permissions:
            return False
        
        permission = self.permissions[permission_name]
        user_permissions = self.get_user_permissions(username)
        
        # Log access check
        has_permission = permission in user_permissions
        self.access_log.append({
            'timestamp': datetime.datetime.now().isoformat(),
            'action': 'permission_check',
            'user': username,
            'permission': permission_name,
            'result': has_permission
        })
        
        return has_permission
    
    def get_user_permissions(self, username: str) -> Set[Permission]:
        """Get all permissions for a user across all roles"""
        all_permissions = set()
        
        for role_name in self.user_roles[username]:
            if role_name in self.roles:
                role = self.roles[role_name]
                all_permissions.update(role.get_all_permissions())
        
        return all_permissions
    
    def analyze_user_privileges(self, username: str) -> Dict:
        """Analyze user's privilege level and potential issues"""
        user_permissions = self.get_user_permissions(username)
        user_role_names = list(self.user_roles[username])
        
        # Count permissions by category
        category_counts = defaultdict(int)
        for perm in user_permissions:
            category_counts[perm.category] += 1
        
        # Check for privilege escalation risks
        high_risk_permissions = [
            "user.delete", "role.create", "role.modify", 
            "system.configure", "security.incident_response"
        ]
        
        risky_permissions = [
            perm.name for perm in user_permissions 
            if perm.name in high_risk_permissions
        ]
        
        return {
            'username': username,
            'roles': user_role_names,
            'total_permissions': len(user_permissions),
            'permissions_by_category': dict(category_counts),
            'high_risk_permissions': risky_permissions,
            'privilege_level': self._calculate_privilege_level(user_permissions)
        }
    
    def _calculate_privilege_level(self, permissions: Set[Permission]) -> str:
        """Calculate overall privilege level"""
        admin_perms = sum(1 for p in permissions if p.category in ['admin', 'security'])
        total_perms = len(permissions)
        
        if admin_perms > 5:
            return "HIGH"
        elif total_perms > 15:
            return "MEDIUM"
        else:
            return "LOW"
    
    def get_role_analysis(self) -> Dict:
        """Analyze role effectiveness and potential issues"""
        role_analysis = {}
        
        for role_name, role in self.roles.items():
            permissions = role.get_all_permissions()
            
            role_analysis[role_name] = {
                'direct_permissions': len(role.permissions),
                'inherited_permissions': len(permissions) - len(role.permissions),
                'total_permissions': len(permissions),
                'parent_role': role.parent_role.name if role.parent_role else None,
                'child_roles': [child.name for child in role.child_roles],
                'users_assigned': sum(1 for user_roles in self.user_roles.values() 
                                    if role_name in user_roles)
            }
        
        return role_analysis

# Demo: Comprehensive RBAC System
print("🔐 ROLE-BASED ACCESS CONTROL DEMONSTRATION")
print("=" * 50)

# Initialize RBAC system
rbac_system = ComprehensiveRBACSystem()

🔐 ROLE-BASED ACCESS CONTROL DEMONSTRATION
✅ Initialized 21 base permissions
✅ Created 8 roles with hierarchical inheritance
✅ Workflow 'purchase_approval' defined with 4 steps
✅ Workflow 'user_onboarding' defined with 3 steps
✅ Defined business workflows for role validation


In [None]:
# Demonstration: Role Assignment and Permission Checking
print(f"\n" + "="*40)
print("DEMO: Role Assignment & Permission Testing")
print("="*40)

# Assign roles to our demo users from lifecycle manager
demo_users = ['alice.smith', 'bob.jones', 'carol.wilson', 'john.doe']

# Assign appropriate roles
role_assignments = [
    ('alice.smith', 'hr_specialist', 'bob.jones'),      # HR Manager gets HR role
    ('bob.jones', 'security_admin', 'bob.jones'),       # Security admin (self-assign for demo)
    ('carol.wilson', 'department_head', 'bob.jones'),   # Department head role
    ('john.doe', 'developer', 'carol.wilson')           # New engineer gets developer role
]

print(f"\n👥 Assigning roles to users:")
for username, role, assigned_by in role_assignments:
    try:
        rbac_system.assign_role(username, role, assigned_by)
    except Exception as e:
        print(f"❌ Failed to assign {role} to {username}: {e}")

# Test permission checks
print(f"\n🔍 Testing Permission Checks:")
permission_tests = [
    ('alice.smith', 'user.create', "HR can create users"),
    ('john.doe', 'user.create', "Developer cannot create users"),
    ('carol.wilson', 'purchase.approve_large', "Dept head can approve large purchases"),
    ('john.doe', 'system.logs', "Developer can view system logs"),
    ('alice.smith', 'security.audit', "HR cannot perform security audits"),
    ('bob.jones', 'security.incident_response', "Security admin can respond to incidents")
]

for username, permission, description in permission_tests:
    has_perm = rbac_system.check_permission(username, permission)
    result = "✅ ALLOWED" if has_perm else "❌ DENIED"
    print(f"   {result}: {description}")

# Analyze user privileges
print(f"\n📊 User Privilege Analysis:")
for username in demo_users:
    analysis = rbac_system.analyze_user_privileges(username)
    print(f"\n👤 {username}:")
    print(f"   Roles: {analysis['roles']}")
    print(f"   Total Permissions: {analysis['total_permissions']}")
    print(f"   Privilege Level: {analysis['privilege_level']}")
    print(f"   Categories: {analysis['permissions_by_category']}")
    if analysis['high_risk_permissions']:
        print(f"   ⚠️  High-Risk Permissions: {analysis['high_risk_permissions']}")

# Workflow analysis
print(f"\n" + "="*40)
print("DEMO: Workflow-Driven Role Validation")
print("="*40)

print(f"\n🔄 Purchase Approval Workflow Analysis:")
purchase_roles = rbac_system.workflow_analyzer.get_workflow_roles('purchase_approval')
for role, permissions in purchase_roles.items():
    print(f"   {role}: {list(permissions)}")

print(f"\n🔄 User Onboarding Workflow Analysis:")
onboarding_roles = rbac_system.workflow_analyzer.get_workflow_roles('user_onboarding')
for role, permissions in onboarding_roles.items():
    print(f"   {role}: {list(permissions)}")

# Role hierarchy demonstration
print(f"\n👑 Role Hierarchy & Inheritance:")
role_analysis = rbac_system.get_role_analysis()
for role_name, analysis in role_analysis.items():
    print(f"\n🎭 {role_name}:")
    print(f"   Direct permissions: {analysis['direct_permissions']}")
    print(f"   Inherited permissions: {analysis['inherited_permissions']}")
    print(f"   Total permissions: {analysis['total_permissions']}")
    print(f"   Parent role: {analysis['parent_role']}")
    print(f"   Users assigned: {analysis['users_assigned']}")
    if analysis['child_roles']:
        print(f"   Child roles: {analysis['child_roles']}")

# Show permission inheritance example
print(f"\n🔗 Permission Inheritance Example:")
if 'developer' in rbac_system.roles:
    dev_role = rbac_system.roles['developer']
    all_perms = dev_role.get_all_permissions()
    direct_perms = dev_role.permissions
    inherited_perms = all_perms - direct_perms
    
    print(f"   Developer Role Analysis:")
    print(f"   Direct permissions: {[p.name for p in direct_perms]}")
    print(f"   Inherited from employee: {[p.name for p in inherited_perms]}")
    print(f"   Total effective permissions: {len(all_perms)}")

print(f"\n✅ RBAC demonstration completed successfully!")

### 1.3 User Behavior Monitoring & Anomaly Detection
Implementing comprehensive monitoring of user activities to detect suspicious patterns and potential security threats.

In [None]:
# User Behavior Monitoring & Anomaly Detection System
import random
import ipaddress
from collections import deque
from statistics import mean, stdev

@dataclass
class UserAction:
    username: str
    action_type: str
    timestamp: datetime.datetime
    source_ip: str
    user_agent: str
    resource: str
    result: str
    session_id: str
    details: Dict = field(default_factory=dict)

@dataclass
class UserBehaviorProfile:
    username: str
    typical_login_hours: Set[int] = field(default_factory=set)
    typical_locations: Set[str] = field(default_factory=set)
    typical_actions: Dict[str, int] = field(default_factory=dict)
    session_duration_avg: float = 0.0
    session_duration_std: float = 0.0
    created_at: datetime.datetime = field(default_factory=datetime.datetime.now)
    last_updated: datetime.datetime = field(default_factory=datetime.datetime.now)

class SecurityAlert:
    def __init__(self, alert_type: str, username: str, severity: str, description: str, details: Dict):
        self.alert_type = alert_type
        self.username = username
        self.severity = severity  # LOW, MEDIUM, HIGH, CRITICAL
        self.description = description
        self.details = details
        self.timestamp = datetime.datetime.now()
        self.id = secrets.token_urlsafe(8)
    
    def __str__(self):
        return f"[{self.severity}] {self.alert_type}: {self.description}"

class UserBehaviorMonitor:
    """
    Comprehensive user behavior monitoring and anomaly detection system
    """
    
    def __init__(self):
        self.user_profiles: Dict[str, UserBehaviorProfile] = {}
        self.action_history: deque = deque(maxlen=10000)  # Keep last 10k actions
        self.active_sessions: Dict[str, Dict] = {}
        self.security_alerts: List[SecurityAlert] = []
        self.ip_reputation: Dict[str, str] = {}  # IP -> reputation (clean/suspicious/malicious)
        
        # Configuration thresholds
        self.config = {
            'max_failed_logins': 5,
            'session_timeout_hours': 8,
            'off_hours_start': 22,  # 10 PM
            'off_hours_end': 6,     # 6 AM
            'impossible_travel_threshold_km': 1000,
            'max_actions_per_minute': 60,
            'privilege_escalation_threshold': 3
        }
        
        # Initialize with demo data
        self._setup_demo_environment()
    
    def _setup_demo_environment(self):
        """Create baseline behavioral profiles for demo users"""
        demo_profiles = [
            UserBehaviorProfile(
                username='alice.smith',
                typical_login_hours={8, 9, 10, 11, 12, 13, 14, 15, 16, 17},
                typical_locations={'192.168.1.100', '192.168.1.101'},
                typical_actions={'login': 20, 'user.view': 50, 'user.create': 5},
                session_duration_avg=6.5,
                session_duration_std=1.2
            ),
            UserBehaviorProfile(
                username='bob.jones',
                typical_login_hours={7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18},
                typical_locations={'192.168.1.200', '192.168.1.201'},
                typical_actions={'login': 15, 'security.audit': 30, 'user.create': 10},
                session_duration_avg=8.0,
                session_duration_std=2.0
            ),
            UserBehaviorProfile(
                username='john.doe',
                typical_login_hours={9, 10, 11, 12, 13, 14, 15, 16, 17},
                typical_locations={'192.168.1.50'},
                typical_actions={'login': 10, 'system.logs': 25, 'inventory.view': 15},
                session_duration_avg=7.5,
                session_duration_std=1.5
            )
        ]
        
        for profile in demo_profiles:
            self.user_profiles[profile.username] = profile
        
        # Setup IP reputation data
        self.ip_reputation.update({
            '192.168.1.100': 'clean',
            '192.168.1.101': 'clean', 
            '192.168.1.200': 'clean',
            '192.168.1.201': 'clean',
            '192.168.1.50': 'clean',
            '10.0.0.1': 'suspicious',
            '203.0.113.1': 'malicious',  # Test IP from RFC
            '198.51.100.1': 'suspicious'  # Test IP from RFC
        })
        
        print("✅ User behavior monitoring initialized with baseline profiles")
    
    def log_user_action(self, username: str, action_type: str, source_ip: str, 
                       user_agent: str = "Unknown", resource: str = "", 
                       result: str = "success", session_id: str = None, 
                       details: Dict = None):
        """Log user action and perform real-time anomaly detection"""
        
        if session_id is None:
            session_id = secrets.token_urlsafe(16)
        
        if details is None:
            details = {}
        
        action = UserAction(
            username=username,
            action_type=action_type,
            timestamp=datetime.datetime.now(),
            source_ip=source_ip,
            user_agent=user_agent,
            resource=resource,
            result=result,
            session_id=session_id,
            details=details
        )
        
        self.action_history.append(action)
        
        # Perform real-time anomaly detection
        self._detect_anomalies(action)
        
        # Update user profile
        self._update_user_profile(action)
        
        return action
    
    def _detect_anomalies(self, action: UserAction):
        """Real-time anomaly detection for user actions"""
        alerts = []
        
        # 1. Time-based anomalies
        alerts.extend(self._detect_time_anomalies(action))
        
        # 2. Location-based anomalies  
        alerts.extend(self._detect_location_anomalies(action))
        
        # 3. Behavior-based anomalies
        alerts.extend(self._detect_behavior_anomalies(action))
        
        # 4. Failed login analysis
        alerts.extend(self._detect_failed_login_anomalies(action))
        
        # 5. Privilege escalation detection
        alerts.extend(self._detect_privilege_escalation(action))
        
        # Store alerts
        self.security_alerts.extend(alerts)
        
        # Print real-time alerts
        for alert in alerts:
            print(f"🚨 SECURITY ALERT: {alert}")
    
    def _detect_time_anomalies(self, action: UserAction) -> List[SecurityAlert]:
        """Detect time-based behavioral anomalies"""
        alerts = []
        current_hour = action.timestamp.hour
        
        profile = self.user_profiles.get(action.username)
        if not profile:
            return alerts
        
        # Off-hours access detection
        if (current_hour >= self.config['off_hours_start'] or 
            current_hour <= self.config['off_hours_end']):
            if current_hour not in profile.typical_login_hours:
                alerts.append(SecurityAlert(
                    alert_type="OFF_HOURS_ACCESS",
                    username=action.username,
                    severity="MEDIUM",
                    description=f"User accessed system at unusual hour: {current_hour:02d}:00",
                    details={
                        'access_hour': current_hour,
                        'typical_hours': list(profile.typical_login_hours),
                        'action': action.action_type
                    }
                ))
        
        # Rapid action detection
        recent_actions = [a for a in self.action_history 
                         if a.username == action.username and 
                         (action.timestamp - a.timestamp).total_seconds() < 60]
        
        if len(recent_actions) > self.config['max_actions_per_minute']:
            alerts.append(SecurityAlert(
                alert_type="RAPID_ACTIONS",
                username=action.username,
                severity="HIGH",
                description=f"Unusually rapid actions: {len(recent_actions)} in last minute",
                details={
                    'actions_per_minute': len(recent_actions),
                    'threshold': self.config['max_actions_per_minute'],
                    'action_types': [a.action_type for a in recent_actions]
                }
            ))
        
        return alerts
    
    def _detect_location_anomalies(self, action: UserAction) -> List[SecurityAlert]:
        """Detect location-based anomalies"""
        alerts = []
        
        profile = self.user_profiles.get(action.username)
        if not profile:
            return alerts
        
        # New location detection
        if action.source_ip not in profile.typical_locations:
            severity = "LOW"
            
            # Check IP reputation
            reputation = self.ip_reputation.get(action.source_ip, "unknown")
            if reputation == "suspicious":
                severity = "MEDIUM"
            elif reputation == "malicious":
                severity = "CRITICAL"
            
            alerts.append(SecurityAlert(
                alert_type="NEW_LOCATION",
                username=action.username,
                severity=severity,
                description=f"Access from new IP address: {action.source_ip}",
                details={
                    'new_ip': action.source_ip,
                    'ip_reputation': reputation,
                    'typical_locations': list(profile.typical_locations)
                }
            ))
        
        # Impossible travel detection (simplified)
        recent_different_ips = []
        for past_action in reversed(list(self.action_history)):
            if (past_action.username == action.username and 
                past_action.source_ip != action.source_ip and
                (action.timestamp - past_action.timestamp).total_seconds() < 3600):  # Within 1 hour
                recent_different_ips.append(past_action)
                break
        
        if recent_different_ips:
            time_diff = (action.timestamp - recent_different_ips[0].timestamp).total_seconds() / 60  # minutes
            if time_diff < 30:  # Less than 30 minutes between different IPs
                alerts.append(SecurityAlert(
                    alert_type="IMPOSSIBLE_TRAVEL",
                    username=action.username,
                    severity="HIGH",
                    description=f"Impossible travel: different IPs within {time_diff:.1f} minutes",
                    details={
                        'current_ip': action.source_ip,
                        'previous_ip': recent_different_ips[0].source_ip,
                        'time_difference_minutes': time_diff
                    }
                ))
        
        return alerts
    
    def _detect_behavior_anomalies(self, action: UserAction) -> List[SecurityAlert]:
        """Detect behavior pattern anomalies"""
        alerts = []
        
        profile = self.user_profiles.get(action.username)
        if not profile:
            return alerts
        
        # Unusual action type detection
        if action.action_type not in profile.typical_actions:
            # Check if it's a high-privilege action
            high_privilege_actions = [
                'user.delete', 'role.create', 'security.incident_response',
                'system.configure', 'finance.process_payments'
            ]
            
            if action.action_type in high_privilege_actions:
                alerts.append(SecurityAlert(
                    alert_type="UNUSUAL_PRIVILEGE_ACTION",
                    username=action.username,
                    severity="HIGH",
                    description=f"User performed unusual high-privilege action: {action.action_type}",
                    details={
                        'action': action.action_type,
                        'typical_actions': list(profile.typical_actions.keys()),
                        'resource': action.resource
                    }
                ))
        
        return alerts
    
    def _detect_failed_login_anomalies(self, action: UserAction) -> List[SecurityAlert]:
        """Detect failed login patterns"""
        alerts = []
        
        if action.action_type == "login" and action.result == "failed":
            # Count recent failed logins
            recent_failures = [
                a for a in self.action_history 
                if (a.username == action.username and 
                    a.action_type == "login" and 
                    a.result == "failed" and
                    (action.timestamp - a.timestamp).total_seconds() < 3600)  # Last hour
            ]
            
            if len(recent_failures) >= self.config['max_failed_logins']:
                alerts.append(SecurityAlert(
                    alert_type="BRUTE_FORCE_ATTEMPT",
                    username=action.username,
                    severity="CRITICAL",
                    description=f"Multiple failed logins: {len(recent_failures)} in last hour",
                    details={
                        'failed_attempts': len(recent_failures),
                        'source_ip': action.source_ip,
                        'threshold': self.config['max_failed_logins']
                    }
                ))
        
        return alerts
    
    def _detect_privilege_escalation(self, action: UserAction) -> List[SecurityAlert]:
        """Detect potential privilege escalation attempts"""
        alerts = []
        
        # Check for multiple privilege-related actions in short time
        privilege_actions = [
            a for a in self.action_history
            if (a.username == action.username and
                'role' in a.action_type and
                (action.timestamp - a.timestamp).total_seconds() < 300)  # 5 minutes
        ]
        
        if len(privilege_actions) >= self.config['privilege_escalation_threshold']:
            alerts.append(SecurityAlert(
                alert_type="PRIVILEGE_ESCALATION_ATTEMPT",
                username=action.username,
                severity="HIGH",
                description=f"Multiple privilege-related actions: {len(privilege_actions)} in 5 minutes",
                details={
                    'privilege_actions': [a.action_type for a in privilege_actions],
                    'threshold': self.config['privilege_escalation_threshold']
                }
            ))
        
        return alerts
    
    def _update_user_profile(self, action: UserAction):
        """Update user behavioral profile with new action"""
        profile = self.user_profiles.get(action.username)
        if not profile:
            # Create new profile for unknown user
            profile = UserBehaviorProfile(username=action.username)
            self.user_profiles[action.username] = profile
        
        # Update typical hours
        profile.typical_login_hours.add(action.timestamp.hour)
        
        # Update typical locations
        profile.typical_locations.add(action.source_ip)
        
        # Update typical actions
        profile.typical_actions[action.action_type] = profile.typical_actions.get(action.action_type, 0) + 1
        
        # Update last updated timestamp
        profile.last_updated = datetime.datetime.now()
    
    def generate_security_report(self, hours: int = 24) -> Dict:
        """Generate comprehensive security report"""
        cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=hours)
        
        # Filter recent actions and alerts
        recent_actions = [a for a in self.action_history if a.timestamp > cutoff_time]
        recent_alerts = [a for a in self.security_alerts if a.timestamp > cutoff_time]
        
        # Analyze by severity
        alert_severity = defaultdict(int)
        alert_types = defaultdict(int)
        users_with_alerts = set()
        
        for alert in recent_alerts:
            alert_severity[alert.severity] += 1
            alert_types[alert.alert_type] += 1
            users_with_alerts.add(alert.username)
        
        # Action analysis
        action_counts = defaultdict(int)
        unique_users = set()
        unique_ips = set()
        
        for action in recent_actions:
            action_counts[action.action_type] += 1
            unique_users.add(action.username)
            unique_ips.add(action.source_ip)
        
        return {
            'report_period_hours': hours,
            'total_actions': len(recent_actions),
            'total_alerts': len(recent_alerts),
            'unique_users_active': len(unique_users),
            'unique_source_ips': len(unique_ips),
            'alert_breakdown': {
                'by_severity': dict(alert_severity),
                'by_type': dict(alert_types),
                'users_affected': len(users_with_alerts)
            },
            'top_actions': dict(sorted(action_counts.items(), key=lambda x: x[1], reverse=True)[:10]),
            'high_risk_alerts': [
                {
                    'type': alert.alert_type,
                    'user': alert.username,
                    'severity': alert.severity,
                    'description': alert.description
                }
                for alert in recent_alerts if alert.severity in ['HIGH', 'CRITICAL']
            ]
        }

# Demo: User Behavior Monitoring System
print("🔍 USER BEHAVIOR MONITORING DEMONSTRATION")
print("=" * 50)

# Initialize monitoring system
behavior_monitor = UserBehaviorMonitor()

In [None]:
# Demonstration: Normal vs. Suspicious User Behavior
print(f"\n" + "="*40)
print("DEMO: Normal User Behavior Patterns")
print("="*40)

# Simulate normal user activities
normal_activities = [
    ('alice.smith', 'login', '192.168.1.100', 'success'),
    ('alice.smith', 'user.view', '192.168.1.100', 'success'),
    ('alice.smith', 'user.create', '192.168.1.100', 'success'),
    ('bob.jones', 'login', '192.168.1.200', 'success'),
    ('bob.jones', 'security.audit', '192.168.1.200', 'success'),
    ('john.doe', 'login', '192.168.1.50', 'success'),
    ('john.doe', 'system.logs', '192.168.1.50', 'success')
]

print(f"\n📊 Logging normal user activities...")
for username, action, ip, result in normal_activities:
    behavior_monitor.log_user_action(
        username=username,
        action_type=action,
        source_ip=ip,
        result=result,
        user_agent="Mozilla/5.0 (Normal Browser)"
    )

# Wait a moment, then simulate suspicious activities
import time
time.sleep(1)

print(f"\n" + "="*40)
print("DEMO: Suspicious Behavior Detection")
print("="*40)

print(f"\n🚨 Simulating suspicious activities...")

# 1. Off-hours access
print(f"\n1️⃣ Testing off-hours access detection:")
# Simulate action at 3 AM (off-hours)
suspicious_time = datetime.datetime.now().replace(hour=3, minute=0, second=0, microsecond=0)
off_hours_action = UserAction(
    username='alice.smith',
    action_type='user.delete',
    timestamp=suspicious_time,
    source_ip='192.168.1.100',
    user_agent='Automated Script',
    resource='/admin/users',
    result='success',
    session_id='suspicious_session_1'
)
behavior_monitor.action_history.append(off_hours_action)
behavior_monitor._detect_anomalies(off_hours_action)

# 2. New location access
print(f"\n2️⃣ Testing new location detection:")
behavior_monitor.log_user_action(
    username='bob.jones',
    action_type='security.audit',
    source_ip='203.0.113.1',  # Malicious IP
    result='success',
    user_agent="Unknown Browser"
)

# 3. Multiple failed logins (brute force)
print(f"\n3️⃣ Testing brute force detection:")
for i in range(6):  # Exceed the threshold of 5
    behavior_monitor.log_user_action(
        username='john.doe',
        action_type='login',
        source_ip='198.51.100.1',  # Suspicious IP
        result='failed',
        user_agent="Automated Attack Tool"
    )

# 4. Rapid actions (bot-like behavior)
print(f"\n4️⃣ Testing rapid action detection:")
for i in range(65):  # Exceed 60 actions per minute
    behavior_monitor.log_user_action(
        username='alice.smith',
        action_type='user.view',
        source_ip='192.168.1.100',
        result='success',
        user_agent="Automated Scraper"
    )

# 5. Unusual privilege escalation
print(f"\n5️⃣ Testing privilege escalation detection:")
privilege_actions = ['role.assign', 'role.create', 'role.modify', 'user.delete']
for action in privilege_actions:
    behavior_monitor.log_user_action(
        username='john.doe',
        action_type=action,
        source_ip='192.168.1.50',
        result='success',
        user_agent="Admin Tools"
    )

# 6. Impossible travel
print(f"\n6️⃣ Testing impossible travel detection:")
# First action from one location
behavior_monitor.log_user_action(
    username='carol.wilson',
    action_type='login',
    source_ip='192.168.1.10',
    result='success'
)

# Wait a brief moment, then access from far away
time.sleep(0.1)
behavior_monitor.log_user_action(
    username='carol.wilson',
    action_type='purchase.approve_large', 
    source_ip='10.0.0.1',  # Different location
    result='success'
)

# Generate security report
print(f"\n" + "="*40)
print("SECURITY MONITORING REPORT")
print("="*40)

report = behavior_monitor.generate_security_report(hours=1)

print(f"\n📈 Activity Summary (Last Hour):")
print(f"   Total Actions: {report['total_actions']}")
print(f"   Total Security Alerts: {report['total_alerts']}")
print(f"   Unique Active Users: {report['unique_users_active']}")
print(f"   Unique Source IPs: {report['unique_source_ips']}")

print(f"\n🚨 Alert Breakdown:")
severity_breakdown = report['alert_breakdown']['by_severity']
for severity, count in severity_breakdown.items():
    print(f"   {severity}: {count} alerts")

print(f"\n⚠️  Alert Types:")
type_breakdown = report['alert_breakdown']['by_type']
for alert_type, count in type_breakdown.items():
    print(f"   {alert_type}: {count}")

print(f"\n🔥 High-Risk Alerts:")
for alert in report['high_risk_alerts']:
    print(f"   [{alert['severity']}] {alert['type']}: {alert['description']}")
    print(f"       User: {alert['user']}")

print(f"\n📊 Top User Actions:")
for action_type, count in list(report['top_actions'].items())[:5]:
    print(f"   {action_type}: {count}")

# Show user profiles after learning
print(f"\n" + "="*40)
print("UPDATED USER BEHAVIORAL PROFILES")
print("="*40)

for username, profile in behavior_monitor.user_profiles.items():
    print(f"\n👤 {username}:")
    print(f"   Typical Hours: {sorted(list(profile.typical_login_hours))}")
    print(f"   Known Locations: {list(profile.typical_locations)}")
    print(f"   Common Actions: {dict(list(profile.typical_actions.items())[:5])}")
    print(f"   Profile Updated: {profile.last_updated.strftime('%Y-%m-%d %H:%M:%S')}")

print(f"\n✅ User behavior monitoring demonstration completed!")
print(f"   Generated {len(behavior_monitor.security_alerts)} security alerts")
print(f"   Tracked {len(behavior_monitor.action_history)} user actions")
print(f"   Monitoring {len(behavior_monitor.user_profiles)} user profiles")

## Section 2: Chapter 5 Exercise Solutions

This section provides complete solutions to the Chapter 5 exercises from the course guide, with interactive implementations and detailed analysis.

### Exercise 1: Third-Party Service Analysis
**Objective**: Identify third-party services in your organization and categorize which require individual vs. group access.

This exercise develops risk assessment skills by evaluating real-world services and their security requirements.

In [7]:
# Exercise 1: Third-Party Service Security Analysis Tool
from enum import Enum
from typing import NamedTuple

class RiskLevel(Enum):
    LOW = "Low"
    MEDIUM = "Medium" 
    HIGH = "High"
    CRITICAL = "Critical"

class AccountRequirement(Enum):
    INDIVIDUAL_REQUIRED = "Individual Required"
    INDIVIDUAL_PREFERRED = "Individual Preferred"
    GROUP_ACCEPTABLE = "Group Acceptable"
    SERVICE_ACCOUNT = "Service Account"

class ServiceCategory(Enum):
    PRODUCTIVITY = "Productivity"
    SECURITY = "Security" 
    FINANCIAL = "Financial"
    DEVELOPMENT = "Development"
    COMMUNICATION = "Communication"
    INFRASTRUCTURE = "Infrastructure"
    MARKETING = "Marketing"
    HR = "Human Resources"

@dataclass
class ThirdPartyService:
    name: str
    category: ServiceCategory
    description: str
    data_sensitivity: RiskLevel
    audit_importance: RiskLevel
    access_control_needs: RiskLevel
    compliance_requirements: List[str] = field(default_factory=list)
    current_setup: str = ""
    users_count: int = 0
    cost_per_seat: float = 0.0
    
class ServiceRiskAnalyzer:
    """
    Comprehensive analyzer for third-party service security requirements
    """
    
    def __init__(self):
        self.services = []
        self.analysis_criteria = {
            'data_sensitivity_factors': {
                'personal_data': 3,
                'financial_data': 4, 
                'customer_data': 3,
                'intellectual_property': 4,
                'system_credentials': 5,
                'audit_logs': 4
            },
            'compliance_factors': {
                'sox': 4,     # Sarbanes-Oxley
                'gdpr': 3,    # General Data Protection Regulation
                'hipaa': 5,   # Health Insurance Portability
                'pci_dss': 4, # Payment Card Industry
                'iso27001': 3 # Information Security Management
            },
            'business_impact_factors': {
                'revenue_generation': 4,
                'customer_service': 3,
                'operational_critical': 4,
                'brand_reputation': 4,
                'competitive_advantage': 3
            }
        }
        
        # Initialize with comprehensive service examples
        self._setup_example_services()
    
    def _setup_example_services(self):
        """Setup comprehensive examples of third-party services"""
        example_services = [
            # Financial & Payroll Services (HIGH RISK - Individual Required)
            ThirdPartyService(
                name="ADP Payroll System",
                category=ServiceCategory.FINANCIAL,
                description="Complete payroll processing and HR management",
                data_sensitivity=RiskLevel.CRITICAL,
                audit_importance=RiskLevel.CRITICAL,
                access_control_needs=RiskLevel.CRITICAL,
                compliance_requirements=['SOX', 'tax_compliance'],
                users_count=5,
                cost_per_seat=50.0
            ),
            ThirdPartyService(
                name="QuickBooks Online",
                category=ServiceCategory.FINANCIAL,
                description="Accounting and financial management software",
                data_sensitivity=RiskLevel.HIGH,
                audit_importance=RiskLevel.CRITICAL,
                access_control_needs=RiskLevel.HIGH,
                compliance_requirements=['SOX', 'GAAP'],
                users_count=3,
                cost_per_seat=25.0
            ),
            
            # Development & Infrastructure (MEDIUM-HIGH RISK)
            ThirdPartyService(
                name="AWS Console",
                category=ServiceCategory.INFRASTRUCTURE,
                description="Amazon Web Services cloud infrastructure management",
                data_sensitivity=RiskLevel.HIGH,
                audit_importance=RiskLevel.HIGH,
                access_control_needs=RiskLevel.CRITICAL,
                compliance_requirements=['SOC2', 'ISO27001'],
                users_count=8,
                cost_per_seat=0.0  # Usage-based pricing
            ),
            ThirdPartyService(
                name="GitHub Enterprise",
                category=ServiceCategory.DEVELOPMENT,
                description="Source code repository and collaboration platform",
                data_sensitivity=RiskLevel.HIGH,
                audit_importance=RiskLevel.HIGH,
                access_control_needs=RiskLevel.HIGH,
                compliance_requirements=['SOC2'],
                users_count=15,
                cost_per_seat=20.0
            ),
            ThirdPartyService(
                name="Docker Hub",
                category=ServiceCategory.DEVELOPMENT,
                description="Container image registry and deployment platform",
                data_sensitivity=RiskLevel.MEDIUM,
                audit_importance=RiskLevel.HIGH,
                access_control_needs=RiskLevel.MEDIUM,
                users_count=12,
                cost_per_seat=5.0
            ),
            
            # Security Services (HIGH RISK - Individual Required)
            ThirdPartyService(
                name="Okta Identity Management",
                category=ServiceCategory.SECURITY,
                description="Single sign-on and identity management platform",
                data_sensitivity=RiskLevel.CRITICAL,
                audit_importance=RiskLevel.CRITICAL,
                access_control_needs=RiskLevel.CRITICAL,
                compliance_requirements=['SOC2', 'ISO27001'],
                users_count=3,
                cost_per_seat=100.0
            ),
            ThirdPartyService(
                name="Splunk SIEM",
                category=ServiceCategory.SECURITY,
                description="Security information and event management system",
                data_sensitivity=RiskLevel.HIGH,
                audit_importance=RiskLevel.CRITICAL,
                access_control_needs=RiskLevel.HIGH,
                compliance_requirements=['SOC2', 'NIST'],
                users_count=4,
                cost_per_seat=200.0
            ),
            
            # Communication Services (MEDIUM RISK)
            ThirdPartyService(
                name="Microsoft 365",
                category=ServiceCategory.COMMUNICATION,
                description="Email, collaboration, and productivity suite",
                data_sensitivity=RiskLevel.MEDIUM,
                audit_importance=RiskLevel.MEDIUM,
                access_control_needs=RiskLevel.MEDIUM,
                compliance_requirements=['GDPR'],
                users_count=50,
                cost_per_seat=12.0
            ),
            ThirdPartyService(
                name="Slack Enterprise",
                category=ServiceCategory.COMMUNICATION,
                description="Team communication and collaboration platform",
                data_sensitivity=RiskLevel.MEDIUM,
                audit_importance=RiskLevel.MEDIUM,
                access_control_needs=RiskLevel.MEDIUM,
                compliance_requirements=['GDPR', 'SOC2'],
                users_count=45,
                cost_per_seat=8.0
            ),
            ThirdPartyService(
                name="Zoom Pro",
                category=ServiceCategory.COMMUNICATION,
                description="Video conferencing and webinar platform",
                data_sensitivity=RiskLevel.LOW,
                audit_importance=RiskLevel.LOW,
                access_control_needs=RiskLevel.LOW,
                users_count=30,
                cost_per_seat=15.0
            ),
            
            # Marketing Services (LOW-MEDIUM RISK)
            ThirdPartyService(
                name="HubSpot CRM",
                category=ServiceCategory.MARKETING,
                description="Customer relationship management and marketing automation",
                data_sensitivity=RiskLevel.MEDIUM,
                audit_importance=RiskLevel.MEDIUM,
                access_control_needs=RiskLevel.MEDIUM,
                compliance_requirements=['GDPR', 'CCPA'],
                users_count=8,
                cost_per_seat=45.0
            ),
            ThirdPartyService(
                name="Mailchimp",
                category=ServiceCategory.MARKETING,
                description="Email marketing and automation platform",
                data_sensitivity=RiskLevel.MEDIUM,
                audit_importance=RiskLevel.LOW,
                access_control_needs=RiskLevel.LOW,
                compliance_requirements=['GDPR'],
                users_count=5,
                cost_per_seat=20.0
            ),
            ThirdPartyService(
                name="Canva Pro",
                category=ServiceCategory.MARKETING,
                description="Graphic design and visual content creation",
                data_sensitivity=RiskLevel.LOW,
                audit_importance=RiskLevel.LOW,
                access_control_needs=RiskLevel.LOW,
                users_count=10,
                cost_per_seat=10.0
            ),
            
            # Productivity Services (LOW-MEDIUM RISK)
            ThirdPartyService(
                name="Adobe Creative Cloud",
                category=ServiceCategory.PRODUCTIVITY,
                description="Creative software suite for design and content creation",
                data_sensitivity=RiskLevel.LOW,
                audit_importance=RiskLevel.LOW,
                access_control_needs=RiskLevel.LOW,
                users_count=6,
                cost_per_seat=60.0
            ),
            ThirdPartyService(
                name="Shutterstock",
                category=ServiceCategory.PRODUCTIVITY,
                description="Stock photography and creative assets",
                data_sensitivity=RiskLevel.LOW,
                audit_importance=RiskLevel.LOW,
                access_control_needs=RiskLevel.LOW,
                users_count=8,
                cost_per_seat=15.0
            )
        ]
        
        self.services.extend(example_services)
        print(f"✅ Initialized service analyzer with {len(example_services)} example services")
    
    def analyze_service(self, service: ThirdPartyService) -> Dict:
        """Perform comprehensive risk analysis for a service"""
        
        # Calculate composite risk score
        risk_scores = {
            'data_sensitivity': self._risk_to_score(service.data_sensitivity),
            'audit_importance': self._risk_to_score(service.audit_importance),
            'access_control': self._risk_to_score(service.access_control_needs)
        }
        
        # Compliance multiplier
        compliance_multiplier = 1.0
        for req in service.compliance_requirements:
            if req.lower() in ['sox', 'hipaa', 'pci_dss']:
                compliance_multiplier += 0.5
            elif req.lower() in ['gdpr', 'iso27001', 'soc2']:
                compliance_multiplier += 0.3
        
        # Calculate total risk score (1-10 scale)
        total_risk_score = (sum(risk_scores.values()) / len(risk_scores)) * compliance_multiplier
        total_risk_score = min(total_risk_score, 10.0)  # Cap at 10
        
        # Determine account requirement
        account_requirement = self._determine_account_requirement(service, total_risk_score)
        
        # Calculate financial impact
        annual_cost_individual = service.users_count * service.cost_per_seat * 12
        annual_cost_shared = service.cost_per_seat * 12 if service.cost_per_seat > 0 else 0
        potential_savings = annual_cost_individual - annual_cost_shared
        
        # Risk assessment
        risk_level = self._score_to_risk_level(total_risk_score)
        
        return {
            'service_name': service.name,
            'category': service.category.value,
            'risk_level': risk_level.value,
            'risk_score': round(total_risk_score, 2),
            'risk_breakdown': risk_scores,
            'account_requirement': account_requirement.value,
            'compliance_requirements': service.compliance_requirements,
            'financial_analysis': {
                'users_count': service.users_count,
                'cost_per_seat': service.cost_per_seat,
                'annual_cost_individual': annual_cost_individual,
                'annual_cost_shared': annual_cost_shared,
                'potential_savings': potential_savings
            },
            'recommendation': self._generate_recommendation(service, account_requirement, total_risk_score),
            'justification': self._generate_justification(service, account_requirement, total_risk_score)
        }
    
    def _risk_to_score(self, risk_level: RiskLevel) -> float:
        """Convert risk level to numeric score"""
        mapping = {
            RiskLevel.LOW: 2.0,
            RiskLevel.MEDIUM: 5.0,
            RiskLevel.HIGH: 8.0,
            RiskLevel.CRITICAL: 10.0
        }
        return mapping.get(risk_level, 5.0)
    
    def _score_to_risk_level(self, score: float) -> RiskLevel:
        """Convert numeric score to risk level"""
        if score >= 8.0:
            return RiskLevel.CRITICAL
        elif score >= 6.0:
            return RiskLevel.HIGH
        elif score >= 4.0:
            return RiskLevel.MEDIUM
        else:
            return RiskLevel.LOW
    
    def _determine_account_requirement(self, service: ThirdPartyService, risk_score: float) -> AccountRequirement:
        """Determine appropriate account setup based on risk analysis"""
        
        # Critical services always require individual accounts
        if (service.data_sensitivity == RiskLevel.CRITICAL or 
            service.audit_importance == RiskLevel.CRITICAL or
            risk_score >= 8.0):
            return AccountRequirement.INDIVIDUAL_REQUIRED
        
        # High-risk services strongly prefer individual accounts
        if (service.data_sensitivity == RiskLevel.HIGH or 
            service.audit_importance == RiskLevel.HIGH or
            risk_score >= 6.0):
            return AccountRequirement.INDIVIDUAL_PREFERRED
        
        # Services with compliance requirements
        critical_compliance = ['sox', 'hipaa', 'pci_dss']
        if any(req.lower() in critical_compliance for req in service.compliance_requirements):
            return AccountRequirement.INDIVIDUAL_REQUIRED
        
        # Low-risk services may accept group accounts
        if (service.data_sensitivity == RiskLevel.LOW and 
            service.audit_importance == RiskLevel.LOW and
            risk_score < 4.0):
            return AccountRequirement.GROUP_ACCEPTABLE
        
        # Default to individual preferred
        return AccountRequirement.INDIVIDUAL_PREFERRED
    
    def _generate_recommendation(self, service: ThirdPartyService, 
                                account_req: AccountRequirement, risk_score: float) -> str:
        """Generate specific recommendation for the service"""
        
        if account_req == AccountRequirement.INDIVIDUAL_REQUIRED:
            return f"MANDATORY: Use individual accounts for {service.name}. " \
                   f"High risk score ({risk_score}) and critical compliance requirements make this non-negotiable."
        
        elif account_req == AccountRequirement.INDIVIDUAL_PREFERRED:
            return f"RECOMMENDED: Use individual accounts for {service.name}. " \
                   f"Risk level justifies the additional cost for audit and security benefits."
        
        elif account_req == AccountRequirement.GROUP_ACCEPTABLE:
            return f"ACCEPTABLE: Shared account may be used for {service.name}. " \
                   f"Low risk profile makes this a reasonable cost-saving measure."
        
        else:
            return f"REVIEW: Consider service account architecture for {service.name}."
    
    def _generate_justification(self, service: ThirdPartyService,
                               account_req: AccountRequirement, risk_score: float) -> str:
        """Generate detailed justification for the recommendation"""
        
        justifications = []
        
        # Data sensitivity justification
        if service.data_sensitivity in [RiskLevel.HIGH, RiskLevel.CRITICAL]:
            justifications.append(f"Handles {service.data_sensitivity.value.lower()} sensitivity data")
        
        # Audit importance
        if service.audit_importance in [RiskLevel.HIGH, RiskLevel.CRITICAL]:
            justifications.append(f"Has {service.audit_importance.value.lower()} audit importance")
        
        # Compliance requirements
        if service.compliance_requirements:
            justifications.append(f"Subject to {', '.join(service.compliance_requirements)} compliance")
        
        # Access control needs
        if service.access_control_needs in [RiskLevel.HIGH, RiskLevel.CRITICAL]:
            justifications.append(f"Requires {service.access_control_needs.value.lower()} access control")
        
        # Financial consideration
        if account_req == AccountRequirement.GROUP_ACCEPTABLE:
            justifications.append("Low security risk allows cost optimization")
        
        return "; ".join(justifications) if justifications else "Standard security practices apply"
    
    def generate_comprehensive_report(self) -> Dict:
        """Generate comprehensive analysis report for all services"""
        
        # Analyze all services
        analyses = [self.analyze_service(service) for service in self.services]
        
        # Categorize by requirement
        categorized = {
            'individual_required': [],
            'individual_preferred': [],
            'group_acceptable': [],
            'service_account': []
        }
        
        # Financial summary
        total_individual_cost = 0
        total_shared_cost = 0
        total_potential_savings = 0
        
        for analysis in analyses:
            req = analysis['account_requirement']
            if 'Required' in req:
                categorized['individual_required'].append(analysis)
            elif 'Preferred' in req:
                categorized['individual_preferred'].append(analysis)
            elif 'Acceptable' in req:
                categorized['group_acceptable'].append(analysis)
            else:
                categorized['service_account'].append(analysis)
            
            # Add to financial totals
            total_individual_cost += analysis['financial_analysis']['annual_cost_individual']
            total_shared_cost += analysis['financial_analysis']['annual_cost_shared']
            total_potential_savings += analysis['financial_analysis']['potential_savings']
        
        # Risk distribution
        risk_distribution = defaultdict(int)
        for analysis in analyses:
            risk_distribution[analysis['risk_level']] += 1
        
        # Category distribution
        category_distribution = defaultdict(int)
        for analysis in analyses:
            category_distribution[analysis['category']] += 1
        
        return {
            'summary': {
                'total_services': len(analyses),
                'individual_required': len(categorized['individual_required']),
                'individual_preferred': len(categorized['individual_preferred']),
                'group_acceptable': len(categorized['group_acceptable']),
                'service_account': len(categorized['service_account'])
            },
            'financial_impact': {
                'total_annual_cost_individual': total_individual_cost,
                'total_annual_cost_shared': total_shared_cost,
                'total_potential_savings': total_potential_savings,
                'average_cost_per_service': total_individual_cost / len(analyses) if analyses else 0
            },
            'risk_distribution': dict(risk_distribution),
            'category_distribution': dict(category_distribution),
            'categorized_services': categorized,
            'detailed_analyses': analyses
        }

# Demo: Third-Party Service Risk Analysis
print("🔍 EXERCISE 1: THIRD-PARTY SERVICE ANALYSIS")
print("=" * 60)

# Initialize the analyzer
service_analyzer = ServiceRiskAnalyzer()

🔍 EXERCISE 1: THIRD-PARTY SERVICE ANALYSIS
✅ Initialized service analyzer with 15 example services


In [8]:
# Generate comprehensive analysis report
report = service_analyzer.generate_comprehensive_report()

print(f"\n📊 SERVICE ANALYSIS SUMMARY")
print("=" * 40)
print(f"Total Services Analyzed: {report['summary']['total_services']}")
print(f"Individual Required: {report['summary']['individual_required']}")
print(f"Individual Preferred: {report['summary']['individual_preferred']}")
print(f"Group Acceptable: {report['summary']['group_acceptable']}")

print(f"\n💰 FINANCIAL IMPACT ANALYSIS")
print("=" * 40)
print(f"Annual Cost (Individual Accounts): ${report['financial_impact']['total_annual_cost_individual']:,.2f}")
print(f"Annual Cost (Shared Accounts): ${report['financial_impact']['total_annual_cost_shared']:,.2f}")
print(f"Potential Savings (Shared): ${report['financial_impact']['total_potential_savings']:,.2f}")
print(f"Average Cost per Service: ${report['financial_impact']['average_cost_per_service']:,.2f}")

print(f"\n🎯 RISK DISTRIBUTION")
print("=" * 40)
for risk_level, count in report['risk_distribution'].items():
    print(f"{risk_level}: {count} services")

print(f"\n📂 CATEGORY BREAKDOWN")
print("=" * 40)
for category, count in report['category_distribution'].items():
    print(f"{category}: {count} services")

print(f"\n🚨 HIGH-PRIORITY SERVICES (Individual Required)")
print("=" * 40)
for service in report['categorized_services']['individual_required']:
    print(f"\n🔴 {service['service_name']} ({service['category']})")
    print(f"   Risk Score: {service['risk_score']}/10 ({service['risk_level']})")
    print(f"   Compliance: {', '.join(service['compliance_requirements']) if service['compliance_requirements'] else 'None'}")
    print(f"   Annual Cost: ${service['financial_analysis']['annual_cost_individual']:,.2f}")
    print(f"   Justification: {service['justification']}")

print(f"\n🟡 MEDIUM-PRIORITY SERVICES (Individual Preferred)")
print("=" * 40)
for service in report['categorized_services']['individual_preferred']:
    print(f"\n🟡 {service['service_name']} ({service['category']})")
    print(f"   Risk Score: {service['risk_score']}/10 ({service['risk_level']})")
    print(f"   Annual Cost: ${service['financial_analysis']['annual_cost_individual']:,.2f}")
    print(f"   Potential Savings: ${service['financial_analysis']['potential_savings']:,.2f}")

print(f"\n🟢 LOW-PRIORITY SERVICES (Group Acceptable)")
print("=" * 40)
for service in report['categorized_services']['group_acceptable']:
    print(f"\n🟢 {service['service_name']} ({service['category']})")
    print(f"   Risk Score: {service['risk_score']}/10 ({service['risk_level']})")
    print(f"   Potential Savings: ${service['financial_analysis']['potential_savings']:,.2f}")
    print(f"   Recommendation: {service['recommendation']}")

# Key insights and recommendations
print(f"\n💡 KEY INSIGHTS & RECOMMENDATIONS")
print("=" * 40)

# Calculate security ROI
security_critical_services = len(report['categorized_services']['individual_required'])
total_services = report['summary']['total_services']
security_percentage = (security_critical_services / total_services) * 100

print(f"\n1. SECURITY POSTURE:")
print(f"   • {security_percentage:.1f}% of services require individual accounts")
print(f"   • {report['summary']['individual_required']} services are security-critical")
print(f"   • Focus on mandatory individual accounts for maximum security impact")

# Financial optimization
individual_required_cost = sum(s['financial_analysis']['annual_cost_individual'] 
                              for s in report['categorized_services']['individual_required'])
group_acceptable_savings = sum(s['financial_analysis']['potential_savings'] 
                              for s in report['categorized_services']['group_acceptable'])

print(f"\n2. FINANCIAL OPTIMIZATION:")
print(f"   • Mandatory security costs: ${individual_required_cost:,.2f}/year")
print(f"   • Potential safe savings: ${group_acceptable_savings:,.2f}/year")
print(f"   • Security ROI: Protect {security_critical_services} critical services")

print(f"\n3. IMPLEMENTATION PRIORITIES:")
print(f"   • Phase 1: Implement individual accounts for {security_critical_services} critical services")
print(f"   • Phase 2: Evaluate preferred individual accounts based on budget")
print(f"   • Phase 3: Optimize group accounts for low-risk services")

print(f"\n4. COMPLIANCE CONSIDERATIONS:")
compliance_services = [s for s in report['detailed_analyses'] if s['compliance_requirements']]
print(f"   • {len(compliance_services)} services have compliance requirements")
print(f"   • All compliance-related services need individual accounts")
print(f"   • Regular audit trails are mandatory for compliance")

print(f"\n✅ Exercise 1 Analysis Complete!")
print(f"   Analyzed {len(service_analyzer.services)} third-party services")
print(f"   Generated security and financial recommendations")
print(f"   Provided implementation roadmap")


📊 SERVICE ANALYSIS SUMMARY
Total Services Analyzed: 15
Individual Required: 7
Individual Preferred: 4
Group Acceptable: 4

💰 FINANCIAL IMPACT ANALYSIS
Annual Cost (Individual Accounts): $50,820.00
Annual Cost (Shared Accounts): $7,020.00
Potential Savings (Shared): $43,800.00
Average Cost per Service: $3,388.00

🎯 RISK DISTRIBUTION
Critical: 7 services
High: 3 services
Low: 5 services

📂 CATEGORY BREAKDOWN
Financial: 2 services
Infrastructure: 1 services
Development: 2 services
Security: 2 services
Communication: 3 services
Marketing: 3 services
Productivity: 2 services

🚨 HIGH-PRIORITY SERVICES (Individual Required)

🔴 ADP Payroll System (Financial)
   Risk Score: 10.0/10 (Critical)
   Compliance: SOX, tax_compliance
   Annual Cost: $3,000.00
   Justification: Handles critical sensitivity data; Has critical audit importance; Subject to SOX, tax_compliance compliance; Requires critical access control

🔴 QuickBooks Online (Financial)
   Risk Score: 10.0/10 (Critical)
   Compliance: SOX

## Exercise 2: Role Design for Online Shoe Store

### Objective
Design a comprehensive role-based access control system for an online shoe store, focusing on:
- Customer-facing operations (browsing, purchasing, returns)
- Inventory management (stock, suppliers, warehousing)
- Business operations (sales, marketing, customer service)
- Administrative functions (user management, reporting, system maintenance)

### Requirements Analysis
Your system must handle:
- **Customer Roles**: Different customer types and access levels
- **Employee Roles**: Various departments with specific permissions
- **Supplier Roles**: External partners with limited access
- **Administrative Roles**: System management and oversight
- **Workflow Integration**: Order processing, inventory updates, returns
- **Compliance**: Financial regulations, data protection, audit requirements

In [9]:
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List, Set, Optional, Tuple
from collections import defaultdict
import json

class AccessLevel(Enum):
    NONE = 0
    READ = 1
    WRITE = 2
    ADMIN = 3
    OWNER = 4

class ResourceType(Enum):
    # Customer-facing resources
    PRODUCT_CATALOG = "product_catalog"
    SHOPPING_CART = "shopping_cart"
    USER_PROFILE = "user_profile"
    ORDER_HISTORY = "order_history"
    REVIEWS = "reviews"
    WISHLIST = "wishlist"
    
    # Inventory resources
    INVENTORY = "inventory"
    STOCK_LEVELS = "stock_levels"
    SUPPLIER_DATA = "supplier_data"
    WAREHOUSE_OPERATIONS = "warehouse_operations"
    PURCHASE_ORDERS = "purchase_orders"
    
    # Business resources
    SALES_DATA = "sales_data"
    CUSTOMER_DATA = "customer_data"
    MARKETING_CAMPAIGNS = "marketing_campaigns"
    FINANCIAL_REPORTS = "financial_reports"
    ANALYTICS = "analytics"
    
    # Administrative resources
    USER_MANAGEMENT = "user_management"
    SYSTEM_CONFIG = "system_config"
    AUDIT_LOGS = "audit_logs"
    BACKUP_RESTORE = "backup_restore"
    SECURITY_SETTINGS = "security_settings"

@dataclass
class Permission:
    resource: ResourceType
    access_level: AccessLevel
    conditions: List[str] = field(default_factory=list)
    time_restrictions: List[str] = field(default_factory=list)
    
    def __str__(self):
        return f"{self.access_level.name} on {self.resource.value}"

@dataclass
class Role:
    name: str
    description: str
    permissions: List[Permission] = field(default_factory=list)
    inherits_from: List[str] = field(default_factory=list)
    max_users: Optional[int] = None
    requires_approval: bool = False
    compliance_requirements: List[str] = field(default_factory=list)
    
    def add_permission(self, resource: ResourceType, access_level: AccessLevel, 
                      conditions: List[str] = None, time_restrictions: List[str] = None):
        self.permissions.append(Permission(
            resource=resource,
            access_level=access_level,
            conditions=conditions or [],
            time_restrictions=time_restrictions or []
        ))

class OnlineShoeStoreRBACSystem:
    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.user_roles: Dict[str, List[str]] = defaultdict(list)
        self.role_hierarchy: Dict[str, Set[str]] = defaultdict(set)
        self.workflow_permissions: Dict[str, Dict[str, List[str]]] = {}
        
        self._initialize_roles()
        self._setup_workflows()
    
    def _initialize_roles(self):
        # === CUSTOMER ROLES ===
        
        # Guest Customer (Anonymous Browser)
        guest_role = Role(
            name="guest_customer",
            description="Anonymous visitors who can browse products",
            requires_approval=False
        )
        guest_role.add_permission(ResourceType.PRODUCT_CATALOG, AccessLevel.READ)
        guest_role.add_permission(ResourceType.REVIEWS, AccessLevel.READ)
        self.roles["guest_customer"] = guest_role
        
        # Registered Customer
        registered_customer = Role(
            name="registered_customer",
            description="Registered customers with accounts",
            inherits_from=["guest_customer"],
            requires_approval=False
        )
        registered_customer.add_permission(ResourceType.SHOPPING_CART, AccessLevel.WRITE)
        registered_customer.add_permission(ResourceType.USER_PROFILE, AccessLevel.WRITE, 
                                         conditions=["own_profile_only"])
        registered_customer.add_permission(ResourceType.ORDER_HISTORY, AccessLevel.READ,
                                         conditions=["own_orders_only"])
        registered_customer.add_permission(ResourceType.REVIEWS, AccessLevel.WRITE,
                                         conditions=["purchased_products_only"])
        registered_customer.add_permission(ResourceType.WISHLIST, AccessLevel.WRITE,
                                         conditions=["own_wishlist_only"])
        self.roles["registered_customer"] = registered_customer
        
        # Premium Customer
        premium_customer = Role(
            name="premium_customer",
            description="Premium customers with additional benefits",
            inherits_from=["registered_customer"],
            requires_approval=False
        )
        premium_customer.add_permission(ResourceType.ANALYTICS, AccessLevel.READ,
                                      conditions=["personal_analytics_only"])
        self.roles["premium_customer"] = premium_customer
        
        # === EMPLOYEE ROLES ===
        
        # Customer Service Representative
        customer_service = Role(
            name="customer_service_rep",
            description="Handle customer inquiries and basic order management",
            requires_approval=True,
            compliance_requirements=["customer_data_training", "privacy_certification"]
        )
        customer_service.add_permission(ResourceType.CUSTOMER_DATA, AccessLevel.READ,
                                      conditions=["active_tickets_only"])
        customer_service.add_permission(ResourceType.ORDER_HISTORY, AccessLevel.READ)
        customer_service.add_permission(ResourceType.ORDER_HISTORY, AccessLevel.WRITE,
                                       conditions=["status_updates_only"])
        customer_service.add_permission(ResourceType.PRODUCT_CATALOG, AccessLevel.READ)
        customer_service.add_permission(ResourceType.INVENTORY, AccessLevel.READ,
                                       conditions=["stock_levels_only"])
        self.roles["customer_service_rep"] = customer_service
        
        # Sales Associate
        sales_associate = Role(
            name="sales_associate",
            description="Process orders and handle sales operations",
            inherits_from=["customer_service_rep"],
            requires_approval=True
        )
        sales_associate.add_permission(ResourceType.SALES_DATA, AccessLevel.READ,
                                     conditions=["own_sales_only"])
        sales_associate.add_permission(ResourceType.MARKETING_CAMPAIGNS, AccessLevel.READ)
        self.roles["sales_associate"] = sales_associate
        
        # Inventory Manager
        inventory_manager = Role(
            name="inventory_manager",
            description="Manage stock levels and supplier relationships",
            requires_approval=True,
            compliance_requirements=["financial_controls_training"]
        )
        inventory_manager.add_permission(ResourceType.INVENTORY, AccessLevel.ADMIN)
        inventory_manager.add_permission(ResourceType.STOCK_LEVELS, AccessLevel.ADMIN)
        inventory_manager.add_permission(ResourceType.SUPPLIER_DATA, AccessLevel.WRITE)
        inventory_manager.add_permission(ResourceType.WAREHOUSE_OPERATIONS, AccessLevel.ADMIN)
        inventory_manager.add_permission(ResourceType.PURCHASE_ORDERS, AccessLevel.ADMIN)
        inventory_manager.add_permission(ResourceType.PRODUCT_CATALOG, AccessLevel.WRITE,
                                       conditions=["inventory_updates_only"])
        self.roles["inventory_manager"] = inventory_manager
        
        # Marketing Specialist
        marketing_specialist = Role(
            name="marketing_specialist",
            description="Create and manage marketing campaigns",
            requires_approval=True,
            compliance_requirements=["marketing_compliance_training"]
        )
        marketing_specialist.add_permission(ResourceType.MARKETING_CAMPAIGNS, AccessLevel.ADMIN)
        marketing_specialist.add_permission(ResourceType.CUSTOMER_DATA, AccessLevel.READ,
                                          conditions=["analytics_only", "anonymized_data"])
        marketing_specialist.add_permission(ResourceType.SALES_DATA, AccessLevel.READ,
                                          conditions=["aggregated_data_only"])
        marketing_specialist.add_permission(ResourceType.ANALYTICS, AccessLevel.READ)
        marketing_specialist.add_permission(ResourceType.PRODUCT_CATALOG, AccessLevel.READ)
        self.roles["marketing_specialist"] = marketing_specialist
        
        # Financial Analyst
        financial_analyst = Role(
            name="financial_analyst",
            description="Access financial data and generate reports",
            requires_approval=True,
            max_users=3,
            compliance_requirements=["financial_regulations", "sox_compliance", "data_protection"]
        )
        financial_analyst.add_permission(ResourceType.FINANCIAL_REPORTS, AccessLevel.ADMIN)
        financial_analyst.add_permission(ResourceType.SALES_DATA, AccessLevel.READ)
        financial_analyst.add_permission(ResourceType.ANALYTICS, AccessLevel.READ)
        financial_analyst.add_permission(ResourceType.PURCHASE_ORDERS, AccessLevel.READ)
        self.roles["financial_analyst"] = financial_analyst
        
        # Department Manager
        department_manager = Role(
            name="department_manager",
            description="Manage specific departments (Sales, Marketing, Customer Service)",
            requires_approval=True,
            max_users=10
        )
        department_manager.add_permission(ResourceType.CUSTOMER_DATA, AccessLevel.READ)
        department_manager.add_permission(ResourceType.SALES_DATA, AccessLevel.READ,
                                        conditions=["department_only"])
        department_manager.add_permission(ResourceType.ANALYTICS, AccessLevel.READ,
                                        conditions=["department_metrics"])
        department_manager.add_permission(ResourceType.USER_MANAGEMENT, AccessLevel.WRITE,
                                        conditions=["department_staff_only"])
        self.roles["department_manager"] = department_manager
        
        # === SUPPLIER ROLES ===
        
        # Supplier Portal User
        supplier_user = Role(
            name="supplier_user",
            description="External suppliers accessing their data",
            requires_approval=True,
            compliance_requirements=["supplier_agreement", "data_sharing_agreement"]
        )
        supplier_user.add_permission(ResourceType.SUPPLIER_DATA, AccessLevel.READ,
                                   conditions=["own_company_only"])
        supplier_user.add_permission(ResourceType.PURCHASE_ORDERS, AccessLevel.READ,
                                   conditions=["own_orders_only"])
        supplier_user.add_permission(ResourceType.INVENTORY, AccessLevel.READ,
                                   conditions=["own_products_only"])
        self.roles["supplier_user"] = supplier_user
        
        # === ADMINISTRATIVE ROLES ===
        
        # System Administrator
        system_admin = Role(
            name="system_administrator",
            description="Full system access for technical maintenance",
            requires_approval=True,
            max_users=2,
            compliance_requirements=["security_clearance", "technical_certification"]
        )
        for resource in ResourceType:
            system_admin.add_permission(resource, AccessLevel.OWNER)
        self.roles["system_administrator"] = system_admin
        
        # Security Auditor
        security_auditor = Role(
            name="security_auditor",
            description="Monitor security and generate audit reports",
            requires_approval=True,
            max_users=3,
            compliance_requirements=["audit_certification", "security_training"]
        )
        security_auditor.add_permission(ResourceType.AUDIT_LOGS, AccessLevel.READ)
        security_auditor.add_permission(ResourceType.USER_MANAGEMENT, AccessLevel.READ)
        security_auditor.add_permission(ResourceType.SECURITY_SETTINGS, AccessLevel.READ)
        security_auditor.add_permission(ResourceType.SYSTEM_CONFIG, AccessLevel.READ)
        self.roles["security_auditor"] = security_auditor
        
        # Business Owner
        business_owner = Role(
            name="business_owner",
            description="Executive oversight with full business access",
            requires_approval=True,
            max_users=5,
            compliance_requirements=["executive_training", "fiduciary_responsibility"]
        )
        # Business owners get admin access to business resources
        business_resources = [
            ResourceType.SALES_DATA, ResourceType.CUSTOMER_DATA,
            ResourceType.FINANCIAL_REPORTS, ResourceType.ANALYTICS,
            ResourceType.MARKETING_CAMPAIGNS, ResourceType.INVENTORY
        ]
        for resource in business_resources:
            business_owner.add_permission(resource, AccessLevel.ADMIN)
        
        business_owner.add_permission(ResourceType.USER_MANAGEMENT, AccessLevel.ADMIN,
                                    conditions=["business_users_only"])
        self.roles["business_owner"] = business_owner
        
        # Setup role hierarchy
        self._setup_role_hierarchy()
    
    def _setup_role_hierarchy(self):
        """Establish inheritance relationships between roles"""
        hierarchy_map = {
            "registered_customer": {"guest_customer"},
            "premium_customer": {"registered_customer"},
            "sales_associate": {"customer_service_rep"},
            "department_manager": {"sales_associate", "marketing_specialist"},
            "business_owner": {"department_manager", "financial_analyst"}
        }
        
        for role, inherited_roles in hierarchy_map.items():
            self.role_hierarchy[role] = inherited_roles
    
    def _setup_workflows(self):
        """Define workflow-based permission requirements"""
        self.workflow_permissions = {
            "order_processing": {
                "create_order": ["registered_customer", "sales_associate"],
                "modify_order": ["sales_associate", "customer_service_rep"],
                "cancel_order": ["registered_customer", "customer_service_rep", "sales_associate"],
                "process_payment": ["sales_associate", "financial_analyst"],
                "ship_order": ["inventory_manager", "sales_associate"],
                "complete_order": ["sales_associate", "customer_service_rep"]
            },
            "inventory_management": {
                "add_product": ["inventory_manager"],
                "update_stock": ["inventory_manager", "sales_associate"],
                "create_purchase_order": ["inventory_manager"],
                "receive_shipment": ["inventory_manager"],
                "manage_suppliers": ["inventory_manager", "business_owner"]
            },
            "customer_management": {
                "view_customer_data": ["customer_service_rep", "sales_associate", "department_manager"],
                "update_customer_info": ["customer_service_rep"],
                "handle_complaints": ["customer_service_rep", "department_manager"],
                "process_returns": ["customer_service_rep", "sales_associate"]
            },
            "marketing_operations": {
                "create_campaign": ["marketing_specialist"],
                "analyze_performance": ["marketing_specialist", "business_owner"],
                "manage_promotions": ["marketing_specialist", "department_manager"],
                "customer_segmentation": ["marketing_specialist", "financial_analyst"]
            },
            "financial_operations": {
                "generate_reports": ["financial_analyst"],
                "audit_transactions": ["financial_analyst", "security_auditor"],
                "budget_planning": ["financial_analyst", "business_owner"],
                "compliance_reporting": ["financial_analyst", "security_auditor"]
            }
        }
    
    def get_effective_permissions(self, user_id: str) -> Dict[ResourceType, AccessLevel]:
        """Calculate effective permissions for a user based on all assigned roles"""
        user_roles = self.user_roles.get(user_id, [])
        effective_permissions = {}
        
        for role_name in user_roles:
            role = self.roles.get(role_name)
            if not role:
                continue
                
            # Include inherited roles
            all_roles = self._get_all_inherited_roles(role_name)
            
            for inherited_role_name in all_roles:
                inherited_role = self.roles.get(inherited_role_name)
                if inherited_role:
                    for permission in inherited_role.permissions:
                        current_level = effective_permissions.get(permission.resource, AccessLevel.NONE)
                        # Take the highest access level
                        if permission.access_level.value > current_level.value:
                            effective_permissions[permission.resource] = permission.access_level
        
        return effective_permissions
    
    def _get_all_inherited_roles(self, role_name: str) -> Set[str]:
        """Get all roles inherited by a given role (including the role itself)"""
        all_roles = {role_name}
        role = self.roles.get(role_name)
        
        if role:
            for inherited_role in role.inherits_from:
                all_roles.update(self._get_all_inherited_roles(inherited_role))
        
        return all_roles
    
    def can_perform_workflow_action(self, user_id: str, workflow: str, action: str) -> Tuple[bool, str]:
        """Check if user can perform a specific workflow action"""
        user_roles = self.user_roles.get(user_id, [])
        
        if workflow not in self.workflow_permissions:
            return False, f"Unknown workflow: {workflow}"
        
        if action not in self.workflow_permissions[workflow]:
            return False, f"Unknown action: {action} in workflow: {workflow}"
        
        required_roles = self.workflow_permissions[workflow][action]
        
        # Check if user has any of the required roles (or inherited roles)
        for role_name in user_roles:
            all_roles = self._get_all_inherited_roles(role_name)
            if any(req_role in all_roles for req_role in required_roles):
                return True, f"Access granted via role: {role_name}"
        
        return False, f"User lacks required roles: {required_roles}"
    
    def assign_role(self, user_id: str, role_name: str) -> bool:
        """Assign a role to a user"""
        if role_name not in self.roles:
            return False
        
        role = self.roles[role_name]
        
        # Check max users limit
        if role.max_users:
            current_users = sum(1 for users in self.user_roles.values() if role_name in users)
            if current_users >= role.max_users:
                return False
        
        if role_name not in self.user_roles[user_id]:
            self.user_roles[user_id].append(role_name)
        
        return True
    
    def generate_role_analysis_report(self) -> Dict:
        """Generate comprehensive analysis of the role system"""
        report = {
            "role_summary": {},
            "permission_matrix": {},
            "workflow_coverage": {},
            "compliance_requirements": {},
            "security_analysis": {}
        }
        
        # Role summary
        for role_name, role in self.roles.items():
            report["role_summary"][role_name] = {
                "description": role.description,
                "permission_count": len(role.permissions),
                "inherits_from": role.inherits_from,
                "max_users": role.max_users,
                "requires_approval": role.requires_approval,
                "compliance_requirements": role.compliance_requirements
            }
        
        # Permission matrix
        for role_name in self.roles.keys():
            permissions = self.get_effective_permissions("dummy_user")  # Get all possible permissions
            role_permissions = {}
            
            # Temporarily assign role to get effective permissions
            self.user_roles["dummy_user"] = [role_name]
            effective_perms = self.get_effective_permissions("dummy_user")
            
            for resource in ResourceType:
                role_permissions[resource.value] = effective_perms.get(resource, AccessLevel.NONE).name
            
            report["permission_matrix"][role_name] = role_permissions
            
            # Clean up
            del self.user_roles["dummy_user"]
        
        # Workflow coverage
        for workflow, actions in self.workflow_permissions.items():
            report["workflow_coverage"][workflow] = {}
            for action, roles in actions.items():
                report["workflow_coverage"][workflow][action] = roles
        
        # Compliance requirements
        compliance_roles = {}
        for role_name, role in self.roles.items():
            if role.compliance_requirements:
                compliance_roles[role_name] = role.compliance_requirements
        report["compliance_requirements"] = compliance_roles
        
        # Security analysis
        high_privilege_roles = []
        restricted_roles = []
        
        for role_name, role in self.roles.items():
            # Count admin/owner permissions
            admin_perms = sum(1 for p in role.permissions if p.access_level in [AccessLevel.ADMIN, AccessLevel.OWNER])
            
            if admin_perms > 5:
                high_privilege_roles.append({
                    "role": role_name,
                    "admin_permissions": admin_perms,
                    "total_permissions": len(role.permissions)
                })
            
            if role.max_users and role.max_users <= 5:
                restricted_roles.append({
                    "role": role_name,
                    "max_users": role.max_users,
                    "requires_approval": role.requires_approval
                })
        
        report["security_analysis"] = {
            "high_privilege_roles": high_privilege_roles,
            "restricted_roles": restricted_roles,
            "total_roles": len(self.roles),
            "approval_required_roles": sum(1 for role in self.roles.values() if role.requires_approval)
        }
        
        return report

# Create the RBAC system for our online shoe store
print("🏪 Initializing Online Shoe Store RBAC System...")
shoe_store_rbac = OnlineShoeStoreRBACSystem()
print(f"✅ System initialized with {len(shoe_store_rbac.roles)} roles defined")

🏪 Initializing Online Shoe Store RBAC System...
✅ System initialized with 13 roles defined


In [10]:
# Generate comprehensive role analysis report
print("📊 GENERATING COMPREHENSIVE ROLE ANALYSIS...")
analysis_report = shoe_store_rbac.generate_role_analysis_report()

print("\n" + "="*60)
print("🏪 ONLINE SHOE STORE - ROLE-BASED ACCESS CONTROL ANALYSIS")
print("="*60)

# Role Summary
print(f"\n📋 ROLE SUMMARY")
print("-" * 30)
for role_name, details in analysis_report["role_summary"].items():
    print(f"\n🔑 {role_name.upper().replace('_', ' ')}")
    print(f"   Description: {details['description']}")
    print(f"   Permissions: {details['permission_count']}")
    if details['inherits_from']:
        print(f"   Inherits From: {', '.join(details['inherits_from'])}")
    if details['max_users']:
        print(f"   Max Users: {details['max_users']}")
    print(f"   Requires Approval: {'Yes' if details['requires_approval'] else 'No'}")
    if details['compliance_requirements']:
        print(f"   Compliance: {', '.join(details['compliance_requirements'])}")

# Security Analysis
print(f"\n🔒 SECURITY ANALYSIS")
print("-" * 30)
security = analysis_report["security_analysis"]
print(f"Total Roles Defined: {security['total_roles']}")
print(f"Approval Required: {security['approval_required_roles']} roles")
print(f"High-Privilege Roles: {len(security['high_privilege_roles'])}")
print(f"Restricted Access Roles: {len(security['restricted_roles'])}")

print(f"\n🚨 HIGH-PRIVILEGE ROLES:")
for role_info in security['high_privilege_roles']:
    print(f"   • {role_info['role']}: {role_info['admin_permissions']} admin permissions")

print(f"\n🔐 RESTRICTED ACCESS ROLES:")
for role_info in security['restricted_roles']:
    print(f"   • {role_info['role']}: Max {role_info['max_users']} users, "
          f"Approval: {'Required' if role_info['requires_approval'] else 'Not Required'}")

# Workflow Coverage Analysis
print(f"\n🔄 WORKFLOW COVERAGE ANALYSIS")
print("-" * 30)
for workflow, actions in analysis_report["workflow_coverage"].items():
    print(f"\n📋 {workflow.upper().replace('_', ' ')} WORKFLOW:")
    for action, roles in actions.items():
        print(f"   {action.replace('_', ' ').title()}: {', '.join(roles)}")

# Compliance Requirements
print(f"\n📜 COMPLIANCE REQUIREMENTS")
print("-" * 30)
compliance = analysis_report["compliance_requirements"]
for role, requirements in compliance.items():
    print(f"\n🔖 {role.replace('_', ' ').title()}:")
    for req in requirements:
        print(f"   • {req.replace('_', ' ').title()}")

print(f"\n💡 KEY DESIGN INSIGHTS")
print("-" * 30)

# Calculate role distribution
customer_roles = [r for r in analysis_report["role_summary"].keys() if "customer" in r]
employee_roles = [r for r in analysis_report["role_summary"].keys() if r not in customer_roles and r != "supplier_user"]
supplier_roles = [r for r in analysis_report["role_summary"].keys() if "supplier" in r]

print(f"\n1. ROLE DISTRIBUTION:")
print(f"   • Customer Roles: {len(customer_roles)} ({', '.join(customer_roles)})")
print(f"   • Employee Roles: {len(employee_roles)}")
print(f"   • Supplier Roles: {len(supplier_roles)}")

print(f"\n2. SECURITY CONTROLS:")
approval_roles = [r for r, d in analysis_report["role_summary"].items() if d['requires_approval']]
print(f"   • {len(approval_roles)} roles require management approval")
print(f"   • {len([r for r, d in analysis_report["role_summary"].items() if d['max_users']])} roles have user limits")
print(f"   • {len(compliance)} roles have compliance requirements")

print(f"\n3. INHERITANCE HIERARCHY:")
inheritance_roles = [r for r, d in analysis_report["role_summary"].items() if d['inherits_from']]
print(f"   • {len(inheritance_roles)} roles use inheritance")
print(f"   • Reduces permission duplication and management overhead")
print(f"   • Enables consistent permission escalation paths")

print(f"\n4. WORKFLOW INTEGRATION:")
total_workflow_actions = sum(len(actions) for actions in analysis_report["workflow_coverage"].values())
print(f"   • {len(analysis_report['workflow_coverage'])} business workflows defined")
print(f"   • {total_workflow_actions} total workflow actions")
print(f"   • All critical business processes covered")

print(f"\n5. SEPARATION OF DUTIES:")
print(f"   • Financial operations restricted to certified analysts")
print(f"   • System administration separated from business operations")
print(f"   • Customer service has limited financial access")
print(f"   • Audit functions independent from operational roles")

# Demonstrate user assignment and permission checking
print(f"\n🧪 DEMONSTRATION: USER ROLE ASSIGNMENTS & PERMISSION TESTING")
print("-" * 30)

# Test users
test_users = {
    "alice_customer": ["registered_customer"],
    "bob_premium": ["premium_customer"],
    "charlie_sales": ["sales_associate"],
    "diana_inventory": ["inventory_manager"],
    "eve_marketing": ["marketing_specialist"],
    "frank_finance": ["financial_analyst"],
    "grace_manager": ["department_manager"],
    "henry_admin": ["system_administrator"]
}

# Assign roles to test users
print(f"\n👥 ASSIGNING ROLES TO TEST USERS:")
for user_id, roles in test_users.items():
    for role in roles:
        success = shoe_store_rbac.assign_role(user_id, role)
        status = "✅" if success else "❌"
        print(f"   {status} {user_id}: {role}")

# Test workflow permissions
print(f"\n🔍 WORKFLOW PERMISSION TESTING:")

test_scenarios = [
    ("alice_customer", "order_processing", "create_order"),
    ("charlie_sales", "order_processing", "process_payment"),
    ("diana_inventory", "inventory_management", "add_product"),
    ("eve_marketing", "marketing_operations", "create_campaign"),
    ("frank_finance", "financial_operations", "generate_reports"),
    ("alice_customer", "inventory_management", "add_product"),  # Should fail
    ("charlie_sales", "financial_operations", "generate_reports"),  # Should fail
]

for user_id, workflow, action in test_scenarios:
    can_perform, reason = shoe_store_rbac.can_perform_workflow_action(user_id, workflow, action)
    status = "✅ ALLOWED" if can_perform else "🚫 DENIED"
    print(f"   {status}: {user_id} → {workflow}/{action}")
    print(f"            Reason: {reason}")

# Display effective permissions for key users
print(f"\n🎯 EFFECTIVE PERMISSIONS ANALYSIS:")
print("-" * 30)

sample_users = ["alice_customer", "charlie_sales", "diana_inventory", "frank_finance"]
for user_id in sample_users:
    print(f"\n👤 {user_id.upper()}:")
    permissions = shoe_store_rbac.get_effective_permissions(user_id)
    
    # Group permissions by access level
    by_level = defaultdict(list)
    for resource, level in permissions.items():
        by_level[level.name].append(resource.value)
    
    for level in ["OWNER", "ADMIN", "WRITE", "READ"]:
        if level in by_level:
            print(f"   {level}: {len(by_level[level])} resources")
            # Show a few examples
            examples = by_level[level][:3]
            if examples:
                print(f"     Examples: {', '.join(examples)}")

print(f"\n✅ Exercise 2 Analysis Complete!")
print(f"   🏪 Designed comprehensive RBAC system for online shoe store")
print(f"   🔑 Created {len(shoe_store_rbac.roles)} specialized roles")
print(f"   🔄 Defined {len(analysis_report['workflow_coverage'])} business workflows")
print(f"   🔒 Implemented security controls and compliance requirements")
print(f"   ✅ Validated system with real-world testing scenarios")

📊 GENERATING COMPREHENSIVE ROLE ANALYSIS...

🏪 ONLINE SHOE STORE - ROLE-BASED ACCESS CONTROL ANALYSIS

📋 ROLE SUMMARY
------------------------------

🔑 GUEST CUSTOMER
   Description: Anonymous visitors who can browse products
   Permissions: 2
   Requires Approval: No

🔑 REGISTERED CUSTOMER
   Description: Registered customers with accounts
   Permissions: 5
   Inherits From: guest_customer
   Requires Approval: No

🔑 PREMIUM CUSTOMER
   Description: Premium customers with additional benefits
   Permissions: 1
   Inherits From: registered_customer
   Requires Approval: No

🔑 CUSTOMER SERVICE REP
   Description: Handle customer inquiries and basic order management
   Permissions: 5
   Requires Approval: Yes
   Compliance: customer_data_training, privacy_certification

🔑 SALES ASSOCIATE
   Description: Process orders and handle sales operations
   Permissions: 2
   Inherits From: customer_service_rep
   Requires Approval: Yes

🔑 INVENTORY MANAGER
   Description: Manage stock levels and s

## Chapter 5 Summary: User Management Fundamentals

### Key Learning Outcomes

Through this interactive chapter, you have gained hands-on experience with:

#### 🔄 **User Lifecycle Management**
- **Automated User Provisioning**: Learned to implement secure user creation with proper approval workflows
- **Multi-Stage Approval Process**: Understood the importance of review and validation before account activation
- **Deactivation and Cleanup**: Experienced proper account decommissioning with data retention policies
- **Audit Trail Creation**: Saw how every lifecycle event should be logged for compliance and security

#### 🔑 **Role-Based Access Control (RBAC)**
- **Hierarchical Role Design**: Created complex role structures with inheritance and specialization
- **Permission Granularity**: Implemented fine-grained access controls with conditions and restrictions
- **Workflow Integration**: Connected roles to real business processes and operational workflows
- **Separation of Duties**: Designed systems that prevent conflicts of interest and enforce compliance

#### 🕵️ **User Behavior Monitoring**
- **Baseline Establishment**: Built systems that learn normal user behavior patterns
- **Anomaly Detection**: Implemented algorithms to identify suspicious activities and security threats
- **Risk Scoring**: Created comprehensive scoring systems for evaluating user actions
- **Automated Response**: Designed systems that can take appropriate action when threats are detected

#### 🛡️ **Security Best Practices**
- **Third-Party Service Analysis**: Developed tools for evaluating external service security requirements
- **Compliance Integration**: Understood how regulatory requirements impact user management decisions
- **Financial Impact Assessment**: Learned to balance security needs with cost considerations
- **Real-World Application**: Applied concepts to practical business scenarios like online retail

### Critical Security Principles Reinforced

1. **Defense in Depth**: Multiple layers of security controls working together
2. **Least Privilege**: Users get only the minimum access needed for their roles
3. **Continuous Monitoring**: Security is an ongoing process, not a one-time setup
4. **Audit and Compliance**: Every action must be traceable and justifiable
5. **Risk-Based Decision Making**: Security choices should be informed by actual risk assessment

### Practical Takeaways

- **User management is not just IT administration** - it's a critical business function that affects security, compliance, and operational efficiency
- **Automation is essential** - manual processes don't scale and are error-prone
- **Monitoring must be proactive** - waiting for incidents is too late
- **Business context matters** - security controls must align with actual business needs and workflows
- **Regular review is mandatory** - user access and behavior patterns change over time

### Next Steps in Your Security Journey

1. **Practice Implementation**: Try building these systems in your own environment
2. **Study Real Frameworks**: Explore tools like Active Directory, LDAP, OAuth, and modern IAM solutions
3. **Understand Compliance**: Learn about regulations that affect your industry (SOX, HIPAA, GDPR, etc.)
4. **Monitor Industry Trends**: Stay current with emerging threats and security technologies
5. **Build Security Mindset**: Think about user management security in every system you design or use

Remember: **Effective user management is the foundation of organizational security.** Master these fundamentals, and you'll be well-equipped to tackle more advanced security challenges.

---

*🎓 Congratulations on completing Chapter 5: User Management Fundamentals! You now have practical experience with the core concepts that secure organizations rely on every day.*

# Chapter 5: User Management Fundamentals
## Interactive Learning & Practical Demonstrations

**TCPRG4005 - Secure Programming**

This notebook provides hands-on experience with user management security concepts through practical Python implementations and real-world scenarios.

### Learning Objectives
- Understand secure user lifecycle management
- Implement role-based access control systems
- Design separation of duties controls
- Build comprehensive audit logging
- Analyze user management vulnerabilities
- Practice secure user management architectures

---
*Run each cell in sequence to build your understanding of user management security.*

## Section 1: Interactive User Management Demonstrations

This section provides hands-on demonstrations of core user management concepts with working Python code that you can modify and experiment with.

### 1.1 Secure User Lifecycle Management
Understanding the complete user lifecycle from creation to deactivation with proper security controls.