Exercise 1: Building a Secure User Authentication System

In [2]:
import hashlib
import logging
import sys
import datetime

# --- Configuration and Setup ---

# Set up logging for authentication attempts (Required: Implement logging for all authentication attempts )
# Configure logger to write to a file and the console
LOG_FILE = "auth_activity.log"
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename=LOG_FILE,
    filemode='w'
)
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.info("Authentication System Logger Initialized.")


# --- Core Class: User ---
class User:
    """
    Models a secure user account with authentication and access control logic.
    Ensures sensitive data remains protected through proper encapsulation[cite: 141].
    """

    # Define max login attempts before locking (Access Control Logic )
    MAX_ATTEMPTS = 3

    # Constructor with private attributes (Required: All sensitive attributes are private [cite: 147, 152])
    def __init__(self, username, password, privilege_level="standard"):
        # Private Attributes (convention: prefix with _)
        self._username = self._validate_username(username)
        self._hashed_password = self._hash_password(password) # All password fields must be private [cite: 152]
        self._privilege_level = self._validate_privilege(privilege_level)
        self._login_attempts = 0
        self._account_status = "active" # active/locked [cite: 144]

        # Log account creation
        self.log_activity(f"Account created. Status: {self._account_status}")

    # --- Private Utility Methods ---

    def _hash_password(self, password):
        """Hashes the password using SHA-256 for secure storage."""
        # Note: In a real system, you would use bcrypt or Argon2
        return hashlib.sha256(password.encode()).hexdigest()

    def _validate_username(self, username):
        """Input validation for username (Required: Include input validation )."""
        if not (3 <= len(username) <= 20):
            raise ValueError("Username must be between 3 and 20 characters.")
        return username

    def _validate_privilege(self, level):
        """Input validation for privilege level."""
        valid_levels = {'admin', 'standard', 'guest'}
        if level not in valid_levels:
            raise ValueError(f"Invalid privilege level. Must be one of {valid_levels}.")
        return level

    # --- Public Security Methods ---

    def authenticate(self, provided_password):
        """Authenticates the user by comparing the provided password hash."""

        # Check if account is locked
        if self._account_status == "locked":
            self.log_activity("Authentication FAILED: Account is locked.")
            return False

        # Hash the provided password to compare with the stored hash
        provided_hash = self._hash_password(provided_password)

        if provided_hash == self._hashed_password:
            # Successful login
            self._login_attempts = 0 # reset_login_attempts() [cite: 147]
            self.log_activity("Authentication SUCCESS.")
            return True
        else:
            # Failed login
            self._login_attempts += 1
            self.log_activity(f"Authentication FAILED. Attempt {self._login_attempts}/{self.MAX_ATTEMPTS}")

            # Access Control Logic: Lock account after 3 failed login attempts
            if self._login_attempts >= self.MAX_ATTEMPTS:
                self.lock_account()

            return False

    def check_privileges(self):
        """Returns the user's current privilege level."""
        return self._privilege_level

    def lock_account(self):
        """Locks the user's account and updates the status."""
        if self._account_status != "locked":
            self._account_status = "locked"
            self.log_activity("Account status changed to LOCKED.")

    def unlock_account(self, admin_user=None):
        """
        Unlocks the user's account, only allowing this action if an admin is provided
        (Prevention of direct access to sensitive attributes / Unlocking without proper validation [cite: 163, 172]).
        """
        if admin_user and admin_user.check_privileges() == 'admin':
            if self._account_status == "locked":
                self._account_status = "active"
                self._login_attempts = 0 # Reset attempts upon unlocking
                self.log_activity(f"Account successfully UNLOCKED by Admin: {admin_user.get_username()}")
                return True
            else:
                self.log_activity("Attempted to unlock an active account.")
                return True # Technically successful, as it is already unlocked
        else:
            self.log_activity("Unlock FAILED: Requires Admin privileges.")
            return False

    def log_activity(self, message):
        """Logs user-specific activities."""
        logger.info(f"[USER:{self._username}] {message}")

    # --- Getters (Safely display user information) ---

    def get_username(self):
        return self._username

    def get_account_status(self):
        return self._account_status

    def get_login_attempts(self):
        return self._login_attempts

    # --- Setters (With Validation/Authorisation) ---

    def set_privilege_level(self, new_level, admin_user):
        """
        Allows privilege escalation only through authorised methods.
        """
        if admin_user and admin_user.check_privileges() == 'admin':
            try:
                self._privilege_level = self._validate_privilege(new_level)
                self.log_activity(f"Privilege level set to '{new_level}' by Admin: {admin_user.get_username()}")
                return True
            except ValueError as e:
                self.log_activity(f"Privilege change FAILED: {e}")
                return False
        else:
            # Prevention of privilege escalation without authorisation [cite: 168, 169]
            self.log_activity("Privilege change FAILED: Not authorised (Admin required).")
            return False

    # --- Safe Display Method ---
    def display_safe_info(self):
        """Safely displays user information without exposing sensitive data."""
        return (f"User: {self._username} | Privilege: {self._privilege_level} | "
                f"Status: {self._account_status} | Attempts: {self._login_attempts}/{self.MAX_ATTEMPTS}")

    # Representation method for debugging (ensuring sensitive data is protected [cite: 170])
    def __repr__(self):
        return f"<User username='{self._username}' status='{self._account_status}' privilege='{self._privilege_level}'>"


# --- Demonstration of Exercise 1 ---
print("\n" + "="*70)
print("              Exercise 1: User Authentication System Demo")
print("="*70)

# 1. Create User Objects
admin_user = User("sysadmin", "SecurePass!123", "admin")
std_user = User("standard_user", "password123", "standard")
guest_user = User("guest", "guestpass", "guest")

print("\n--- Initial State ---")
print(admin_user.display_safe_info())
print(std_user.display_safe_info())

# 2. Authentication Scenarios
print("\n--- Authentication Attempts ---")

# Successful authentication
if std_user.authenticate("password123"):
    print(f"SUCCESS: {std_user.get_username()} logged in.")
else:
    print(f"FAILURE: {std_user.get_username()} login failed.")

# 3. Account Locking Logic (3 failed attempts)
print("\n--- Account Locking Scenario ---")
# Fail 1
std_user.authenticate("wrongpass")
# Fail 2
std_user.authenticate("wrongpass")
# Fail 3 (Account should lock here)
std_user.authenticate("wrongpass")

print(f"Status after 3 failures: {std_user.display_safe_info()}")

# Try to log in while locked
std_user.authenticate("password123")

# 4. Privilege Escalation and Unlocking (Access Control)
print("\n--- Privilege/Access Control ---")

# Standard user attempts to elevate privilege (Prevention of privilege escalation [cite: 168, 169])
print(f"Standard user attempts to escalate their own privileges: {std_user.set_privilege_level('admin', std_user)}")

# Admin user attempts to elevate standard user's privilege
print(f"Admin attempts to elevate standard_user's privileges: {std_user.set_privilege_level('admin', admin_user)}")
print(f"New status: {std_user.display_safe_info()}")

# Admin unlocks the account (Unlocking accounts without proper validation [cite: 172])
admin_user.log_activity("Attempting to unlock locked account.")
std_user.unlock_account(admin_user)

# Final check
print(f"Final Status: {std_user.display_safe_info()}")
if std_user.authenticate("password123"):
    print(f"SUCCESS: {std_user.get_username()} logged in after unlock.")


              Exercise 1: User Authentication System Demo

--- Initial State ---
User: sysadmin | Privilege: admin | Status: active | Attempts: 0/3
User: standard_user | Privilege: standard | Status: active | Attempts: 0/3

--- Authentication Attempts ---
SUCCESS: standard_user logged in.

--- Account Locking Scenario ---
Status after 3 failures: User: standard_user | Privilege: standard | Status: locked | Attempts: 3/3

--- Privilege/Access Control ---
Standard user attempts to escalate their own privileges: False
Admin attempts to elevate standard_user's privileges: True
New status: User: standard_user | Privilege: admin | Status: locked | Attempts: 3/3
Final Status: User: standard_user | Privilege: admin | Status: active | Attempts: 0/3
SUCCESS: standard_user logged in after unlock.


Exercise 2: IoT Device Management System

In [5]:
import datetime
import logging
from typing import List, Optional
# Reuse logger setup from Exercise 1, ensuring the output logs all activity.
# If running this notebook block separately, ensure logging is configured.

# Setting up a dedicated logger for Device activities
DEVICE_LOG_FILE = "device_activity.log"
device_logger = logging.getLogger('DeviceManager')
device_logger.setLevel(logging.INFO)

# File Handler for device activities
fh = logging.FileHandler(DEVICE_LOG_FILE, mode='w')
fh.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
device_logger.addHandler(fh)
device_logger.propagate = False # Prevent double logging if root logger is also configured


# --- Core Class 1: Device ---
class Device:
    """Models an IoT device with security and compliance attributes."""

    COMPLIANCE_DAYS_LIMIT = 30 # Devices become non-compliant if not scanned within 30 days

    def __init__(self, device_id: str, device_type: str, firmware_version: str, owner: str):
        # Private attributes for encapsulation and state management [cite: 202]
        self._device_id = device_id
        self._device_type = device_type
        self._firmware_version = firmware_version
        self._owner = owner
        self._compliance_status = True # Initial status
        self._is_active = True
        self._last_security_scan: Optional[datetime.datetime] = datetime.datetime.now()

        device_logger.info(f"Device {self._device_id} created by {self._owner} (Type: {self._device_type}).")

    # --- Getters ---
    def get_info(self):
        """Returns safe, non-sensitive device information."""
        return {
            'id': self._device_id,
            'type': self._device_type,
            'firmware': self._firmware_version,
            'owner': self._owner,
            'compliant': self.check_compliance(),
            'active': self._is_active
        }

    # --- Security and State Management Methods ---

    def run_security_scan(self, initiating_user: 'User'):
        """Updates the last security scan timestamp."""
        if not self._is_active:
            device_logger.warning(f"SCAN BLOCKED: Device {self._device_id} is quarantined.")
            return False

        self._last_security_scan = datetime.datetime.now()
        self._compliance_status = True
        device_logger.info(f"Device {self._device_id}: Security scan initiated by {initiating_user.get_username()}. Compliance reset to True.")
        return True

    def update_firmware(self, new_version: str, initiating_user: 'User'):
        """Updates the firmware version."""
        if not self._is_active:
            device_logger.warning(f"FIRMWARE UPDATE BLOCKED: Device {self._device_id} is quarantined.")
            return False

        self._firmware_version = new_version
        self.check_compliance(force_recheck=True) # Check compliance after update
        device_logger.info(f"Device {self._device_id}: Firmware updated to {new_version} by {initiating_user.get_username()}.")
        return True

    def check_compliance(self, force_recheck=False):
        """
        Checks if the device is compliant based on the last scan time and active status.
        (Devices must pass compliance checks before allowing access )
        """
        if not self._is_active:
            return False # Quarantined devices are non-compliant

        if self._last_security_scan is None:
            # If never scanned, treat as non-compliant
            self._compliance_status = False
        else:
            time_difference = datetime.datetime.now() - self._last_security_scan

            if time_difference.days > self.COMPLIANCE_DAYS_LIMIT:
                self._compliance_status = False
                if force_recheck:
                    device_logger.warning(f"Device {self._device_id} set to NON-COMPLIANT (Last scan {time_difference.days} days ago).")
            # If compliant, status remains True

        return self._compliance_status

    def quarantine(self, admin_user: 'User'):
        """Quarantines the device (Only admin users can override compliance checks )."""
        if admin_user.check_privileges() == 'admin':
            self._is_active = False
            self._compliance_status = False
            device_logger.critical(f"Device {self._device_id} QUARANTINED by Admin {admin_user.get_username()}.")
            return True
        else:
            device_logger.error(f"Quarantine failed: {admin_user.get_username()} lacks admin privilege.")
            return False


# --- Core Class 2: DeviceManager ---
class DeviceManager:
    """Maintains a list of devices and controls access."""

    def __init__(self):
        # Maintains a list of devices [cite: 193]
        self._devices: List[Device] = []

    def add_device(self, device: Device):
        """Adds a device to the manager's list."""
        self._devices.append(device)
        device_logger.info(f"Manager added device {device._device_id}.")

    def remove_device(self, device_id: str):
        """Removes a device from the manager's list."""
        self._devices[:] = [d for d in self._devices if d._device_id != device_id]
        device_logger.info(f"Manager removed device {device_id}.")

    def get_device(self, device_id: str) -> Optional[Device]:
        """Retrieves a device object by ID."""
        for device in self._devices:
            if device._device_id == device_id:
                return device
        return None

    def authorise_access(self, device_id: str, requesting_user: 'User', override_check=False):
        """
        Builds an authorise_access() method that checks user permission, ownership, and compliance.
        (All device interactions must be logged with timestamps )
        """
        device = self.get_device(device_id)
        if not device:
            device_logger.error(f"ACCESS DENIED: Device {device_id} not found.")
            return False

        user_privilege = requesting_user.check_privileges()

        # Admin override logic
        is_admin_override = (override_check and user_privilege == 'admin')

        # 1. Check Ownership (Standard user: Can access owned compliant devices [cite: 209])
        is_owner = (device._owner == requesting_user.get_username())

        # 2. Check Compliance (Non-compliant devices denied access [cite: 216])
        is_compliant = device.check_compliance()

        # Admin user: Can access all devices [cite: 212]
        if user_privilege == 'admin':
            device_logger.info(f"ACCESS GRANTED (Admin): {requesting_user.get_username()} accessed {device_id}.")
            return True

        if is_owner and is_compliant:
            device_logger.info(f"ACCESS GRANTED: {requesting_user.get_username()} accessed owned compliant device {device_id}.")
            return True

        if is_admin_override and user_privilege == 'admin':
             device_logger.warning(f"ACCESS GRANTED (Admin Override): {requesting_user.get_username()} forced access to {device_id} (Compliance: {is_compliant}).")
             return True

        # Log and deny access
        if not is_owner:
            device_logger.warning(f"ACCESS DENIED: {requesting_user.get_username()} tried to access non-owned device {device_id}.") # Cannot access other users' devices [cite: 209]
        elif not is_compliant:
            device_logger.warning(f"ACCESS DENIED: Device {device_id} is not compliant.")

        return False

    def generate_security_report(self):
        """Generates a list of all non-compliant devices."""
        non_compliant_devices = []
        for device in self._devices:
            if not device.check_compliance():
                info = device.get_info()
                info['reason'] = "Quarantined" if not info['active'] else "Out of compliance period"
                non_compliant_devices.append(info)

        device_logger.info(f"Security Report generated: {len(non_compliant_devices)} non-compliant devices found.")
        return non_compliant_devices


# --- Demonstration of Exercise 2 ---
print("\n" + "="*70)
print("              Exercise 2: IoT Device Management Demo")
print("="*70)

# 1. Setup (Requires User class from Exercise 1)
try:
    # Use existing users from Exercise 1
    admin_user
    std_user

except NameError:
    # If Exercise 1 block was not run, create them here
    admin_user = User("sysadmin", "SecurePass!123", "admin")
    std_user = User("standard_user", "password123", "standard")

manager = DeviceManager()

# 2. Add Devices
device_1 = Device("D001", "Sensor", "1.0.0", "standard_user")
device_2 = Device("D002", "Camera", "1.5.2", "admin")
device_3 = Device("D003", "Gateway", "2.1.0", "standard_user")

manager.add_device(device_1)
manager.add_device(device_2)
manager.add_device(device_3)

print("\n--- Compliance Check (Simulating time lapse) ---")
# Manually simulate device_3 being out of compliance
# NOTE: To test this in reality, you would modify the device._last_security_scan attribute
# For the demo, we check current status:
print(f"Device 1 compliant: {device_1.check_compliance()}")
print(f"Device 2 compliant: {device_2.check_compliance()}")


# 3. User-Device Interactions (Access Control)
print("\n--- Access Control & Actions ---")

# Standard user access owned, compliant device (Success [cite: 209])
d1_access = manager.authorise_access("D001", std_user)
if d1_access:
    device_1.run_security_scan(std_user)

# Standard user access non-owned device (Failure [cite: 209])
d2_access = manager.authorise_access("D002", std_user)

# Admin access non-owned device (Success [cite: 212])
d1_admin_access = manager.authorise_access("D001", admin_user)


# 4. Quarantine and Admin Override
print("\n--- Quarantine Scenario ---")
# Admin quarantines device 3 (Can quarantine devices [cite: 213])
device_3.quarantine(admin_user)

# Standard user attempts to access quarantined device (Failure [cite: 216])
manager.authorise_access("D003", std_user)

# Admin attempts to access quarantined device with override (Admin user: Can override compliance checks [cite: 214])
# Although quarantine is an active denial, admin access is typically granted to perform maintenance
manager.authorise_access("D003", admin_user, override_check=True)


# 5. Security Report
print("\n--- Security Report ---")
report = manager.generate_security_report()
print(f"Non-compliant devices found: {len(report)}")
for item in report:
    print(f"  ID: {item['id']}, Type: {item['type']}, Reason: {item['reason']}")

print("\n" + "="*70)
print("        All Exercises Complete")
print(f"Check the following files for audit trails:")
print(f"- Authentication Log: {LOG_FILE}")
print(f"- Device Activity Log: {DEVICE_LOG_FILE}")
print("="*70)


              Exercise 2: IoT Device Management Demo

--- Compliance Check (Simulating time lapse) ---
Device 1 compliant: True
Device 2 compliant: True

--- Access Control & Actions ---

--- Quarantine Scenario ---

--- Security Report ---
Non-compliant devices found: 1
  ID: D003, Type: Gateway, Reason: Quarantined

        All Exercises Complete
Check the following files for audit trails:
- Authentication Log: auth_activity.log
- Device Activity Log: device_activity.log
