
##1.1 Lab 1: Singleton Pattern Implementation
Objective: Understand and implement the Singleton design pattern with thread safety.
Problem Statement: Create a thread-sa

In [None]:
import threading
from typing import Dict, Any


class ConfigurationManager:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super(ConfigurationManager, cls).__new__(cls)
                cls._instance._initialized = False  # Flag to prevent re-init
            return cls._instance

    def __init__(self):
        # Only initialize once
        if not hasattr(self, '_initialized') or not self._initialized:
            with type(self)._lock:
                if not hasattr(self, '_initialized') or not self._initialized:
                    self._initialize()
                    self._initialized = True

    def _initialize(self):
        self._config: Dict[str, Any] = {
            "database_url": "localhost:5432",
            "api_key": "default_key",
            "max_connections": 10,
            "debug_mode": True
        }

    def get(self, key: str) -> Any:
        return self._config.get(key)

    def set(self, key: str, value: Any):
        with self._lock:
            self._config[key] = value

    def display_config(self):
        print("Current Configuration:")
        for key, value in self._config.items():
            print(f"{key}: {value}")


# Test the Singleton Pattern
def test_singleton(thread_id):
    config = ConfigurationManager()
    config.set(f"thread_{thread_id}_access", True)
    print(f"Thread {thread_id}: Instance ID - {id(config)}")
    config.display_config()
    print("-" * 40)


# Multi-threading test
if __name__ == "__main__":
    threads = []
    for i in range(3):
        thread = threading.Thread(target=test_singleton, args=(i,))
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete
    for thread in threads:
        thread.join()

    print("All threads completed. Final config from main:")
    ConfigurationManager().display_config()

Thread 0: Instance ID - 139009787526272
Current Configuration:
database_url: localhost:5432
api_key: default_key
max_connections: 10
debug_mode: True
thread_0_access: True
----------------------------------------
Thread 1: Instance ID - 139009787526272
Current Configuration:
database_url: localhost:5432
api_key: default_key
max_connections: 10
debug_mode: True
thread_0_access: True
thread_1_access: True
----------------------------------------
Thread 2: Instance ID - 139009787526272
Current Configuration:
database_url: localhost:5432
api_key: default_key
max_connections: 10
debug_mode: True
thread_0_access: True
thread_1_access: True
thread_2_access: True
----------------------------------------
All threads completed. Final config from main:
Current Configuration:
database_url: localhost:5432
api_key: default_key
max_connections: 10
debug_mode: True
thread_0_access: True
thread_1_access: True
thread_2_access: True


#Lab 2: Factory Method Pattern
Objective: Implement Factory Method pattern for creating different types of database connections.
Problem Statement: Create a factory that can generate MySQL, PostgreSQL, and SQLite database
connections based on configuration.


In [None]:
!pip install mysql-connector-python psycopg2-binary

Collecting mysql-connector-python
  Downloading mysql_connector_python-9.5.0-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (7.5 kB)
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (4.9 kB)
Downloading mysql_connector_python-9.5.0-cp312-cp312-manylinux_2_28_x86_64.whl (34.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.1/34.1 MB[0m [31m62.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading psycopg2_binary-2.9.11-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (4.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m118.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: psycopg2-binary, mysql-connector-python
Successfully installed mysql-connector-python-9.5.0 psycopg2-binary-2.9.11


In [None]:
from abc import ABC, abstractmethod
import sqlite3
import mysql.connector
import psycopg2
from typing import Optional


class DatabaseConnection(ABC):
    @abstractmethod
    def connect(self):
        """Establish connection to the database."""
        pass

    @abstractmethod
    def execute_query(self, query: str):
        """Execute a query and return results."""
        pass

    @abstractmethod
    def close(self):
        """Close the database connection."""
        pass


class MySQLConnection(DatabaseConnection):
    def __init__(self, host: str, user: str, password: str, database: str):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.connection = None

    def connect(self):
        if self.connection is None:
            self.connection = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database
            )
        return self.connection

    def execute_query(self, query: str):
        if self.connection is None:
            self.connect()
        cursor = self.connection.cursor()
        cursor.execute(query)
        return cursor.fetchall()

    def close(self):
        if self.connection and self.connection.is_connected():
            self.connection.close()
            self.connection = None


class PostgreSQLConnection(DatabaseConnection):
    def __init__(self, host: str, user: str, password: str, database: str):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.connection = None

    def connect(self):
        if self.connection is None:
            self.connection = psycopg2.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database
            )
        return self.connection

    def execute_query(self, query: str):
        if self.connection is None:
            self.connect()
        cursor = self.connection.cursor()
        cursor.execute(query)
        return cursor.fetchall()

    def close(self):
        if self.connection:
            self.connection.close()
            self.connection = None


class SQLiteConnection(DatabaseConnection):
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.connection = None

    def connect(self):
        if self.connection is None:
            self.connection = sqlite3.connect(self.db_path)
        return self.connection

    def execute_query(self, query: str):
        if self.connection is None:
            self.connect()
        cursor = self.connection.cursor()
        cursor.execute(query)
        return cursor.fetchall()

    def close(self):
        if self.connection:
            self.connection.close()
            self.connection = None


class DatabaseFactory:
    @staticmethod
    def create_connection(db_type: str, **kwargs) -> DatabaseConnection:
        db_type = db_type.lower().strip()

        if db_type == "mysql":
            return MySQLConnection(
                host=kwargs.get('host', 'localhost'),
                user=kwargs.get('user', 'root'),
                password=kwargs.get('password', ''),
                database=kwargs.get('database', 'test')
            )
        elif db_type == "postgresql":
            return PostgreSQLConnection(
                host=kwargs.get('host', 'localhost'),
                user=kwargs.get('user', 'postgres'),
                password=kwargs.get('password', ''),
                database=kwargs.get('database', 'test')
            )
        elif db_type == "sqlite":
            return SQLiteConnection(
                db_path=kwargs.get('db_path', ':memory:')
            )
        else:
            raise ValueError(f"Unsupported database type: {db_type}")


# Usage example
if __name__ == "__main__":
    # Create MySQL connection
    mysql_db = DatabaseFactory.create_connection(
        "mysql",
        host="localhost",
        user="root",
        password="password",
        database="test_db"
    )

    # Create SQLite in-memory connection
    sqlite_db = DatabaseFactory.create_connection("sqlite", db_path=":memory:")

    # Print connection types
    print(f"MySQL Connection: {type(mysql_db).__name__}")
    print(f"SQLite Connection: {type(sqlite_db).__name__}")

    # Example usage (uncomment to test queries)
    """
    mysql_db.connect()
    # result = mysql_db.execute_query("SELECT VERSION()")
    # print("MySQL Version:", result)

    sqlite_db.connect()
    sqlite_db.execute_query("CREATE TABLE IF NOT EXISTS test (id INTEGER)")
    sqlite_db.execute_query("INSERT INTO test (id) VALUES (1)")
    result = sqlite_db.execute_query("SELECT * FROM test")
    print("SQLite Result:", result)

    mysql_db.close()
    sqlite_db.close()
    """

MySQL Connection: MySQLConnection
SQLite Connection: SQLiteConnection


#Lab 3: Observer Pattern
Objective: Implement Observer pattern for event notification system.
Problem Statement: Create a stock market system where multiple displays observe price changes.


In [None]:
from abc import ABC, abstractmethod
from typing import List, Dict


# === Observer Interface ===
class Observer(ABC):
    @abstractmethod
    def update(self, stock_name: str, price: float):
        pass


# === Subject Interface ===
class Subject(ABC):
    @abstractmethod
    def register_observer(self, observer: Observer):
        pass

    @abstractmethod
    def remove_observer(self, observer: Observer):
        pass

    @abstractmethod
    def notify_observers(self):
        pass


# === Concrete Subject: StockMarket ===
class StockMarket(Subject):
    def __init__(self):
        self._observers: List[Observer] = []
        self._stock_prices: Dict[str, float] = {}

    def register_observer(self, observer: Observer):
        if observer not in self._observers:
            self._observers.append(observer)

    def remove_observer(self, observer: Observer):
        if observer in self._observers:
            self._observers.remove(observer)

    def notify_observers(self):
        for observer in self._observers:
            for stock, price in self._stock_prices.items():
                observer.update(stock, price)

    def set_stock_price(self, stock_name: str, price: float):
        was_updated = stock_name not in self._stock_prices or self._stock_prices[stock_name] != price
        self._stock_prices[stock_name] = price
        if was_updated:
            self.notify_observers()  # Only notify if price changed

    def get_stock_price(self, stock_name: str) -> float:
        return self._stock_prices.get(stock_name, 0.0)


# === Concrete Observer 1: Desktop Display ===
class StockDisplay(Observer):
    def __init__(self, name: str):
        self.name = name.strip()
        self.displayed_stocks: Dict[str, float] = {}

    def update(self, stock_name: str, price: float):
        self.displayed_stocks[stock_name] = price
        self.display()

    def display(self):
        print(f"\n{self.name} Display:")
        if not self.displayed_stocks:
            print("  No stocks to display.")
        else:
            for stock, price in self.displayed_stocks.items():
                print(f"  {stock}: ${price:.2f}")


# === Concrete Observer 2: Mobile App ===
class MobileApp(Observer):
    def __init__(self, app_name: str):
        self.app_name = app_name.strip()
        self.notifications: List[str] = []

    def update(self, stock_name: str, price: float):
        notification = f"Stock {stock_name} updated to ${price:.2f}"
        self.notifications.append(notification)
        print(f"[{self.app_name}] {notification}")


# === Test the Observer Pattern ===
if __name__ == "__main__":
    # Create stock market
    market = StockMarket()

    # Create observers
    desktop_display = StockDisplay("Desktop")
    mobile_app = MobileApp("StockTracker Mobile")

    # Register observers
    market.register_observer(desktop_display)
    market.register_observer(mobile_app)

    # Update stock prices
    print("=== Stock Market Updates ===")
    market.set_stock_price("AAPL", 150.25)
    market.set_stock_price("GOOGL", 2750.80)
    market.set_stock_price("TSLA", 850.75)

    # Remove one observer and update again
    market.remove_observer(mobile_app)
    print("\n=== After removing mobile app ===")
    market.set_stock_price("MSFT", 305.60)

    # Optional: Show final desktop state
    print("\n=== Final Desktop View ===")
    desktop_display.display()

=== Stock Market Updates ===

Desktop Display:
  AAPL: $150.25
[StockTracker Mobile] Stock AAPL updated to $150.25

Desktop Display:
  AAPL: $150.25

Desktop Display:
  AAPL: $150.25
  GOOGL: $2750.80
[StockTracker Mobile] Stock AAPL updated to $150.25
[StockTracker Mobile] Stock GOOGL updated to $2750.80

Desktop Display:
  AAPL: $150.25
  GOOGL: $2750.80

Desktop Display:
  AAPL: $150.25
  GOOGL: $2750.80

Desktop Display:
  AAPL: $150.25
  GOOGL: $2750.80
  TSLA: $850.75
[StockTracker Mobile] Stock AAPL updated to $150.25
[StockTracker Mobile] Stock GOOGL updated to $2750.80
[StockTracker Mobile] Stock TSLA updated to $850.75

=== After removing mobile app ===

Desktop Display:
  AAPL: $150.25
  GOOGL: $2750.80
  TSLA: $850.75

Desktop Display:
  AAPL: $150.25
  GOOGL: $2750.80
  TSLA: $850.75

Desktop Display:
  AAPL: $150.25
  GOOGL: $2750.80
  TSLA: $850.75

Desktop Display:
  AAPL: $150.25
  GOOGL: $2750.80
  TSLA: $850.75
  MSFT: $305.60

=== Final Desktop View ===

Desktop Dis

#Lab 4: MVC Architecture
Objective: Implement Model-View-Controller pattern for a user management system.
Problem Statement: Create a simple user management system following MVC principles.


In [None]:
from typing import List, Dict, Any, Optional


# ================= MODEL =================
class UserModel:
    def __init__(self):
        self._users: List[Dict[str, Any]] = []
        self._observers: List[Any] = []

    def add_observer(self, observer):
        if observer not in self._observers:
            self._observers.append(observer)

    def notify_observers(self):
        for observer in self._observers:
            observer.update()

    def add_user(self, user_data: Dict[str, str]):
        user_id = len(self._users) + 1
        user_data['id'] = user_id
        self._users.append(user_data.copy())
        self.notify_observers()

    def get_users(self) -> List[Dict[str, Any]]:
        return [user.copy() for user in self._users]

    def get_user(self, user_id: int) -> Optional[Dict[str, Any]]:
        for user in self._users:
            if user['id'] == user_id:
                return user.copy()
        return None

    def update_user(self, user_id: int, user_data: Dict[str, str]) -> bool:
        for i, user in enumerate(self._users):
            if user['id'] == user_id:
                self._users[i].update(user_data)
                self.notify_observers()
                return True
        return False

    def delete_user(self, user_id: int) -> bool:
        initial_len = len(self._users)
        self._users = [user for user in self._users if user['id'] != user_id]
        if len(self._users) < initial_len:
            self.notify_observers()
            return True
        return False


# ================= VIEW =================
class UserView:
    def display_users(self, users: List[Dict[str, Any]]):
        print("\n=== User List ===")
        if not users:
            print("  No users found.")
        else:
            for user in users:
                print(f"  ID: {user['id']}, Name: {user['name']}, Email: {user['email']}")
        print("=================\n")

    def display_user(self, user: Optional[Dict[str, Any]]):
        if user:
            print(f"\n=== User Details ===")
            print(f"  ID: {user['id']}")
            print(f"  Name: {user['name']}")
            print(f"  Email: {user['email']}")
            print("===================\n")
        else:
            print("  User not found!\n")

    def get_user_input(self) -> Dict[str, str]:
        name = input("Enter user name: ").strip()
        email = input("Enter user email: ").strip()
        return {'name': name, 'email': email}

    def show_message(self, message: str):
        print(f"Info: {message}")


# ================= CONTROLLER =================
class UserController:
    def __init__(self, model: UserModel, view: UserView):
        self.model = model
        self.view = view
        self.model.add_observer(self)  # Controller observes Model

    def update(self):
        """Called when model changes — refresh view"""
        users = self.model.get_users()
        self.view.display_users(users)

    def add_user(self):
        user_data = self.view.get_user_input()
        self.model.add_user(user_data)
        self.view.show_message("User added successfully!")

    def show_users(self):
        users = self.model.get_users()
        self.view.display_users(users)

    def show_user(self, user_id: int):
        user = self.model.get_user(user_id)
        self.view.display_user(user)

    def update_user(self, user_id: int):
        user = self.model.get_user(user_id)
        if user:
            self.view.show_message(f"Updating user {user['name']}")
            new_data = self.view.get_user_input()
            self.model.update_user(user_id, new_data)
            self.view.show_message("User updated successfully!")
        else:
            self.view.show_message("User not found!")

    def delete_user(self, user_id: int):
        if self.model.delete_user(user_id):
            self.view.show_message("User deleted successfully!")
        else:
            self.view.show_message("User not found!")


# ================= DEMO =================
if __name__ == "__main__":
    model = UserModel()
    view = UserView()
    controller = UserController(model, view)

    print("=== MVC User Management System ===\n")

    # Add users interactively
    print("Add first user:")
    controller.add_user()

    print("\nAdd second user:")
    controller.add_user()

    # Show all users
    controller.show_users()

    # Show specific user
    controller.show_user(1)

    # Update user
    print("Update user ID 1:")
    controller.update_user(1)

    # Delete user
    print("Delete user ID 2:")
    controller.delete_user(2)

    # Final state
    print("Final user list:")
    controller.show_users()

=== MVC User Management System ===

Add first user:
Enter user name: Tanvir
Enter user email: 2020tanvir1971@gmail.com

=== User List ===
  ID: 1, Name: Tanvir, Email: 2020tanvir1971@gmail.com

Info: User added successfully!

Add second user:
Enter user name: roman
Enter user email: roman@gmail.com

=== User List ===
  ID: 1, Name: Tanvir, Email: 2020tanvir1971@gmail.com
  ID: 2, Name: roman, Email: roman@gmail.com

Info: User added successfully!

=== User List ===
  ID: 1, Name: Tanvir, Email: 2020tanvir1971@gmail.com
  ID: 2, Name: roman, Email: roman@gmail.com


=== User Details ===
  ID: 1
  Name: Tanvir
  Email: 2020tanvir1971@gmail.com

Update user ID 1:
Info: Updating user Tanvir
Enter user name: Tanvir
Enter user email: 2020tanvir1971@gmail.com

=== User List ===
  ID: 1, Name: Tanvir, Email: 2020tanvir1971@gmail.com
  ID: 2, Name: roman, Email: roman@gmail.com

Info: User updated successfully!
Delete user ID 2:

=== User List ===
  ID: 1, Name: Tanvir, Email: 2020tanvir1971@gm

#Lab 5: Layered Architecture
Objective: Implement a 3-tier layered architecture for a library management system.
Problem Statement: Create a library system with presentation, business logic, and data access layers.

In [None]:
from typing import List, Dict, Any, Tuple, Optional


# ================= DATA ACCESS LAYER =================
class BookRepository:
    def __init__(self):
        self._books: List[Dict[str, Any]] = [
            {'id': 1, 'title': 'Python Programming', 'author': 'John Doe', 'available': True},
            {'id': 2, 'title': 'Software Architecture', 'author': 'Jane Smith', 'available': True},
            {'id': 3, 'title': 'Design Patterns', 'author': 'Bob Johnson', 'available': False}
        ]

    def get_all_books(self) -> List[Dict[str, Any]]:
        return [book.copy() for book in self._books]

    def get_book_by_id(self, book_id: int) -> Optional[Dict[str, Any]]:
        for book in self._books:
            if book['id'] == book_id:
                return book.copy()
        return None

    def get_available_books(self) -> List[Dict[str, Any]]:
        return [book.copy() for book in self._books if book['available']]

    def borrow_book(self, book_id: int) -> bool:
        for book in self._books:
            if book['id'] == book_id and book['available']:
                book['available'] = False
                return True
        return False

    def return_book(self, book_id: int) -> bool:
        for book in self._books:
            if book['id'] == book_id and not book['available']:
                book['available'] = True
                return True
        return False


# ================= BUSINESS LOGIC LAYER =================
class LibraryService:
    def __init__(self):
        self.book_repo = BookRepository()

    def display_all_books(self) -> List[Dict[str, Any]]:
        return self.book_repo.get_all_books()

    def display_available_books(self) -> List[Dict[str, Any]]:
        return self.book_repo.get_available_books()

    def borrow_book(self, book_id: int) -> Tuple[bool, str]:
        book = self.book_repo.get_book_by_id(book_id)
        if not book:
            return False, "Book not found."

        if not book['available']:
            return False, "Book is already borrowed."

        if self.book_repo.borrow_book(book_id):
            return True, f"Successfully borrowed '{book['title']}'."
        else:
            return False, "Failed to borrow book."

    def return_book(self, book_id: int) -> Tuple[bool, str]:
        book = self.book_repo.get_book_by_id(book_id)
        if not book:
            return False, "Book not found."

        if book['available']:
            return False, "Book is already available."

        if self.book_repo.return_book(book_id):
            return True, f"Successfully returned '{book['title']}'."
        else:
            return False, "Failed to return book."


# ================= PRESENTATION LAYER =================
class LibraryUI:
    def __init__(self):
        self.library_service = LibraryService()

    def display_menu(self):
        print("\n=== Library Management System ===")
        print("1. View all books")
        print("2. View available books")
        print("3. Borrow a book")
        print("4. Return a book")
        print("5. Exit")

    def display_books(self, books: List[Dict[str, Any]]):
        if not books:
            print("\nNo books found.")
            return

        print("\n=== Book List ===")
        for book in books:
            status = "Available" if book['available'] else "Borrowed"
            print(f"  ID: {book['id']}, Title: {book['title']}, "
                  f"Author: {book['author']}, Status: {status}")
        print()

    def run(self):
        while True:
            self.display_menu()
            choice = input("Enter your choice (1-5): ").strip()

            if choice == '1':
                books = self.library_service.display_all_books()
                self.display_books(books)

            elif choice == '2':
                books = self.library_service.display_available_books()
                self.display_books(books)

            elif choice == '3':
                try:
                    book_id = int(input("Enter book ID to borrow: ").strip())
                    success, message = self.library_service.borrow_book(book_id)
                    print(f"→ {message}")
                except ValueError:
                    print("Invalid ID. Please enter a number.")

            elif choice == '4':
                try:
                    book_id = int(input("Enter book ID to return: ").strip())
                    success, message = self.library_service.return_book(book_id)
                    print(f"→ {message}")
                except ValueError:
                    print("Invalid ID. Please enter a number.")

            elif choice == '5':
                print("Thank you for using the Library Management System!")
                break

            else:
                print("Invalid choice. Please try again.")


# ================= RUN APPLICATION =================
if __name__ == "__main__":
    ui = LibraryUI()
    ui.run()


=== Library Management System ===
1. View all books
2. View available books
3. Borrow a book
4. Return a book
5. Exit
Enter your choice (1-5): 3
Enter book ID to borrow: 10
→ Book not found.

=== Library Management System ===
1. View all books
2. View available books
3. Borrow a book
4. Return a book
5. Exit
Enter your choice (1-5): 3
Enter book ID to borrow: 1
→ Successfully borrowed 'Python Programming'.

=== Library Management System ===
1. View all books
2. View available books
3. Borrow a book
4. Return a book
5. Exit
Enter your choice (1-5): 5
Thank you for using the Library Management System!


#Lab 6: Microservices Communication
Objective: Implement inter-service communication in a microservices architecture.
Problem Statement: Create user and order services that communicate via REST API

In [15]:
from flask import Flask, jsonify, request
import requests
import threading
import time
from typing import Dict, Any


# ================= USER SERVICE =================
class UserService:
    def __init__(self, port: int = 5001):
        self.app = Flask(__name__)
        self.port = port
        self.users: Dict[int, Dict[str, Any]] = {
            1: {'id': 1, 'name': 'John Doe', 'email': 'john@example.com'},
            2: {'id': 2, 'name': 'Jane Smith', 'email': 'jane@example.com'}
        }
        self.setup_routes()

    def setup_routes(self):
        @self.app.route('/users', methods=['GET'])
        def get_users():
            return jsonify(list(self.users.values()))

        @self.app.route('/users/<int:user_id>', methods=['GET'])
        def get_user(user_id):
            user = self.users.get(user_id)
            if user:
                return jsonify(user)
            return jsonify({'error': 'User not found'}), 404

        @self.app.route('/users', methods=['POST'])
        def create_user():
            data = request.get_json()
            if not data or 'name' not in data or 'email' not in data:
                return jsonify({'error': 'Name and email required'}), 400

            user_id = max(self.users.keys()) + 1
            user = {
                'id': user_id,
                'name': data['name'],
                'email': data['email']
            }
            self.users[user_id] = user
            return jsonify(user), 201

    def run(self):
        self.app.run(port=self.port, debug=False, use_reloader=False)


# ================= ORDER SERVICE =================
class OrderService:
    def __init__(self, port: int = 5002, user_service_url: str = 'http://localhost:5001'):
        self.app = Flask(__name__)
        self.port = port
        self.user_service_url = user_service_url
        self.orders: Dict[int, Dict[str, Any]] = {
            1: {'id': 1, 'user_id': 1, 'product': 'Laptop', 'amount': 999.99},
            2: {'id': 2, 'user_id': 2, 'product': 'Mouse', 'amount': 29.99}
        }
        self.setup_routes()

    def setup_routes(self):
        @self.app.route('/orders', methods=['GET'])
        def get_orders():
            return jsonify(list(self.orders.values()))

        @self.app.route('/orders/<int:order_id>', methods=['GET'])
        def get_order(order_id):
            order = self.orders.get(order_id)
            if not order:
                return jsonify({'error': 'Order not found'}), 404

            # Enrich order with user data
            try:
                user_response = requests.get(
                    f"{self.user_service_url}/users/{order['user_id']}",
                    timeout=2
                )
                if user_response.status_code == 200:
                    order_with_user = order.copy()
                    order_with_user['user'] = user_response.json()
                    return jsonify(order_with_user)
            except requests.RequestException:
                pass  # Fallback: return order without user

            return jsonify(order)

        @self.app.route('/orders', methods=['POST'])
        def create_order():
            data = request.get_json()
            required = ['user_id', 'product', 'amount']
            if not data or any(k not in data for k in required):
                return jsonify({'error': 'user_id, product, amount required'}), 400

            # Validate user exists
            try:
                user_response = requests.get(
                    f"{self.user_service_url}/users/{data['user_id']}",
                    timeout=2
                )
                if user_response.status_code != 200:
                    return jsonify({'error': 'User not found'}), 400
            except requests.RequestException:
                return jsonify({'error': 'User service unavailable'}), 503

            order_id = max(self.orders.keys()) + 1
            order = {
                'id': order_id,
                'user_id': data['user_id'],
                'product': data['product'],
                'amount': data['amount']
            }
            self.orders[order_id] = order
            return jsonify(order), 201

    def run(self):
        self.app.run(port=self.port, debug=False, use_reloader=False)


# ================= API GATEWAY =================
class APIGateway:
    def __init__(self, port: int = 5000):
        self.app = Flask(__name__)
        self.port = port
        self.user_service_url = 'http://localhost:5001'
        self.order_service_url = 'http://localhost:5002'
        self.setup_routes()

    def setup_routes(self):
        @self.app.route('/api/users', methods=['GET', 'POST'])
        @self.app.route('/api/users/<int:user_id>', methods=['GET'])
        def users_proxy(user_id=None):
            if request.method == 'GET':
                url = f"{self.user_service_url}/users"
                if user_id:
                    url += f"/{user_id}"
            else:  # POST
                url = f"{self.user_service_url}/users"

            return self._proxy_request(url)

        @self.app.route('/api/orders', methods=['GET', 'POST'])
        @self.app.route('/api/orders/<int:order_id>', methods=['GET'])
        def orders_proxy(order_id=None):
            if request.method == 'GET':
                url = f"{self.order_service_url}/orders"
                if order_id:
                    url += f"/{order_id}"
            else:  # POST
                url = f"{self.order_service_url}/orders"

            return self._proxy_request(url)

    def _proxy_request(self, url: str):
        try:
            response = requests.request(
                method=request.method,
                url=url,
                headers={k: v for k, v in request.headers if k != 'Host'},
                data=request.get_data(),
                params=request.args,
                cookies=request.cookies,
                allow_redirects=False,
                timeout=5
            )
            return (response.content, response.status_code, response.headers.items())
        except requests.RequestException as e:
            service = "User" if "users" in url else "Order"
            return jsonify({'error': f'{service} service unavailable'}), 503

    def run(self):
        self.app.run(port=self.port, debug=False, use_reloader=False)


# ================= HELPER & MAIN =================
def run_service(service):
    service.run()


if __name__ == "__main__":
    # Create services
    user_service = UserService()
    order_service = OrderService()
    api_gateway = APIGateway()

    services = [user_service, order_service, api_gateway]
    threads = []

    # Start all services in background
    for service in services:
        thread = threading.Thread(target=run_service, args=(service,))
        thread.daemon = True
        threads.append(thread)
        thread.start()

    print("Microservices started. Testing in 2 seconds...")
    time.sleep(2)

    base_url = "http://localhost:5000/api"

    try:
        # Test: Get all users
        response = requests.get(f"{base_url}/users")
        print(f"Users: {response.json()}")

        # Test: Get order with user details
        response = requests.get(f"{base_url}/orders/1")
        print(f"Order with user: {response.json()}")

        # Test: Create new user
        new_user = {'name': 'Bob Wilson', 'email': 'bob@example.com'}
        response = requests.post(f"{base_url}/users", json=new_user)
        print(f"Created user: {response.json()}")

        # Test: Create new order
        new_order = {'user_id': 3, 'product': 'Keyboard', 'amount': 79.99}
        response = requests.post(f"{base_url}/orders", json=new_order)
        print(f"Created order: {response.json()}")

    except requests.RequestException as e:
        print(f"Error testing services: {e}")

    # Keep alive
    try:
        print("\nServices running. Press Ctrl+C to stop.")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping services...")

 * Serving Flask app '__main__'
Microservices started. Testing in 2 seconds...
 * Serving Flask app '__main__'
 * Debug mode: off
 * Serving Flask app '__main__'
 * Debug mode: off
 * Debug mode: off


Address already in use
Port 5000 is in use by another program. Either identify and stop that program, or start the server with a different port.
Address already in use
Port 5001 is in use by another program. Either identify and stop that program, or start the server with a different port.
Address already in use
Port 5002 is in use by another program. Either identify and stop that program, or start the server with a different port.
INFO:werkzeug:127.0.0.1 - - [19/Nov/2025 06:59:01] "GET /users HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [19/Nov/2025 06:59:01] "GET /api/users HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [19/Nov/2025 06:59:01] "GET /users/1 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [19/Nov/2025 06:59:01] "GET /orders/1 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [19/Nov/2025 06:59:01] "GET /api/orders/1 HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [19/Nov/2025 06:59:01] "[35m[1mPOST /users HTTP/1.1[0m" 201 -
INFO:werkzeug:127.0.0.1 - - [19/Nov/2025 06:59:01] "[35m[1m

Users: [{'email': 'john@example.com', 'id': 1, 'name': 'John Doe'}, {'email': 'jane@example.com', 'id': 2, 'name': 'Jane Smith'}, {'email': 'bob@example.com', 'id': 3, 'name': 'Bob Wilson'}]
Order with user: {'amount': 999.99, 'id': 1, 'product': 'Laptop', 'user': {'email': 'john@example.com', 'id': 1, 'name': 'John Doe'}, 'user_id': 1}
Created user: {'email': 'bob@example.com', 'id': 4, 'name': 'Bob Wilson'}
Created order: {'amount': 79.99, 'id': 4, 'product': 'Keyboard', 'user_id': 3}

Services running. Press Ctrl+C to stop.

Stopping services...


#Lab 7: Event-Driven Architecture
Objective: Implement an event-driven architecture using message queues.
Problem Statement: Create a notification system that processes events asynchronously

In [None]:
import threading
import time
import json
from datetime import datetime
from typing import Dict, List, Callable, Any
from enum import Enum


# ================= EVENT TYPE =================
class EventType(Enum):
    USER_REGISTERED = "user_registered"
    ORDER_CREATED = "order_created"
    PAYMENT_PROCESSED = "payment_processed"
    ORDER_SHIPPED = "order_shipped"


# ================= EVENT =================
class Event:
    def __init__(self, event_type: EventType, data: Dict[str, Any]):
        self.event_type = event_type
        self.data = data
        self.timestamp = datetime.now()
        self.id = f"{event_type.value}_{int(time.time())}"

    def to_dict(self) -> Dict[str, Any]:
        return {
            'id': self.id,
            'event_type': self.event_type.value,
            'timestamp': self.timestamp.isoformat(),
            'data': self.data
        }


# ================= EVENT BUS =================
class EventBus:
    def __init__(self):
        self._subscribers: Dict[EventType, List[Callable[[Event], None]]] = {}
        self._event_queue: List[Event] = []
        self._is_running = False
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None

    def subscribe(self, event_type: EventType, callback: Callable[[Event], None]):
        with self._lock:
            if event_type not in self._subscribers:
                self._subscribers[event_type] = []
            if callback not in self._subscribers[event_type]:
                self._subscribers[event_type].append(callback)

    def unsubscribe(self, event_type: EventType, callback: Callable[[Event], None]):
        with self._lock:
            if event_type in self._subscribers:
                self._subscribers[event_type] = [
                    cb for cb in self._subscribers[event_type] if cb != callback
                ]

    def publish(self, event: Event):
        with self._lock:
            self._event_queue.append(event)

    def _process_events(self):
        while self._is_running:
            event = None
            with self._lock:
                if self._event_queue:
                    event = self._event_queue.pop(0)

            if event:
                if event.event_type in self._subscribers:
                    for callback in self._subscribers[event.event_type]:
                        try:
                            callback(event)
                        except Exception as e:
                            print(f"Error in event handler {callback.__name__}: {e}")
            else:
                time.sleep(0.01)  # Prevent busy-waiting

    def start(self):
        if not self._is_running:
            self._is_running = True
            self._thread = threading.Thread(target=self._process_events, daemon=True)
            self._thread.start()
            print("[EventBus] Started.")

    def stop(self):
        self._is_running = False
        if self._thread:
            self._thread.join(timeout=2)
        print("[EventBus] Stopped.")


# ================= SERVICES =================
class EmailService:
    def send_welcome_email(self, event: Event):
        user = event.data.get('user', {})
        email = user.get('email', 'unknown')
        print(f"[Email] Sending welcome email to {email}")
        time.sleep(0.5)
        print(f"[Email] Welcome email sent to {email}")

    def send_order_confirmation(self, event: Event):
        order = event.data.get('order', {})
        user = event.data.get('user', {})
        print(f"[Email] Sending order confirmation to {user.get('email')} for order #{order.get('id')}")
        time.sleep(0.4)

    def send_shipping_notification(self, event: Event):
        order = event.data.get('order', {})
        user = event.data.get('user', {})
        print(f"[Email] Sending shipping notification to {user.get('email')} for order #{order.get('id')}")
        time.sleep(0.3)


class SMSService:
    def send_welcome_sms(self, event: Event):
        user = event.data.get('user', {})
        phone = user.get('phone', 'unknown')
        print(f"[SMS] Sending welcome SMS to {phone}")
        time.sleep(0.3)
        print(f"[SMS] Welcome SMS sent to {phone}")

    def send_order_sms(self, event: Event):
        order = event.data.get('order', {})
        user = event.data.get('user', {})
        print(f"[SMS] Sending order update to {user.get('phone')} for order #{order.get('id')}")
        time.sleep(0.2)


class AnalyticsService:
    def track_user_registration(self, event: Event):
        user = event.data.get('user', {})
        print(f"[Analytics] Tracking user registration: {user.get('name')}")

    def track_order_creation(self, event: Event):
        order = event.data.get('order', {})
        print(f"[Analytics] Tracking order creation: #{order.get('id')}")

    def track_payment(self, event: Event):
        payment = event.data.get('payment', {})
        print(f"[Analytics] Tracking payment: ${payment.get('amount')} (ID: {payment.get('id')})")


class InventoryService:
    def update_inventory(self, event: Event):
        order = event.data.get('order', {})
        print(f"[Inventory] Updating inventory for order #{order.get('id')}")
        time.sleep(0.2)
        print(f"[Inventory] Inventory updated for order #{order.get('id')}")


# ================= ECOMMERCE APP =================
class ECommerceApplication:
    def __init__(self):
        self.event_bus = EventBus()
        self.setup_services()

    def setup_services(self):
        email_service = EmailService()
        sms_service = SMSService()
        analytics_service = AnalyticsService()
        inventory_service = InventoryService()

        # Subscribe to events
        self.event_bus.subscribe(EventType.USER_REGISTERED, email_service.send_welcome_email)
        self.event_bus.subscribe(EventType.USER_REGISTERED, sms_service.send_welcome_sms)
        self.event_bus.subscribe(EventType.USER_REGISTERED, analytics_service.track_user_registration)

        self.event_bus.subscribe(EventType.ORDER_CREATED, email_service.send_order_confirmation)
        self.event_bus.subscribe(EventType.ORDER_CREATED, sms_service.send_order_sms)
        self.event_bus.subscribe(EventType.ORDER_CREATED, analytics_service.track_order_creation)
        self.event_bus.subscribe(EventType.ORDER_CREATED, inventory_service.update_inventory)

        self.event_bus.subscribe(EventType.ORDER_SHIPPED, email_service.send_shipping_notification)
        self.event_bus.subscribe(EventType.ORDER_SHIPPED, sms_service.send_order_sms)

        self.event_bus.subscribe(EventType.PAYMENT_PROCESSED, analytics_service.track_payment)

    def register_user(self, user_data: Dict[str, Any]):
        print(f"\nRegistering User: {user_data['name']} ===")
        event = Event(EventType.USER_REGISTERED, {'user': user_data})
        self.event_bus.publish(event)

    def create_order(self, order_data: Dict[str, Any], user_data: Dict[str, Any]):
        print(f"\nCreating Order: #{order_data['id']} ===")
        event = Event(EventType.ORDER_CREATED, {
            'order': order_data,
            'user': user_data
        })
        self.event_bus.publish(event)

    def process_payment(self, payment_data: Dict[str, Any]):
        print(f"\nProcessing Payment ===")
        event = Event(EventType.PAYMENT_PROCESSED, {'payment': payment_data})
        self.event_bus.publish(event)

    def ship_order(self, order_data: Dict[str, Any], user_data: Dict[str, Any]):
        print(f"\nShipping Order: #{order_data['id']} ===")
        event = Event(EventType.ORDER_SHIPPED, {
            'order': order_data,
            'user': user_data
        })
        self.event_bus.publish(event)


# ================= DEMO =================
if __name__ == "__main__":
    app = ECommerceApplication()
    app.event_bus.start()

    # Simulate user journey
    user = {
        'id': 1,
        'name': 'Alice Johnson',
        'email': 'alice@example.com',
        'phone': '+1234567890'
    }

    order = {
        'id': 101,
        'product': 'Laptop',
        'amount': 999.99
    }

    payment = {
        'id': 201,
        'order_id': 101,
        'amount': 999.99,
        'status': 'completed'
    }

    # Execute flow
    app.register_user(user)
    time.sleep(1.5)

    app.create_order(order, user)
    time.sleep(1.5)

    app.process_payment(payment)
    time.sleep(1)

    app.ship_order(order, user)
    time.sleep(2)

    # Shutdown
    app.event_bus.stop()
    print("\nAll events processed. System shutdown.")

[EventBus] Started.

Registering User: Alice Johnson ===
[Email] Sending welcome email to alice@example.com
[Email] Welcome email sent to alice@example.com
[SMS] Sending welcome SMS to +1234567890
[SMS] Welcome SMS sent to +1234567890
[Analytics] Tracking user registration: Alice Johnson

Creating Order: #101 ===
[Email] Sending order confirmation to alice@example.com for order #101
[SMS] Sending order update to +1234567890 for order #101
[Analytics] Tracking order creation: #101
[Inventory] Updating inventory for order #101
[Inventory] Inventory updated for order #101

Processing Payment ===
[Analytics] Tracking payment: $999.99 (ID: 201)

Shipping Order: #101 ===
[Email] Sending shipping notification to alice@example.com for order #101
[SMS] Sending order update to +1234567890 for order #101
[EventBus] Stopped.

All events processed. System shutdown.


#Lab 8: Quality Attribute Analysis
Objective: Analyze and measure software quality attributes.
Problem Statement: Create tools to measure performance, maintainability, and reliability.

In [None]:
!pip install memory-profiler

Collecting memory-profiler
  Downloading memory_profiler-0.61.0-py3-none-any.whl.metadata (20 kB)
Downloading memory_profiler-0.61.0-py3-none-any.whl (31 kB)
Installing collected packages: memory-profiler
Successfully installed memory-profiler-0.61.0


In [None]:
# ================================
# PERFORMANCE + MAINTAINABILITY + RELIABILITY ANALYZER
# 100% WORKS IN JUPYTER, COLAB, AND .py SCRIPTS
# ================================

import time
import statistics
import random
import inspect
import ast
import sys
from typing import List, Dict, Callable, Any, Optional

# Optional: Memory profiling
try:
    import memory_profiler
    MEMORY_PROFILER_AVAILABLE = True
except ImportError:
    MEMORY_PROFILER_AVAILABLE = False
    print("Warning: memory_profiler not installed. Memory tracking skipped.")
    print("Install with: pip install memory-profiler")


# ================= PERFORMANCE ANALYZER =================
class PerformanceAnalyzer:
    def __init__(self):
        self.metrics: Dict[str, List[float]] = {}

    def measure_execution_time(self, func: Callable) -> Callable:
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            end = time.perf_counter()
            duration = end - start
            name = func.__name__
            self.metrics.setdefault(name, []).append(duration)
            print(f"Time: {name} executed in {duration:.6f}s")
            return result
        return wrapper

    def measure_memory_usage(self, func: Callable) -> Callable:
        if not MEMORY_PROFILER_AVAILABLE:
            def wrapper(*args, **kwargs):
                print(f"Memory: Skipping '{func.__name__}' (memory_profiler not available)")
                return func(*args, **kwargs)
            return wrapper

        def wrapper(*args, **kwargs):
            mem_before = memory_profiler.memory_usage()[0]
            result = func(*args, **kwargs)
            mem_after = memory_profiler.memory_usage()[0]
            used = mem_after - mem_before
            print(f"Memory: {func.__name__} used {used:.2f} MB")
            return result
        return wrapper

    def get_performance_summary(self):
        print("\n" + "="*60)
        print(" PERFORMANCE SUMMARY ")
        print("="*60)
        if not self.metrics:
            print("  No functions measured.")
            return
        for name, times in self.metrics.items():
            avg = statistics.mean(times)
            min_t = min(times)
            max_t = max(times)
            std = statistics.stdev(times) if len(times) > 1 else 0.0
            print(f"\n{name}:")
            print(f"  Avg: {avg:.6f}s | Min: {min_t:.6f}s | Max: {max_t:.6f}s")
            print(f"  Std Dev: {std:.6f}s | Samples: {len(times)}")


# ================= MAINTAINABILITY ANALYZER =================
class MaintainabilityAnalyzer:
    def analyze_complexity(self, source_code: str) -> Dict[str, Any]:
        try:
            tree = ast.parse(source_code)
        except Exception as e:
            return {"error": f"Parse error: {e}"}

        metrics = {
            'line_count': len(source_code.splitlines()),
            'function_count': 0,
            'class_count': 0,
            'average_function_length': 0.0,
            'max_nesting': 0,
            'cyclomatic_complexity': 1
        }

        function_lines = []

        class Visitor(ast.NodeVisitor):
            def __init__(self):
                self.in_function = False
                self.current_lines = 0
                self.nesting = 0

            def visit_FunctionDef(self, node):
                metrics['function_count'] += 1
                self.in_function = True
                self.current_lines = 0
                self.nesting = 0
                self.generic_visit(node)
                function_lines.append(self.current_lines)
                self.in_function = False

            def visit_ClassDef(self, node):
                metrics['class_count'] += 1
                self.generic_visit(node)

            def visit(self, node):
                control_nodes = (ast.If, ast.For, ast.While, ast.Try, ast.With,
                                ast.AsyncFor, ast.AsyncWith, ast.Match)
                if isinstance(node, control_nodes):
                    metrics['cyclomatic_complexity'] += 1
                    self.nesting += 1
                    metrics['max_nesting'] = max(metrics['max_nesting'], self.nesting)
                super().visit(node)
                if isinstance(node, control_nodes):
                    self.nesting -= 1

            def generic_visit(self, node):
                if self.in_function:
                    self.current_lines += 1
                super().generic_visit(node)

        visitor = Visitor()
        visitor.visit(tree)

        if function_lines:
            metrics['average_function_length'] = sum(function_lines) / len(function_lines)
        return metrics

    def calculate_maintainability_index(self, source_code: str) -> float:
        metrics = self.analyze_complexity(source_code)
        if "error" in metrics:
            return 0.0

        line_count = metrics['line_count']
        func_count = metrics['function_count']
        avg_len = metrics['average_function_length']

        line_factor = max(0, 1 - line_count / 1000)
        func_factor = max(0, 1 - func_count / 50)
        len_factor = max(0, 1 - avg_len / 50)

        index = (line_factor + func_factor + len_factor) / 3 * 100
        return max(0, min(100, index))


# ================= RELIABILITY TESTER =================
class ReliabilityTester:
    def __init__(self):
        self.failure_count = 0
        self.success_count = 0
        self.response_times: List[float] = []

    def test_reliability(self, func: Callable, test_cases: List[Dict], iterations: int = 100):
        print(f"\n=== Reliability Testing: {func.__name__} ===")
        total = len(test_cases) * iterations

        for i in range(iterations):
            for case in test_cases:
                inputs = case.get('input', ())
                expected = case.get('expected')

                try:
                    start = time.perf_counter()
                    result = func(*inputs)
                    end = time.perf_counter()
                    self.response_times.append(end - start)

                    if expected is not None and result != expected:
                        self.failure_count += 1
                        print(f"  Failed: input={inputs} | expected={expected}, got={result}")
                    else:
                        self.success_count += 1
                except Exception as e:
                    self.failure_count += 1
                    print(f"  Exception: {e} for input={inputs}")

        self._print_reliability_report(total)

    def _print_reliability_report(self, total_tests: int):
        reliability = (self.success_count / total_tests) * 100 if total_tests > 0 else 0
        print(f"\n=== Reliability Report ===")
        print(f"  Total Tests : {total_tests}")
        print(f"  Successful  : {self.success_count}")
        print(f"  Failures    : {self.failure_count}")
        print(f"  Reliability : {reliability:.2f}%")

        if self.response_times:
            avg = statistics.mean(self.response_times)
            print(f"  Avg Response: {avg:.6f}s")


# ================= EXAMPLE FUNCTIONS =================
class ExampleFunctions:
    @staticmethod
    def fibonacci(n: int) -> int:
        if n <= 1:
            return n
        a, b = 0, 1
        for _ in range(2, n + 1):
            a, b = b, a + b
        return b

    @staticmethod
    def process_data(data: List[int]) -> Dict:
        if not data:
            return {}
        return {
            'sum': sum(data),
            'average': statistics.mean(data),
            'max': max(data),
            'min': min(data)
        }

    @staticmethod
    def simulate_network_request() -> bool:
        time.sleep(0.01)
        return random.random() > 0.1


# ================= SAFE SOURCE GETTER (Notebook + Script) =================
def safe_getsource(obj) -> Optional[str]:
    """Safely get source code — works in notebooks and .py files."""
    try:
        return inspect.getsource(obj)
    except (OSError, TypeError, IOError):
        # In notebook: no file → skip or use cell
        return None


def get_current_cell_source() -> Optional[str]:
    """Get last executed cell in Jupyter/Colab."""
    try:
        import IPython
        ip = IPython.get_ipython()
        if ip is not None:
            cell = ip.user_ns.get('In', [''])[-1]
            return cell if cell.strip() else None
    except:
        pass
    return None


# ================= DEMO =================
def run_demo():
    perf = PerformanceAnalyzer()
    maint = MaintainabilityAnalyzer()

    print("Starting Full Analysis...\n")

    # === 1. Performance ===
    print("1. Performance Testing...")
    fib_perf = perf.measure_execution_time(ExampleFunctions.fibonacci)
    proc_perf = perf.measure_execution_time(ExampleFunctions.process_data)

    for _ in range(5):
        fib_perf(1000)
        proc_perf(list(range(1000)))

    perf.get_performance_summary()

    # === 2. Maintainability (Safe) ===
    print("\n" + "="*60)
    print("2. Maintainability Analysis...")

    source = safe_getsource(ExampleFunctions)
    if source:
        metrics = maint.analyze_complexity(source)
        index = maint.calculate_maintainability_index(source)
        print("Complexity Metrics:")
        for k, v in metrics.items():
            if k != "error":
                print(f"  {k}: {v}")
        print(f"Maintainability Index: {index:.2f}/100")
    else:
        print("  Source code not available (running in notebook). Skipping class analysis.")

    # === 3. Reliability ===
    print("\n" + "="*60)
    print("3. Reliability Testing...")

    fib_cases = [
        {'input': (0,), 'expected': 0},
        {'input': (1,), 'expected': 1},
        {'input': (5,), 'expected': 5},
        {'input': (10,), 'expected': 55},
        {'input': (20,), 'expected': 6765}
    ]
    ReliabilityTester().test_reliability(ExampleFunctions.fibonacci, fib_cases, iterations=10)

    net_cases = [{'input': ()} for _ in range(50)]
    ReliabilityTester().test_reliability(ExampleFunctions.simulate_network_request, net_cases, iterations=1)

    # === 4. Full Source / Cell Analysis ===
    print("\n" + "="*60)
    print("4. Full Script / Cell Analysis...")

    full_source = None

    # Try .py file
    try:
        with open(__file__, 'r', encoding='utf-8') as f:
            full_source = f.read()
        print("  Analyzing .py file...")
    except NameError:
        # In notebook
        cell_source = get_current_cell_source()
        if cell_source:
            full_source = cell_source
            print("  Analyzing current notebook cell...")
        else:
            print("  No cell source found.")
    except Exception as e:
        print(f"  Could not read file: {e}")

    if full_source:
        index = maint.calculate_maintainability_index(full_source)
        print(f"Full Source Maintainability: {index:.2f}/100")
    else:
        print("  Tip: Save as .py to analyze full file.")


# Run demo
run_demo()

Starting Full Analysis...

1. Performance Testing...
Time: fibonacci executed in 0.000079s
Time: process_data executed in 0.000319s
Time: fibonacci executed in 0.000068s
Time: process_data executed in 0.000310s
Time: fibonacci executed in 0.000068s
Time: process_data executed in 0.000272s
Time: fibonacci executed in 0.000067s
Time: process_data executed in 0.000274s
Time: fibonacci executed in 0.000090s
Time: process_data executed in 0.000273s

 PERFORMANCE SUMMARY 

fibonacci:
  Avg: 0.000074s | Min: 0.000067s | Max: 0.000090s
  Std Dev: 0.000010s | Samples: 5

process_data:
  Avg: 0.000289s | Min: 0.000272s | Max: 0.000319s
  Std Dev: 0.000023s | Samples: 5

2. Maintainability Analysis...
  Source code not available (running in notebook). Skipping class analysis.

3. Reliability Testing...

=== Reliability Testing: fibonacci ===

=== Reliability Report ===
  Total Tests : 50
  Successful  : 50
  Failures    : 0
  Reliability : 100.00%
  Avg Response: 0.000000s

=== Reliability Testin

#Lab 9: Load Balancer Implementation
Objective: Implement different load balancing algorithms.
Problem Statement: Create a load balancer that distributes requests using various algorithms.


In [None]:
# ================================
# LOAD BALANCER WITH MULTIPLE STRATEGIES
# Works in .py, Jupyter, Google Colab
# ================================

import threading
import time
import random
import statistics
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from collections import defaultdict


# ================= SERVER =================
class Server:
    def __init__(self, server_id: str, capacity: int = 100):
        self.server_id = server_id
        self.capacity = capacity
        self.current_load = 0
        self.response_times = []
        self.error_count = 0
        self.request_count = 0
        self.is_healthy = True
        self.lock = threading.Lock()

    def process_request(self, request_id: str) -> Dict[str, Any]:
        with self.lock:
            if self.current_load >= self.capacity:
                self.error_count += 1
                return {'error': 'Server overloaded', 'server': self.server_id}

            self.current_load += 1
            self.request_count += 1

        # Simulate processing time
        processing_time = random.uniform(0.1, 2.0)

        # Simulate 5% error rate
        if random.random() < 0.05:
            with self.lock:
                self.error_count += 1
            time.sleep(processing_time)
            return {'error': 'Internal server error', 'server': self.server_id}

        time.sleep(processing_time)

        with self.lock:
            self.current_load -= 1
            self.response_times.append(processing_time)

        return {
            'success': True,
            'server': self.server_id,
            'request_id': request_id,
            'processing_time': processing_time,
            'current_load': self.current_load
        }

    def get_metrics(self) -> Dict[str, Any]:
        with self.lock:
            avg_response_time = (
                statistics.mean(self.response_times) if self.response_times else 0
            )
            error_rate = (
                self.error_count / self.request_count if self.request_count > 0 else 0
            )
            return {
                'server_id': self.server_id,
                'current_load': self.current_load,
                'capacity': self.capacity,
                'utilization': (self.current_load / self.capacity) * 100,
                'total_requests': self.request_count,
                'error_count': self.error_count,
                'error_rate': error_rate,
                'avg_response_time': avg_response_time,
                'is_healthy': self.is_healthy
            }

    def health_check(self) -> bool:
        # 2% chance of health failure
        self.is_healthy = random.random() >= 0.02
        return self.is_healthy


# ================= LOAD BALANCING STRATEGIES =================
class LoadBalancingStrategy(ABC):
    @abstractmethod
    def select_server(self, servers: List[Server]) -> Server:
        pass


class RoundRobinStrategy(LoadBalancingStrategy):
    def __init__(self):
        self.current_index = 0
        self.lock = threading.Lock()

    def select_server(self, servers: List[Server]) -> Server:
        with self.lock:
            if not servers:
                raise ValueError("No servers available")
            server = servers[self.current_index]
            self.current_index = (self.current_index + 1) % len(servers)
            return server


class LeastConnectionsStrategy(LoadBalancingStrategy):
    def select_server(self, servers: List[Server]) -> Server:
        if not servers:
            raise ValueError("No servers available")

        healthy_servers = [s for s in servers if s.is_healthy]
        if not healthy_servers:
            raise ValueError("No healthy servers available")

        return min(healthy_servers, key=lambda s: s.current_load)


class WeightedRoundRobinStrategy(LoadBalancingStrategy):
    def __init__(self, weights: Dict[str, int]):
        self.weights = weights
        self.current_weights = weights.copy()
        self.lock = threading.Lock()

    def select_server(self, servers: List[Server]) -> Server:
        with self.lock:
            if not servers:
                raise ValueError("No servers available")

            # Filter healthy servers with positive weight
            available_servers = [
                s for s in servers
                if s.is_healthy and self.current_weights.get(s.server_id, 0) > 0
            ]

            if not available_servers:
                # Reset weights
                self.current_weights = self.weights.copy()
                available_servers = [
                    s for s in servers
                    if s.is_healthy and self.current_weights.get(s.server_id, 0) > 0
                ]

            if not available_servers:
                raise ValueError("No healthy servers available")

            # Select server with highest current weight
            selected_server = max(
                available_servers,
                key=lambda s: self.current_weights[s.server_id]
            )
            self.current_weights[selected_server.server_id] -= 1
            return selected_server


class RandomStrategy(LoadBalancingStrategy):
    def select_server(self, servers: List[Server]) -> Server:
        if not servers:
            raise ValueError("No servers available")

        healthy_servers = [s for s in servers if s.is_healthy]
        if not healthy_servers:
            raise ValueError("No healthy servers available")

        return random.choice(healthy_servers)


# ================= LOAD BALANCER =================
class LoadBalancer:
    def __init__(self, strategy: LoadBalancingStrategy):
        self.servers: List[Server] = []
        self.strategy = strategy
        self.metrics = defaultdict(list)
        self.health_check_interval = 30
        self.is_running = False
        self.health_check_thread = None

    def add_server(self, server: Server):
        self.servers.append(server)

    def remove_server(self, server_id: str):
        self.servers = [s for s in self.servers if s.server_id != server_id]

    def process_request(self, request_id: str) -> Dict[str, Any]:
        try:
            selected_server = self.strategy.select_server(self.servers)
            response = selected_server.process_request(request_id)
            self.metrics['requests_processed'].append(time.time())
            return response
        except ValueError as e:
            return {'error': str(e), 'request_id': request_id}

    def start_health_checks(self):
        self.is_running = True
        self.health_check_thread = threading.Thread(target=self._health_check_loop)
        self.health_check_thread.daemon = True
        self.health_check_thread.start()

    def stop_health_checks(self):
        self.is_running = False
        if self.health_check_thread:
            self.health_check_thread.join()

    def _health_check_loop(self):
        while self.is_running:
            for server in self.servers:
                server.health_check()
            time.sleep(self.health_check_interval)

    def get_load_balancer_metrics(self) -> Dict[str, Any]:
        server_metrics = [server.get_metrics() for server in self.servers]
        total_requests = sum(m['total_requests'] for m in server_metrics)
        total_errors = sum(m['error_count'] for m in server_metrics)
        overall_error_rate = total_errors / total_requests if total_requests > 0 else 0

        return {
            'total_servers': len(self.servers),
            'healthy_servers': len([s for s in self.servers if s.is_healthy]),
            'total_requests_processed': total_requests,
            'overall_error_rate': overall_error_rate,
            'server_metrics': server_metrics,
            'strategy': type(self.strategy).__name__
        }


# ================= SIMULATION =================
def simulate_requests(load_balancer: LoadBalancer, num_requests: int):
    def make_request(req_id):
        response = load_balancer.process_request(f"req_{req_id}")
        if 'error' in response:
            print(f"Request {req_id} failed: {response['error']}")
        else:
            print(f"Request {req_id} → {response['server']} (load: {response['current_load']})")

    threads = []
    for i in range(num_requests):
        thread = threading.Thread(target=make_request, args=(i,))
        threads.append(thread)
        thread.start()
        time.sleep(random.uniform(0.05, 0.2))  # Stagger

    for thread in threads:
        thread.join()


# ================= DEMO =================
if __name__ == "__main__":
    # Create servers
    servers = [
        Server("server_1", capacity=50),
        Server("server_2", capacity=75),
        Server("server_3", capacity=100),
        Server("server_4", capacity=25)
    ]

    # Define strategies
    strategies = {
        "Round Robin": RoundRobinStrategy(),
        "Least Connections": LeastConnectionsStrategy(),
        "Weighted Round Robin": WeightedRoundRobinStrategy({
            "server_1": 3,
            "server_2": 2,
            "server_3": 1,
            "server_4": 1
        }),
        "Random": RandomStrategy()
    }

    for strategy_name, strategy in strategies.items():
        print(f"\n{'='*20} Testing {strategy_name} Strategy {'='*20}")

        # Reset servers
        for s in servers:
            s.current_load = 0
            s.request_count = 0
            s.error_count = 0
            s.response_times = []
            s.is_healthy = True

        # Setup load balancer
        lb = LoadBalancer(strategy)
        for server in servers:
            lb.add_server(server)

        # Start health checks
        lb.start_health_checks()

        # Simulate traffic
        simulate_requests(lb, 25)

        # Wait for processing
        time.sleep(3)

        # Print metrics
        metrics = lb.get_load_balancer_metrics()
        print(f"\n{strategy_name} Results:")
        print(f"  Healthy: {metrics['healthy_servers']}/{metrics['total_servers']}")
        print(f"  Total Requests: {metrics['total_requests_processed']}")
        print(f"  Error Rate: {metrics['overall_error_rate']:.2%}")

        for sm in metrics['server_metrics']:
            print(f"  {sm['server_id']}: "
                  f"{sm['current_load']}/{sm['capacity']} "
                  f"({sm['utilization']:.1f}%) "
                  f"Errors: {sm['error_count']}")

        lb.stop_health_checks()

    print("\nAll strategies tested!")


Request 0 → server_1 (load: 1)
Request 1 → server_2 (load: 0)
Request 4 → server_1 (load: 1)
Request 7 failed: Internal server error
Request 3 → server_4 (load: 2)
Request 5 → server_2 (load: 2)
Request 9 → server_2 (load: 1)
Request 2 failed: Internal server error
Request 6 → server_3 (load: 3)
Request 8 → server_1 (load: 1)
Request 13 → server_2 (load: 0)
Request 10 → server_3 (load: 2)
Request 16 → server_1 (load: 1)
Request 11 → server_4 (load: 2)
Request 12 → server_1 (load: 0)
Request 20 → server_1 (load: 0)
Request 18 → server_3 (load: 3)
Request 19 → server_4 (load: 3)
Request 14 → server_3 (load: 2)
Request 24 → server_1 (load: 0)
Request 17 → server_2 (load: 1)
Request 15 failed: Internal server error
Request 21 → server_2 (load: 0)
Request 22 → server_3 (load: 1)
Request 23 → server_4 (load: 2)

Round Robin Results:
  Healthy: 4/4
  Total Requests: 25
  Error Rate: 12.00%
  server_1: 0/50 (0.0%) Errors: 0
  server_2: 0/75 (0.0%) Errors: 0
  server_3: 1/100 (1.0%) Errors: 1


#Lab 10: Caching Strategies
Objective: Implement and compare different caching strategies.
Problem Statement: Create a caching system with LRU, LFU, and TTL eviction policies.


In [14]:
import time
import random
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from collections import OrderedDict, defaultdict
import threading


class Cache(ABC):
    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        pass

    @abstractmethod
    def put(self, key: str, value: Any, ttl: Optional[int] = None):
        pass

    @abstractmethod
    def delete(self, key: str) -> bool:
        pass

    @abstractmethod
    def clear(self):
        pass

    @abstractmethod
    def size(self) -> int:
        pass

    @abstractmethod
    def get_stats(self) -> Dict:
        pass


class LRUCache(Cache):
    """Least Recently Used cache implementation"""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = OrderedDict()
        self.hits = 0
        self.misses = 0
        self.lock = threading.Lock()

    def get(self, key: str) -> Optional[Any]:
        with self.lock:
            if key not in self.cache:
                self.misses += 1
                return None

            # Move to end (most recently used)
            value = self.cache.pop(key)
            self.cache[key] = value
            self.hits += 1
            return value

    def put(self, key: str, value: Any, ttl: Optional[int] = None):
        with self.lock:
            if key in self.cache:
                # Remove existing key to update order
                self.cache.pop(key)
            elif len(self.cache) >= self.capacity:
                # Remove least recently used item
                self.cache.popitem(last=False)

            self.cache[key] = value

    def delete(self, key: str) -> bool:
        with self.lock:
            if key in self.cache:
                self.cache.pop(key)
                return True
            return False

    def clear(self):
        with self.lock:
            self.cache.clear()
            self.hits = 0
            self.misses = 0

    def size(self) -> int:
        return len(self.cache)

    def get_stats(self) -> Dict:
        total = self.hits + self.misses
        hit_ratio = self.hits / total if total > 0 else 0.0

        return {
            "type": "LRU",
            "capacity": self.capacity,
            "size": self.size(),
            "hits": self.hits,
            "misses": self.misses,
            "hit_ratio": hit_ratio,
            "items": list(self.cache.keys()),
        }


class LFUCache(Cache):
    """Least Frequently Used cache implementation"""

    def __init__(self, capacity: int):
        self.capacity = capacity
        # key -> (value, frequency)
        self.cache: Dict[str, tuple] = {}
        # frequency -> OrderedDict of keys (to preserve recency among same-frequency)
        self.frequency_dict: Dict[int, OrderedDict] = defaultdict(OrderedDict)
        self.min_frequency = 0
        self.hits = 0
        self.misses = 0
        self.lock = threading.Lock()

    def _remove_from_freq_bucket(self, key: str, frequency: int):
        bucket = self.frequency_dict.get(frequency)
        if bucket and key in bucket:
            del bucket[key]
            if not bucket:
                # keep the mapping but it will be empty dict; we may clean min_frequency elsewhere
                self.frequency_dict.pop(frequency, None)

    def get(self, key: str) -> Optional[Any]:
        with self.lock:
            if key not in self.cache:
                self.misses += 1
                return None

            value, frequency = self.cache[key]

            # Remove from current frequency bucket
            self._remove_from_freq_bucket(key, frequency)

            # Increase frequency
            new_frequency = frequency + 1
            self.cache[key] = (value, new_frequency)
            self.frequency_dict[new_frequency][key] = None

            # If the old frequency was the min and its bucket is now empty, update min_frequency
            if frequency == self.min_frequency and frequency not in self.frequency_dict:
                # find next min frequency
                if self.frequency_dict:
                    self.min_frequency = min(self.frequency_dict.keys())
                else:
                    self.min_frequency = new_frequency

            self.hits += 1
            return value

    def put(self, key: str, value: Any, ttl: Optional[int] = None):
        with self.lock:
            if self.capacity == 0:
                return

            if key in self.cache:
                # Update existing key: increase frequency and update value
                _, frequency = self.cache[key]
                self._remove_from_freq_bucket(key, frequency)
                new_frequency = frequency + 1
                self.cache[key] = (value, new_frequency)
                self.frequency_dict[new_frequency][key] = None

                if frequency == self.min_frequency and frequency not in self.frequency_dict:
                    if self.frequency_dict:
                        self.min_frequency = min(self.frequency_dict.keys())
                    else:
                        self.min_frequency = new_frequency
                return

            # If capacity reached, evict least frequently used (and least recently used among them)
            if len(self.cache) >= self.capacity:
                # Ensure min_frequency points to a non-empty bucket
                if self.min_frequency not in self.frequency_dict or not self.frequency_dict[self.min_frequency]:
                    # recalc min_frequency
                    if self.frequency_dict:
                        self.min_frequency = min(self.frequency_dict.keys())
                    else:
                        self.min_frequency = 0

                # pop the oldest key in min_frequency bucket
                lfu_bucket = self.frequency_dict.get(self.min_frequency)
                if lfu_bucket:
                    lfu_key, _ = lfu_bucket.popitem(last=False)
                    self.cache.pop(lfu_key, None)
                    if not lfu_bucket:
                        self.frequency_dict.pop(self.min_frequency, None)

            # Add new item with frequency 1
            self.cache[key] = (value, 1)
            self.frequency_dict[1][key] = None
            self.min_frequency = 1

    def delete(self, key: str) -> bool:
        with self.lock:
            if key not in self.cache:
                return False

            _, frequency = self.cache[key]
            del self.cache[key]
            self._remove_from_freq_bucket(key, frequency)

            # Update min_frequency if needed
            if frequency == self.min_frequency and self.min_frequency not in self.frequency_dict:
                if self.frequency_dict:
                    self.min_frequency = min(self.frequency_dict.keys())
                else:
                    self.min_frequency = 0

            return True

    def clear(self):
        with self.lock:
            self.cache.clear()
            self.frequency_dict.clear()
            self.min_frequency = 0
            self.hits = 0
            self.misses = 0

    def size(self) -> int:
        return len(self.cache)

    def get_stats(self) -> Dict:
        total = self.hits + self.misses
        hit_ratio = self.hits / total if total > 0 else 0.0

        frequency_distribution = {}
        for freq, keys in self.frequency_dict.items():
            frequency_distribution[freq] = len(keys)

        return {
            "type": "LFU",
            "capacity": self.capacity,
            "size": self.size(),
            "hits": self.hits,
            "misses": self.misses,
            "hit_ratio": hit_ratio,
            "min_frequency": self.min_frequency,
            "frequency_distribution": frequency_distribution,
            "items": list(self.cache.keys()),
        }


class TTLCache(Cache):
    """Time-to-Live cache implementation"""

    def __init__(self, capacity: int, default_ttl: int = 300):  # default 5 minutes
        self.capacity = capacity
        self.default_ttl = default_ttl
        # key -> (value, expiry_time)
        self.cache = OrderedDict()
        self.hits = 0
        self.misses = 0
        self.lock = threading.Lock()

    def _is_expired(self, key: str) -> bool:
        """Check if key has expired"""
        if key not in self.cache:
            return True

        _, expiry_time = self.cache[key]
        return time.time() > expiry_time

    def _cleanup_expired(self):
        """Remove expired entries"""
        current_time = time.time()
        expired_keys = [key for key, (_, expiry) in list(self.cache.items()) if expiry <= current_time]
        for key in expired_keys:
            del self.cache[key]

    def get(self, key: str) -> Optional[Any]:
        with self.lock:
            self._cleanup_expired()

            if key not in self.cache or self._is_expired(key):
                self.misses += 1
                if key in self.cache:
                    del self.cache[key]  # remove expired entry
                return None

            value, expiry = self.cache.pop(key)
            # Re-insert to update order (LRU behavior for non-expired items)
            self.cache[key] = (value, expiry)
            self.hits += 1
            return value

    def put(self, key: str, value: Any, ttl: Optional[int] = None):
        with self.lock:
            self._cleanup_expired()

            if ttl is None:
                ttl = self.default_ttl

            expiry_time = time.time() + ttl

            if key in self.cache:
                # Remove existing key to update order
                del self.cache[key]
            elif len(self.cache) >= self.capacity:
                # Remove least recently used non-expired item
                # ensure we pop a non-expired item; cleanup already done so pop is fine
                try:
                    self.cache.popitem(last=False)
                except KeyError:
                    # nothing to pop
                    pass

            self.cache[key] = (value, expiry_time)

    def delete(self, key: str) -> bool:
        with self.lock:
            if key in self.cache:
                del self.cache[key]
                return True
            return False

    def clear(self):
        with self.lock:
            self.cache.clear()
            self.hits = 0
            self.misses = 0

    def size(self) -> int:
        with self.lock:
            self._cleanup_expired()
            return len(self.cache)

    def get_expired_count(self) -> int:
        """Get count of expired but not yet cleaned up entries"""
        with self.lock:
            current_time = time.time()
            return sum(1 for _, expiry in self.cache.values() if expiry <= current_time)

    def get_stats(self) -> Dict:
        with self.lock:
            self._cleanup_expired()

            total = self.hits + self.misses
            hit_ratio = self.hits / total if total > 0 else 0.0

            # Calculate TTL distribution
            current_time = time.time()
            ttl_distribution = {"<1min": 0, "1-5min": 0, "5-15min": 0, ">15min": 0}

            for _, expiry in self.cache.values():
                ttl_remaining = expiry - current_time
                if ttl_remaining < 60:
                    ttl_distribution["<1min"] += 1
                elif ttl_remaining < 300:
                    ttl_distribution["1-5min"] += 1
                elif ttl_remaining < 900:
                    ttl_distribution["5-15min"] += 1
                else:
                    ttl_distribution[">15min"] += 1

            return {
                "type": "TTL",
                "capacity": self.capacity,
                "size": self.size(),
                "hits": self.hits,
                "misses": self.misses,
                "hit_ratio": hit_ratio,
                "default_ttl": self.default_ttl,
                "ttl_distribution": ttl_distribution,
                "expired_count": self.get_expired_count(),
                "items": list(self.cache.keys()),
            }


class CacheManager:
    """Manager to test and compare different caching strategies"""

    def __init__(self):
        self.caches: Dict[str, Cache] = {}

    def add_cache(self, name: str, cache: Cache):
        self.caches[name] = cache

    def benchmark_cache(self, cache: Cache, operations: int) -> Dict:
        """Benchmark cache performance with simulated workload"""
        # Warm up cache (if cache has capacity attribute)
        try:
            warm = min(operations // 2, cache.capacity)
        except Exception:
            warm = min(operations // 2, 100)

        for i in range(warm):
            # put supports ttl optional parameter, safe for all implementations
            cache.put(f"key_{i}", f"value_{i}")

        start_time = time.time()

        # Mixed workload: 70% reads, 20% writes, 10% deletes
        for i in range(operations):
            op_type = random.random()

            if op_type < 0.7:  # Read operation
                key = f"key_{random.randint(0, operations // 2)}"
                cache.get(key)
            elif op_type < 0.9:  # Write operation
                key = f"key_{random.randint(0, operations // 2)}"
                cache.put(key, f"value_{i}")
            else:  # Delete operation
                key = f"key_{random.randint(0, operations // 2)}"
                cache.delete(key)

        end_time = time.time()

        stats = cache.get_stats()
        total_time = end_time - start_time if end_time > start_time else 0.0
        stats["total_time"] = total_time
        stats["operations_per_second"] = operations / total_time if total_time > 0 else float("inf")

        return stats

    def compare_caches(self, operations: int = 1000):
        """Compare performance of all registered caches"""
        print("=== Cache Strategy Comparison ===")
        print(f"Operations: {operations}")
        print("-" * 60)

        results = []
        for name, cache in self.caches.items():
            print(f"Testing {name}...")

            # Clear cache before test
            cache.clear()

            # Run benchmark
            stats = self.benchmark_cache(cache, operations)
            results.append((name, stats))

            print(f"Hit Ratio: {stats['hit_ratio']:.3f}")
            print(f"Operations / sec: {stats['operations_per_second']:.2f}")
            print(f"Final Size: {stats['size']}/{stats['capacity']}")
            print()

        # Print comparison
        print("=== Results Summary ===")
        print(f"{'Cache Type':<15} {'Hit Ratio':<12} {'Ops / Sec':<12} {'Size':<10}")
        print("-" * 50)

        for name, stats in results:
            print(
                f"{name:<15} {stats['hit_ratio']:<12.3f} "
                f"{stats['operations_per_second']:<12.2f} "
                f"{stats['size']:<10}"
            )


# Demonstration
if __name__ == "__main__":
    # Create cache manager
    manager = CacheManager()

    # Add different cache strategies
    manager.add_cache("LRU", LRUCache(capacity=50))
    manager.add_cache("LFU", LFUCache(capacity=50))
    manager.add_cache("TTL", TTLCache(capacity=50, default_ttl=60))

    # Compare performance
    manager.compare_caches(operations=1000)

    # Demonstrate individual cache usage
    print("\n=== Individual Cache Demonstration ===")

    lru_cache = LRUCache(capacity=5)

    # Add some items
    for i in range(5):
        lru_cache.put(f"item_{i}", f"value_{i}")

    print("Initial cache state:")
    print(lru_cache.get_stats())

    # Access some items to change LRU order
    lru_cache.get("item_0")
    lru_cache.get("item_2")

    # Add new item (should evict least recently used)
    lru_cache.put("item_5", "value_5")

    print("\nAfter accessing items and adding new one:")
    print(lru_cache.get_stats())

    # TTL Cache demonstration
    print("\n=== TTL Cache Demonstration ===")
    ttl_cache = TTLCache(capacity=5, default_ttl=2)  # 2 second TTL

    ttl_cache.put("temp_1", "value_1")
    ttl_cache.put("temp_2", "value_2", ttl=5)  # Custom TTL

    print("Immediately after adding:")
    print(f"temp_1: {ttl_cache.get('temp_1')}")  # Should be there
    print(f"temp_2: {ttl_cache.get('temp_2')}")  # Should be there

    print("\nAfter 3 seconds:")
    time.sleep(3)
    print(f"temp_1: {ttl_cache.get('temp_1')}")  # Should be expired
    print(f"temp_2: {ttl_cache.get('temp_2')}")  # Should still be there

    print("\nTTL Cache stats:")
    print(ttl_cache.get_stats())


=== Cache Strategy Comparison ===
Operations: 1000
------------------------------------------------------------
Testing LRU...
Hit Ratio: 0.127
Operations / sec: 712347.83
Final Size: 50/50

Testing LFU...
Hit Ratio: 0.117
Operations / sec: 639180.74
Final Size: 50/50

Testing TTL...


KeyboardInterrupt: 

# 11: Database Sharding
Objective: Implement database sharding for horizontal scaling.
Problem Statement: Create a sharded database system that distributes data across multiple nodes

In [10]:
import hashlib
import threading
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
from datetime import datetime
import json

class Shard(ABC):
    @abstractmethod
    def connect(self):
        pass

    @abstractmethod
    def insert(self, key: str, data: Dict) -> bool:
        pass

    @abstractmethod
    def get(self, key: str) -> Optional[Dict]:
        pass

    @abstractmethod
    def update(self, key: str, data: Dict) -> bool:
        pass

    @abstractmethod
    def delete(self, key: str) -> bool:
        pass

    @abstractmethod
    def get_stats(self) -> Dict:
        pass

class InMemoryShard(Shard):
    """In-memory implementation of a database shard"""

    def __init__(self, shard_id: str):
        self.shard_id = shard_id
        self.data = {}
        self.operation_count = 0
        self.lock = threading.Lock()

    def connect(self):
        print(f"Connected to shard {self.shard_id}")

    def insert(self, key: str, data: Dict) -> bool:
        with self.lock:
            if key in self.data:
                return False

            data['_shard_id'] = self.shard_id
            data['_created_at'] = datetime.now().isoformat()
            data['_updated_at'] = datetime.now().isoformat()

            self.data[key] = data
            self.operation_count += 1
            return True

    def get(self, key: str) -> Optional[Dict]:
        with self.lock:
            if key in self.data:
                self.operation_count += 1
                return self.data[key].copy()
            return None

    def update(self, key: str, data: Dict) -> bool:
        with self.lock:
            if key not in self.data:
                return False

            # Preserve metadata and update only provided fields
            existing_data = self.data[key].copy()
            existing_data.update(data)
            # Re-set metadata to avoid overwriting
            existing_data['_shard_id'] = self.data[key]['_shard_id']
            existing_data['_created_at'] = self.data[key]['_created_at']
            existing_data['_updated_at'] = datetime.now().isoformat()

            self.data[key] = existing_data
            self.operation_count += 1
            return True

    def delete(self, key: str) -> bool:
        with self.lock:
            if key in self.data:
                del self.data[key]
                self.operation_count += 1
                return True
            return False

    def get_stats(self) -> Dict:
        with self.lock:
            return {
                'shard_id': self.shard_id,
                'record_count': len(self.data),
                'operation_count': self.operation_count,
                'memory_usage_estimate': len(str(self.data))  # Rough estimate
            }

class ShardingStrategy(ABC):
    @abstractmethod
    def get_shard_id(self, key: str, total_shards: int) -> str:
        pass

class HashShardingStrategy(ShardingStrategy):
    """Hash-based sharding strategy"""

    def get_shard_id(self, key: str, total_shards: int) -> str:
        hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
        shard_index = hash_value % total_shards
        return f"shard_{shard_index}"

class RangeShardingStrategy(ShardingStrategy):
    """Range-based sharding strategy"""

    def __init__(self, ranges: List[tuple]):
        """
        ranges: List of tuples (start_key, end_key, shard_id)
        """
        self.ranges = sorted(ranges, key=lambda x: x[0])

    def get_shard_id(self, key: str, total_shards: int) -> str:
        # For simplicity, assume numeric keys for range sharding
        try:
            key_num = int(key)
            for start, end, shard_id in self.ranges:
                if start <= key_num <= end:
                    return shard_id
        except ValueError:
            # Fallback to hash if key is not numeric
            hash_strategy = HashShardingStrategy()
            return hash_strategy.get_shard_id(key, total_shards)

        # Default to first shard if no range matches (safe for empty ranges)
        return self.ranges[0][2] if self.ranges else "shard_0"

class DirectoryShardingStrategy(ShardingStrategy):
    """Directory-based sharding strategy"""

    def __init__(self, mapping: Dict[str, str]):
        self.mapping = mapping

    def get_shard_id(self, key: str, total_shards: int) -> str:
        return self.mapping.get(key, "shard_0")

class ShardedDatabase:
    """Main sharded database system"""

    def __init__(self, sharding_strategy: ShardingStrategy, num_shards: int = 4):
        self.sharding_strategy = sharding_strategy
        self.num_shards = num_shards
        self.shards: Dict[str, Shard] = {}
        self.setup_shards()
        self.lock = threading.Lock()

        # Statistics
        self.operations = {
            'insert': 0,
            'get': 0,
            'update': 0,
            'delete': 0,
            'errors': 0
        }

    def setup_shards(self):
        """Initialize all shards"""
        for i in range(self.num_shards):
            shard_id = f"shard_{i}"
            self.shards[shard_id] = InMemoryShard(shard_id)
            self.shards[shard_id].connect()

    def get_shard_for_key(self, key: str) -> Shard:
        """Determine which shard should handle the given key"""
        shard_id = self.sharding_strategy.get_shard_id(key, self.num_shards)
        return self.shards[shard_id]

    def insert(self, key: str, data: Dict) -> bool:
        """Insert data into appropriate shard"""
        try:
            shard = self.get_shard_for_key(key)
            success = shard.insert(key, data)

            with self.lock:
                self.operations['insert'] += 1
                if not success:
                    self.operations['errors'] += 1

            return success

        except Exception as e:
            with self.lock:
                self.operations['errors'] += 1
            print(f"Insert error: {e}")
            return False

    def get(self, key: str) -> Optional[Dict]:
        """Retrieve data from appropriate shard"""
        try:
            shard = self.get_shard_for_key(key)
            result = shard.get(key)

            with self.lock:
                self.operations['get'] += 1

            return result

        except Exception as e:
            with self.lock:
                self.operations['errors'] += 1
            print(f"Get error: {e}")
            return None

    def update(self, key: str, data: Dict) -> bool:
        """Update data in appropriate shard"""
        try:
            shard = self.get_shard_for_key(key)
            success = shard.update(key, data)

            with self.lock:
                self.operations['update'] += 1
                if not success:
                    self.operations['errors'] += 1

            return success

        except Exception as e:
            with self.lock:
                self.operations['errors'] += 1
            print(f"Update error: {e}")
            return False

    def delete(self, key: str) -> bool:
        """Delete data from appropriate shard"""
        try:
            shard = self.get_shard_for_key(key)
            success = shard.delete(key)

            with self.lock:
                self.operations['delete'] += 1
                if not success:
                    self.operations['errors'] += 1

            return success

        except Exception as e:
            with self.lock:
                self.operations['errors'] += 1
            print(f"Delete error: {e}")
            return False

    def get_database_stats(self) -> Dict:
        """Get comprehensive database statistics"""
        shard_stats = {}
        total_records = 0
        total_operations = 0

        for shard_id, shard in self.shards.items():
            stats = shard.get_stats()
            shard_stats[shard_id] = stats
            total_records += stats['record_count']
            total_operations += stats['operation_count']

        # Calculate load distribution
        load_distribution = {}
        for shard_id, stats in shard_stats.items():
            load_distribution[shard_id] = {
                'record_count': stats['record_count'],
                'percentage': (stats['record_count'] / total_records * 100)
                if total_records > 0 else 0
            }

        return {
            'sharding_strategy': type(self.sharding_strategy).__name__,
            'total_shards': self.num_shards,
            'total_records': total_records,
            'total_operations': total_operations,
            'operations': self.operations.copy(),
            'load_distribution': load_distribution,
            'shard_stats': shard_stats
        }

    def redistribute_data(self, new_num_shards: int, new_strategy: ShardingStrategy):
        """Redistribute data when changing sharding configuration"""
        print(f"Redistributing data from {self.num_shards} to {new_num_shards} shards...")

        # Collect all data
        all_data = {}
        for shard in self.shards.values():
            # This would need proper serialization in real implementation
            all_data.update(shard.data)

        # Update configuration
        self.num_shards = new_num_shards
        self.sharding_strategy = new_strategy

        # Clear existing shards
        self.shards.clear()
        self.setup_shards()

        # Reinsert all data
        for key, data in all_data.items():
            self.insert(key, data)

        print("Data redistribution completed!")

# Demonstration
def demonstrate_sharding():
    """Demonstrate different sharding strategies"""

    print("=== Hash-based Sharding ===")
    hash_strategy = HashShardingStrategy()
    hash_db = ShardedDatabase(hash_strategy, num_shards=4)

    # Insert sample data
    for i in range(100):
        hash_db.insert(f"user_{i}", {
            "name": f"User {i}",
            "email": f"user{i}@example.com",
            "age": 20 + (i % 40)
        })

    stats = hash_db.get_database_stats()
    print(f"Total records: {stats['total_records']}")
    print("Load distribution:")
    for shard_id, distribution in stats['load_distribution'].items():
        print(f"{shard_id}: {distribution['record_count']} records "
              f"({distribution['percentage']:.1f}%)")

    print("\n=== Range-based Sharding ===")
    range_strategy = RangeShardingStrategy([
        (0, 24, "shard_0"),
        (25, 49, "shard_1"),
        (50, 74, "shard_2"),
        (75, 99, "shard_3")
    ])
    range_db = ShardedDatabase(range_strategy, num_shards=4)

    # Insert sample data with numeric keys
    for i in range(100):
        range_db.insert(str(i), {
            "name": f"User {i}",
            "category": f"category_{i // 10}"
        })

    range_stats = range_db.get_database_stats()
    print("Load distribution:")
    for shard_id, distribution in range_stats['load_distribution'].items():
        print(f"{shard_id}: {distribution['record_count']} records "
              f"({distribution['percentage']:.1f}%)")

    print("\n=== Directory-based Sharding ===")
    # Create mapping for specific users to specific shards
    directory_mapping = {}
    for i in range(100):
        if i % 3 == 0:
            directory_mapping[f"user_{i}"] = "shard_0"
        elif i % 3 == 1:
            directory_mapping[f"user_{i}"] = "shard_1"
        else:
            directory_mapping[f"user_{i}"] = "shard_2"

    directory_strategy = DirectoryShardingStrategy(directory_mapping)
    directory_db = ShardedDatabase(directory_strategy, num_shards=3)

    for i in range(100):
        directory_db.insert(f"user_{i}", {
            "name": f"User {i}",
            "preferred_shard": directory_mapping.get(f"user_{i}", "unknown")
        })

    directory_stats = directory_db.get_database_stats()
    print("Load distribution:")
    for shard_id, distribution in directory_stats['load_distribution'].items():
        print(f"{shard_id}: {distribution['record_count']} records "
              f"({distribution['percentage']:.1f}%)")

    return hash_db, range_db, directory_db

def test_shard_operations(db: ShardedDatabase, db_name: str):
    """Test basic operations on a sharded database"""
    print(f"\n=== Testing {db_name} ===")

    # Test insert and get
    test_key = "test_user_123"
    test_data = {"name": "Test User", "email": "test@example.com"}

    print(f"Inserting {test_key}: {db.insert(test_key, test_data)}")
    print(f"Retrieving {test_key}: {db.get(test_key)}")

    # Test update
    updated_data = {"name": "Updated User", "email": "updated@example.com"}
    print(f"Updating {test_key}: {db.update(test_key, updated_data)}")
    print(f"Retrieving updated {test_key}: {db.get(test_key)}")

    # Test delete
    print(f"Deleting {test_key}: {db.delete(test_key)}")
    print(f"Retrieving deleted {test_key}: {db.get(test_key)}")

if __name__ == "__main__":
    # Demonstrate different sharding strategies
    hash_db, range_db, directory_db = demonstrate_sharding()

    # Test operations on each database
    test_shard_operations(hash_db, "Hash-sharded Database")
    test_shard_operations(range_db, "Range-sharded Database")
    test_shard_operations(directory_db, "Directory-sharded Database")

    # Show final statistics
    print("\n=== Final Statistics ===")
    for db, name in [(hash_db, "Hash"), (range_db, "Range"), (directory_db, "Directory")]:
        stats = db.get_database_stats()
        print(f"\n{name} Sharding:")
        print(f"Total Operations: {stats['total_operations']}")

=== Hash-based Sharding ===
Connected to shard shard_0
Connected to shard shard_1
Connected to shard shard_2
Connected to shard shard_3
Total records: 100
Load distribution:
shard_0: 28 records (28.0%)
shard_1: 24 records (24.0%)
shard_2: 29 records (29.0%)
shard_3: 19 records (19.0%)

=== Range-based Sharding ===
Connected to shard shard_0
Connected to shard shard_1
Connected to shard shard_2
Connected to shard shard_3
Load distribution:
shard_0: 25 records (25.0%)
shard_1: 25 records (25.0%)
shard_2: 25 records (25.0%)
shard_3: 25 records (25.0%)

=== Directory-based Sharding ===
Connected to shard shard_0
Connected to shard shard_1
Connected to shard shard_2
Load distribution:
shard_0: 34 records (34.0%)
shard_1: 33 records (33.0%)
shard_2: 33 records (33.0%)

=== Testing Hash-sharded Database ===
Inserting test_user_123: True
Retrieving test_user_123: {'name': 'Test User', 'email': 'test@example.com', '_shard_id': 'shard_0', '_created_at': '2025-11-19T06:49:22.448282', '_updated_at

##Lab 12: Message Queue System
Objective: Implement a basic message queue system with publishers and subscribers.
Problem Statement: Create a message queue that supports topics, persistence, and multiple consumers.

In [17]:
import threading
import time
import json
import uuid
from abc import ABC, abstractmethod
from typing import Dict, List, Callable, Optional
from datetime import datetime
from collections import defaultdict, deque
import heapq

class Message:
    def __init__(self, topic: str, payload: Dict, priority: int = 0):
        self.id = str(uuid.uuid4())
        self.topic = topic
        self.payload = payload
        self.priority = priority
        self.timestamp = datetime.now()
        self.delivery_attempts = 0

    def to_dict(self) -> Dict:
        return {
            'id': self.id,
            'topic': self.topic,
            'payload': self.payload,
            'priority': self.priority,
            'timestamp': self.timestamp.isoformat(),
            'delivery_attempts': self.delivery_attempts
        }

    def __lt__(self, other):
        # Higher priority messages come first
        return self.priority > other.priority

class MessageQueue(ABC):
    @abstractmethod
    def publish(self, topic: str, message: Dict, priority: int = 0) -> str:
        pass

    @abstractmethod
    def subscribe(self, topic: str, callback: Callable) -> str:
        pass

    @abstractmethod
    def unsubscribe(self, subscription_id: str):
        pass

    @abstractmethod
    def get_stats(self) -> Dict:
        pass

class InMemoryMessageQueue(MessageQueue):
    def __init__(self, max_queue_size: int = 10000):
        self.queues: Dict[str, List] = {}  # topic -> priority queue
        self.subscribers: Dict[str, List[tuple]] = defaultdict(list)  # topic -> [(sub_id, callback)]
        self.dead_letter_queues: Dict[str, List] = defaultdict(list)
        self.max_queue_size = max_queue_size
        self.lock = threading.Lock()

        # Statistics
        self.stats = {
            'messages_published': 0,
            'messages_delivered': 0,
            'messages_failed': 0,
            'subscriptions_created': 0,
            'subscriptions_removed': 0
        }

        # Start message processor
        self.is_running = True
        self.processor_thread = threading.Thread(target=self._process_messages)
        self.processor_thread.daemon = True
        self.processor_thread.start()

    def publish(self, topic: str, message: Dict, priority: int = 0) -> str:
        msg = Message(topic, message, priority)

        with self.lock:
            if topic not in self.queues:
                self.queues[topic] = []

            # Check queue size limit
            if len(self.queues[topic]) >= self.max_queue_size:
                # Remove lowest priority message
                if self.queues[topic]:
                    heapq.heappop(self.queues[topic])

            heapq.heappush(self.queues[topic], msg)
            self.stats['messages_published'] += 1

        return msg.id

    def subscribe(self, topic: str, callback: Callable) -> str:
        subscription_id = str(uuid.uuid4())

        with self.lock:
            self.subscribers[topic].append((subscription_id, callback))
            self.stats['subscriptions_created'] += 1

        return subscription_id

    def unsubscribe(self, subscription_id: str):
        with self.lock:
            for topic in self.subscribers:
                # Create a new list with only the subscriptions that don't match the ID
                self.subscribers[topic] = [
                    (sub_id, callback) for sub_id, callback in self.subscribers[topic]
                    if sub_id != subscription_id
                ]
            self.stats['subscriptions_removed'] += 1

    def _process_messages(self):
        """Background thread to process messages"""
        while self.is_running:
            with self.lock:
                # Process each topic
                for topic, queue in self.queues.items():
                    if queue and topic in self.subscribers:
                        # Get highest priority message
                        msg = heapq.heappop(queue)
                        subscribers = self.subscribers[topic]

                        # Deliver to all subscribers
                        for subscription_id, callback in subscribers:
                            try:
                                callback(msg.to_dict())
                                self.stats['messages_delivered'] += 1
                            except Exception as e:
                                print(f"Error delivering message {msg.id}: {e}")
                                msg.delivery_attempts += 1

                        # Move to dead letter queue after 3 attempts
                        if msg.delivery_attempts >= 3:
                            self.dead_letter_queues[topic].append(msg)
                            self.stats['messages_failed'] += 1
                        else:
                            # Requeue the message
                            heapq.heappush(queue, msg)

            time.sleep(0.1)  # Small delay to prevent busy waiting

    def get_queue_size(self, topic: str) -> int:
        with self.lock:
            return len(self.queues.get(topic, []))

    def get_subscriber_count(self, topic: str) -> int:
        with self.lock:
            return len(self.subscribers.get(topic, []))

    def get_dead_letter_count(self, topic: str) -> int:
        with self.lock:
            return len(self.dead_letter_queues.get(topic, []))

    def get_stats(self) -> Dict:
        with self.lock:
            queue_sizes = {topic: len(queue) for topic, queue in self.queues.items()}
            subscriber_counts = {topic: len(subs) for topic, subs in self.subscribers.items()}
            dead_letter_counts = {topic: len(dlq) for topic, dlq in self.dead_letter_queues.items()}

            return {
                'queues': queue_sizes,
                'subscribers': subscriber_counts,
                'dead_letter_queues': dead_letter_counts,
                'statistics': self.stats.copy()
            }

    def stop(self):
        self.is_running = False
        if self.processor_thread:
            self.processor_thread.join()

class PersistentMessageQueue(InMemoryMessageQueue):
    """Message queue with basic persistence to disk"""
    def __init__(self, max_queue_size: int = 10000, persistence_file: str = "mq_persistence.json"):
        self.persistence_file = persistence_file
        super().__init__(max_queue_size)
        self._load_persistence()

        # Start persistence thread
        self.persistence_thread = threading.Thread(target=self._persistence_loop)
        self.persistence_thread.daemon = True
        self.persistence_thread.start()

    def _load_persistence(self):
        """Load queue state from disk"""
        try:
            with open(self.persistence_file, 'r') as f:
                data = json.load(f)

            # Reconstruct queues (simplified - in reality would need proper serialization)
            print("Loaded persistent state from disk")
        except FileNotFoundError:
            print("No persistence file found, starting fresh")
        except Exception as e:
            print(f"Error loading persistence: {e}")

    def _save_persistence(self):
        """Save queue state to disk"""
        try:
            # Simplified persistence - in reality would need proper serialization
            persistence_data = {
                'stats': self.stats,
                'queues_count': {topic: len(queue) for topic, queue in self.queues.items()}
            }

            with open(self.persistence_file, 'w') as f:
                json.dump(persistence_data, f, indent=2)
        except Exception as e:
            print(f"Error saving persistence: {e}")

    def _persistence_loop(self):
        """Background thread to periodically save state"""
        while self.is_running:
            time.sleep(30)  # Save every 30 seconds
            self._save_persistence()

    def stop(self):
        super().stop()
        self._save_persistence()  # Final save

class TopicRouter:
    """Advanced topic routing with pattern matching"""
    def __init__(self):
        self.patterns = {}  # pattern -> handler

    def add_route(self, pattern: str, handler: Callable):
        """Add route with pattern (supports * wildcard)"""
        self.patterns[pattern] = handler

    def route_message(self, topic: str, message: Dict):
        """Route message to matching patterns"""
        matched = False
        for pattern, handler in self.patterns.items():
            if self._matches_pattern(topic, pattern):
                try:
                    handler(message)
                    matched = True
                except Exception as e:
                    print(f"Error in route handler for pattern {pattern}: {e}")

        if not matched:
            print(f"No route found for topic: {topic}")

    def _matches_pattern(self, topic: str, pattern: str) -> bool:
        """Check if topic matches pattern with wildcards"""
        topic_parts = topic.split('.')
        pattern_parts = pattern.split('.')

        if len(topic_parts) != len(pattern_parts):
            return False

        for t, p in zip(topic_parts, pattern_parts):
            if p != '*' and t != p:
                return False

        return True

# Example consumers and producers
class EmailService:
    def send_email(self, message: Dict):
        print(f"[Email] Sending email: {message['payload']['subject']}")
        # Simulate email sending
        time.sleep(0.1)
        print(f"[Email] Email sent to {message['payload']['to']}")

class NotificationService:
    def send_notification(self, message: Dict):
        print(f"[Notification] Sending push notification: {message['payload']['message']}")
        time.sleep(0.05)

class AnalyticsService:
    def track_event(self, message: Dict):
        print(f"[Analytics] Tracking event: {message['payload']['event_name']}")
        # Simulate analytics processing
        time.sleep(0.02)

class OrderService:
    def __init__(self, message_queue: MessageQueue):
        self.mq = message_queue
        self.setup_consumers()

    def setup_consumers(self):
        # Subscribe to order-related topics
        self.mq.subscribe("order.created", self.handle_order_created)
        self.mq.subscribe("order.updated", self.handle_order_updated)
        self.mq.subscribe("order.cancelled", self.handle_order_cancelled)

    def handle_order_created(self, message: Dict):
        order_data = message['payload']
        print(f"[OrderService] Processing new order: {order_data['order_id']}")

        # Publish related events
        self.mq.publish("email.order_confirmation", {
            'to': order_data['customer_email'],
            'subject': 'Order Confirmation',
            'order_id': order_data['order_id']
        })

        self.mq.publish("analytics.event", {
            'event_name': 'order_created',
            'order_id': order_data['order_id'],
            'amount': order_data['amount']
        })

    def handle_order_updated(self, message: Dict):
        order_data = message['payload']
        print(f"[OrderService] Order updated: {order_data['order_id']}")

    def handle_order_cancelled(self, message: Dict):
        order_data = message['payload']
        print(f"[OrderService] Order cancelled: {order_data['order_id']}")

# Demonstration
def demonstrate_message_queue():
    print("=== Message Queue System Demonstration === ")

    # Create message queue
    mq = InMemoryMessageQueue(max_queue_size=1000)

    # Create services
    email_service = EmailService()
    notification_service = NotificationService()
    analytics_service = AnalyticsService()
    order_service = OrderService(mq)

    # Set up additional consumers
    mq.subscribe("email.*", email_service.send_email)
    mq.subscribe("notification.*", notification_service.send_notification)
    mq.subscribe("analytics.*", analytics_service.track_event)

    # Create topic router for advanced routing
    router = TopicRouter()
    router.add_route("user.*", lambda msg: print(f"[Router] User event: {msg['topic']}"))
    router.add_route("*.created", lambda msg: print(f"[Router] Creation event: {msg['topic']}"))

    mq.subscribe("*", router.route_message)

    # Produce some messages
    print("=== Producing Messages === ")

    # High priority order
    mq.publish("order.created", {
        'order_id': 'ORD-001',
        'customer_email': 'customer@example.com',
        'amount': 99.99,
        'items': ['item1', 'item2']
    }, priority=10)

    # Regular priority notifications
    mq.publish("notification.welcome", {
        'user_id': 'user123',
        'message': 'Welcome to our service!'
    })

    mq.publish("user.created", {
        'user_id': 'user456',
        'name': 'John Doe'
    })

    mq.publish("analytics.page_view", {
        'page': '/home',
        'user_agent': 'Mozilla/5.0'
    })

    # Let messages process - increased sleep time to ensure processing
    print("Waiting for messages to be processed (3 seconds)...")
    time.sleep(3)  # Increased from 2 to 3 seconds to ensure processing

    # Show statistics
    print("=== Message Queue Statistics === ")
    stats = mq.get_stats()
    for category, data in stats.items():
        print(f"{category}:")
        if isinstance(data, dict):
            for key, value in data.items():
                print(f"  {key}: {value}")
        else:
            print(f"  {data}")

    # Cleanup
    mq.stop()

def performance_test():
    """Test message queue performance"""
    print("=== Performance Test === ")

    mq = InMemoryMessageQueue()
    message_count = 100

    # Simple consumer
    received_count = [0]  # Use list for mutable reference

    def simple_consumer(message):
        received_count[0] += 1

    mq.subscribe("test.topic", simple_consumer)

    # Produce messages
    start_time = time.time()

    for i in range(message_count):
        mq.publish("test.topic", {'message_id': i, 'data': 'x' * 100})

    # Wait for processing
    time.sleep(1)

    end_time = time.time()
    duration = end_time - start_time

    print(f"Produced and consumed {message_count} messages in {duration:.2f} seconds")
    print(f"Throughput: {message_count / duration:.2f} messages/second")
    print(f"Messages received: {received_count[0]}/{message_count}")

    mq.stop()

if __name__ == "__main__":
    demonstrate_message_queue()
    performance_test()

=== Message Queue System Demonstration === 
=== Producing Messages === 
Waiting for messages to be processed (3 seconds)...
[OrderService] Processing new order: ORD-001
=== Message Queue Statistics === 


KeyboardInterrupt: 

## Lab 13: ATAM Evaluation Framework
Objective: Implement Architecture Tradeoff Analysis Method (ATAM) evaluation.
Problem Statement: Create a framework to evaluate architectural decisions against quality attributes.

In [22]:
from abc import ABC, abstractmethod
from typing import Dict, List, Set, Tuple
from enum import Enum
import math

class QualityAttribute(Enum):
    PERFORMANCE = "Performance"
    SCALABILITY = "Scalability"
    RELIABILITY = "Reliability"
    SECURITY = "Security"
    MAINTAINABILITY = "Maintainability"
    AVAILABILITY = "Availability"
    USABILITY = "Usability"
    TESTABILITY = "Testability"
    DEPLOYABILITY = "Deployability"  # Added to resolve reference in examples

class ArchitecturalApproach:
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.quality_impacts: Dict[QualityAttribute, int] = {}  # -5 to +5 scale
        self.risks: List[str] = []
        self.sensitivity_points: List[str] = []
        self.tradeoff_points: List[str] = []

    def set_impact(self, attribute: QualityAttribute, impact: int):
        """Set impact on quality attribute (-5 to +5 scale)"""
        self.quality_impacts[attribute] = max(-5, min(5, impact))

    def add_risk(self, risk: str):
        self.risks.append(risk)

    def add_sensitivity_point(self, point: str):
        self.sensitivity_points.append(point)

    def add_tradeoff_point(self, point: str):
        self.tradeoff_points.append(point)

    def get_score(self) -> float:
        """Calculate overall score based on quality impacts"""
        if not self.quality_impacts:
            return 0.0

        # Weighted average of impacts
        total_weight = 0
        weighted_sum = 0

        for attribute, impact in self.quality_impacts.items():
            weight = self._get_attribute_weight(attribute)
            weighted_sum += impact * weight
            total_weight += weight

        return weighted_sum / total_weight if total_weight > 0 else 0.0

    def _get_attribute_weight(self, attribute: QualityAttribute) -> float:
        """Get weight for quality attribute (1.0 to 3.0 scale)"""
        weights = {
            QualityAttribute.PERFORMANCE: 2.0,
            QualityAttribute.SCALABILITY: 1.5,
            QualityAttribute.RELIABILITY: 2.5,
            QualityAttribute.SECURITY: 3.0,
            QualityAttribute.MAINTAINABILITY: 1.5,
            QualityAttribute.AVAILABILITY: 2.0,
            QualityAttribute.USABILITY: 1.0,
            QualityAttribute.TESTABILITY: 1.0,
            QualityAttribute.DEPLOYABILITY: 1.5  # Added to resolve reference in examples
        }
        return weights.get(attribute, 1.0)

class Scenario:
    def __init__(self, description: str, quality_attributes: List[QualityAttribute]):
        self.description = description
        self.quality_attributes = quality_attributes
        self.importance: int = 1  # 1-5 scale

    def set_importance(self, importance: int):
        self.importance = max(1, min(5, importance))

class ATAMEvaluator:
    def __init__(self):
        self.approaches: List[ArchitecturalApproach] = []
        self.scenarios: List[Scenario] = []
        self.stakeholders: List[str] = []
        self.quality_requirements: Dict[QualityAttribute, int] = {}  # 1-5 scale

    def add_approach(self, approach: ArchitecturalApproach):
        self.approaches.append(approach)

    def add_scenario(self, scenario: Scenario):
        self.scenarios.append(scenario)

    def add_stakeholder(self, stakeholder: str):
        self.stakeholders.append(stakeholder)

    def set_quality_requirement(self, attribute: QualityAttribute, importance: int):
        self.quality_requirements[attribute] = max(1, min(5, importance))

    def evaluate_approaches(self) -> List[Tuple[ArchitecturalApproach, float]]:
        """Evaluate all approaches and return sorted by score"""
        results = []

        for approach in self.approaches:
            score = self._calculate_approach_score(approach)
            results.append((approach, score))

        # Sort by score descending
        return sorted(results, key=lambda x: x[1], reverse=True)

    def _calculate_approach_score(self, approach: ArchitecturalApproach) -> float:
        """Calculate comprehensive score for an approach"""
        # Base score from quality impacts
        base_score = approach.get_score()

        # Adjust based on quality requirements alignment
        requirement_score = 0
        total_weight = 0

        for attribute, requirement_importance in self.quality_requirements.items():
            approach_impact = approach.quality_impacts.get(attribute, 0)

            # Normalize impact to 0-1 scale
            normalized_impact = (approach_impact + 5) / 10

            requirement_score += normalized_impact * requirement_importance
            total_weight += requirement_importance

        requirement_alignment = requirement_score / total_weight if total_weight > 0 else 0

        # Combine scores (60% base, 40% requirement alignment)
        final_score = (base_score * 0.6) + (requirement_alignment * 40)

        return final_score

    def identify_sensitivity_points(self) -> Dict[str, List[str]]:
        """Identify sensitivity points across all approaches"""
        sensitivity_points = {}

        for approach in self.approaches:
            for point in approach.sensitivity_points:
                if point not in sensitivity_points:
                    sensitivity_points[point] = []
                sensitivity_points[point].append(approach.name)

        return sensitivity_points

    def identify_tradeoff_points(self) -> Dict[str, List[Tuple[str, str]]]:
        """Identify tradeoff points between quality attributes"""
        tradeoffs = {}

        for approach in self.approaches:
            for point in approach.tradeoff_points:
                if point not in tradeoffs:
                    tradeoffs[point] = []

                # Find attributes with positive and negative impacts
                positive_attrs = []
                negative_attrs = []

                for attr, impact in approach.quality_impacts.items():
                    if impact > 0:
                        positive_attrs.append(attr.value)
                    elif impact < 0:
                        negative_attrs.append(attr.value)

                if positive_attrs and negative_attrs:
                    tradeoff_desc = f"Improves {', '.join(positive_attrs)} but degrades {', '.join(negative_attrs)}"
                    tradeoffs[point].append((approach.name, tradeoff_desc))

        return tradeoffs

    def generate_report(self) -> Dict:
        """Generate comprehensive ATAM report"""
        evaluated_approaches = self.evaluate_approaches()
        sensitivity_points = self.identify_sensitivity_points()
        tradeoff_points = self.identify_tradeoff_points()

        report = {
            'stakeholders': self.stakeholders,
            'quality_requirements': {attr.value: importance
                                    for attr, importance in self.quality_requirements.items()},
            'scenarios': [{'description': s.description, 'importance': s.importance}
                         for s in self.scenarios],
            'approach_rankings': [
                {
                    'approach': approach.name,
                    'score': round(score, 2),
                    'description': approach.description,
                    'risks': approach.risks
                }
                for approach, score in evaluated_approaches
            ],
            'sensitivity_points': sensitivity_points,
            'tradeoff_points': tradeoff_points,
            'recommendations': self._generate_recommendations(evaluated_approaches)
        }

        return report

    def _generate_recommendations(self, evaluated_approaches: List[Tuple[ArchitecturalApproach, float]]) -> List[str]:
        """Generate recommendations based on evaluation"""
        recommendations = []

        if not evaluated_approaches:
            return recommendations

        best_approach, best_score = evaluated_approaches[0]

        recommendations.append(f"Recommended approach: {best_approach.name} (Score: {best_score:.2f})")

        # Identify key strengths
        positive_impacts = []
        for attr, impact in best_approach.quality_impacts.items():
            if impact >= 3:  # Significant positive impact
                positive_impacts.append(f"{attr.value}(+{impact})")

        if positive_impacts:
            recommendations.append(f"Key strengths: {', '.join(positive_impacts)}")

        # Identify risks to mitigate
        if best_approach.risks:
            recommendations.append("Key risks to mitigate:")
            for risk in best_approach.risks[:3]:  # Top 3 risks
                recommendations.append(f" - {risk}")

        # Consider alternatives if scores are close
        if len(evaluated_approaches) > 1:
            second_approach, second_score = evaluated_approaches[1]
            score_difference = best_score - second_score

            if score_difference < 5:  # Close scores
                recommendations.append(f"Consider {second_approach.name} as alternative (Score: {second_score:.2f})")

        return recommendations

# Example usage and demonstration
def demonstrate_atam():
    print("=== ATAM Architecture Evaluation ===")

    # Create evaluator
    evaluator = ATAMEvaluator()

    # Add stakeholders
    evaluator.add_stakeholder("Product Owner")
    evaluator.add_stakeholder("Development Team")
    evaluator.add_stakeholder("Operations Team")
    evaluator.add_stakeholder("End Users")

    # Set quality requirements
    evaluator.set_quality_requirement(QualityAttribute.PERFORMANCE, 4)
    evaluator.set_quality_requirement(QualityAttribute.SCALABILITY, 3)
    evaluator.set_quality_requirement(QualityAttribute.RELIABILITY, 5)
    evaluator.set_quality_requirement(QualityAttribute.SECURITY, 4)
    evaluator.set_quality_requirement(QualityAttribute.MAINTAINABILITY, 3)

    # Add scenarios
    scenario1 = Scenario(
        "System should handle 10,000 concurrent users with response time < 2 seconds",
        [QualityAttribute.PERFORMANCE, QualityAttribute.SCALABILITY]
    )
    scenario1.set_importance(5)
    evaluator.add_scenario(scenario1)

    scenario2 = Scenario(
        "System should have 99.9% availability with automatic failover",
        [QualityAttribute.AVAILABILITY, QualityAttribute.RELIABILITY]
    )
    scenario2.set_importance(4)
    evaluator.add_scenario(scenario2)

    # Define architectural approaches

    # Approach 1: Microservices
    microservices = ArchitecturalApproach(
        "Microservices Architecture",
        "Decompose system into small, independently deployable services"
    )
    microservices.set_impact(QualityAttribute.SCALABILITY, 4)
    microservices.set_impact(QualityAttribute.MAINTAINABILITY, 3)
    microservices.set_impact(QualityAttribute.DEPLOYABILITY, 4)
    microservices.set_impact(QualityAttribute.PERFORMANCE, -1)  # Network overhead
    microservices.set_impact(QualityAttribute.RELIABILITY, 2)
    microservices.set_impact(QualityAttribute.SECURITY, -2)  # Increased attack surface
    microservices.add_risk("Distributed system complexity")
    microservices.add_risk("Data consistency challenges")
    microservices.add_sensitivity_point("Service communication latency")
    microservices.add_tradeoff_point("Independent deployment vs. operational complexity")

    # Approach 2: Monolithic
    monolithic = ArchitecturalApproach(
        "Monolithic Architecture",
        "Single unified codebase deployed as one unit"
    )
    monolithic.set_impact(QualityAttribute.PERFORMANCE, 3)  # No network calls
    monolithic.set_impact(QualityAttribute.SECURITY, 2)  # Smaller attack surface
    monolithic.set_impact(QualityAttribute.SCALABILITY, -2)  # Hard to scale components independently
    monolithic.set_impact(QualityAttribute.MAINTAINABILITY, -3)  # Codebase complexity
    monolithic.set_impact(QualityAttribute.RELIABILITY, 1)
    monolithic.set_impact(QualityAttribute.DEPLOYABILITY, -2)
    monolithic.add_risk("Single point of failure")
    monolithic.add_risk("Difficult to adopt new technologies")
    monolithic.add_sensitivity_point("Database performance")
    monolithic.add_tradeoff_point("Development speed vs. long-term maintainability")

    # Approach 3: Event-Driven
    event_driven = ArchitecturalApproach(
        "Event-Driven Architecture",
        "Components communicate through asynchronous events"
    )
    event_driven.set_impact(QualityAttribute.SCALABILITY, 5)
    event_driven.set_impact(QualityAttribute.RELIABILITY, 3)
    event_driven.set_impact(QualityAttribute.PERFORMANCE, 2)
    event_driven.set_impact(QualityAttribute.MAINTAINABILITY, 1)
    event_driven.set_impact(QualityAttribute.SECURITY, -1)
    event_driven.set_impact(QualityAttribute.TESTABILITY, -2)
    event_driven.add_risk("Event ordering and consistency")
    event_driven.add_risk("Debugging complexity")
    event_driven.add_sensitivity_point("Message broker performance")
    event_driven.add_tradeoff_point("Loose coupling vs. system understanding")

    # Add approaches to evaluator
    evaluator.add_approach(microservices)
    evaluator.add_approach(monolithic)
    evaluator.add_approach(event_driven)

    # Generate evaluation report
    report = evaluator.generate_report()

    # Print report
    print("\n=== ATAM Evaluation Report ===")

    print(f"\nStakeholders: {', '.join(report['stakeholders'])}")

    print("\nQuality Requirements:")
    for attr, importance in report['quality_requirements'].items():
        print(f"  {attr}: {importance}/5")

    print("\nApproach Rankings:")
    for i, ranking in enumerate(report['approach_rankings'], 1):
        print(f"{i}. {ranking['approach']} - Score: {ranking['score']}")
        print(f"   Description: {ranking['description']}")
        if ranking['risks']:
            print(f"   Key Risks: {', '.join(ranking['risks'][:2])}")

    print("\nSensitivity Points:")
    for point, approaches in report['sensitivity_points'].items():
        print(f"  {point}: {', '.join(approaches)}")

    print("\nTrade-off Points:")
    for point, tradeoffs in report['tradeoff_points'].items():
        print(f"  {point}:")
        for approach, description in tradeoffs:
            print(f"  - {approach}: {description}")

    print("\nRecommendations:")
    for recommendation in report['recommendations']:
        print(f"  {recommendation}")

    return evaluator

def detailed_approach_comparison(evaluator: ATAMEvaluator):
    """Show detailed comparison of approaches"""
    print("\n=== Detailed Approach Comparison ===")

    approaches = evaluator.evaluate_approaches()

    # Quality attributes for comparison
    attributes = list(QualityAttribute)

    # Print header
    header = f"{'Approach':<25} " + "".join(f"{attr.value[:10]:<12} " for attr in attributes) + " Overall"
    print(header)
    print("-" * len(header))

    # Print each approach
    for approach, overall_score in approaches:
        row = f"{approach.name:<25} "
        for attr in attributes:
            impact = approach.quality_impacts.get(attr, 0)
            impact_str = f"+{impact}" if impact > 0 else str(impact)
            row += f"{impact_str:<12}"
        row += f"{overall_score:.2f}"
        print(row)

if __name__ == "__main__":
    evaluator = demonstrate_atam()
    detailed_approach_comparison(evaluator)

=== ATAM Architecture Evaluation ===

=== ATAM Evaluation Report ===

Stakeholders: Product Owner, Development Team, Operations Team, End Users

Quality Requirements:
  Performance: 4/5
  Scalability: 3/5
  Reliability: 5/5
  Security: 4/5
  Maintainability: 3/5

Approach Rankings:
1. Event-Driven Architecture - Score: 28.6
   Description: Components communicate through asynchronous events
   Key Risks: Event ordering and consistency, Debugging complexity
2. Microservices Architecture - Score: 24.68
   Description: Decompose system into small, independently deployable services
   Key Risks: Distributed system complexity, Data consistency challenges
3. Monolithic Architecture - Score: 22.31
   Description: Single unified codebase deployed as one unit
   Key Risks: Single point of failure, Difficult to adopt new technologies

Sensitivity Points:
  Service communication latency: Microservices Architecture
  Database performance: Monolithic Architecture
  Message broker performance: Event-

## Lab 14: Architecture Documentation Generator
Objective: Create automated architecture documentation from code.
Problem Statement: Build a tool that analyzes Python code and generates architecture documentation

In [24]:
#!/usr/bin/env python3
import ast
import os
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pathlib import Path
import json


@dataclass
class ClassInfo:
    name: str
    methods: List[str]
    attributes: List[str]
    base_classes: List[str]
    docstring: str
    file_path: str
    line_number: int


@dataclass
class FunctionInfo:
    name: str
    parameters: List[str]
    return_type: str
    docstring: str
    file_path: str
    line_number: int


@dataclass
class ModuleInfo:
    name: str
    classes: List[ClassInfo]
    functions: List[FunctionInfo]
    imports: List[str]
    file_path: str


@dataclass
class Dependency:
    source: str
    target: str
    type: str  # 'inheritance', 'composition', 'association', 'dependency'


class ArchitectureAnalyzer:
    def __init__(self):
        self.modules: Dict[str, ModuleInfo] = {}
        self.dependencies: List[Dependency] = []

    def analyze_directory(self, directory_path: str):
        """Analyze all Python files in directory (recursively)."""
        path = Path(directory_path)

        for py_file in path.rglob("*.py"):
            if self._should_analyze_file(py_file):
                self.analyze_file(str(py_file))

        self._build_dependencies()

    def _should_analyze_file(self, file_path: Path) -> bool:
        """Check if file should be analyzed (skip tests, venvs, caches)."""
        excluded_patterns = ["test_", "_test", "__pycache__", ".venv", "venv", "env", "/.venv/", "/venv/"]
        s = str(file_path)
        return not any(pattern in s for pattern in excluded_patterns)

    def analyze_file(self, file_path: str):
        """Analyze a single Python file and extract module info."""
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                content = file.read()

            tree = ast.parse(content)
            module_name = Path(file_path).stem

            module_info = ModuleInfo(
                name=module_name,
                classes=[],
                functions=[],
                imports=[],
                file_path=file_path,
            )

            # Extract imports
            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        module_info.imports.append(alias.name)
                elif isinstance(node, ast.ImportFrom):
                    if node.module:
                        module_info.imports.append(node.module)

            # Extract top-level classes and functions
            for node in ast.iter_child_nodes(tree):
                if isinstance(node, ast.ClassDef):
                    class_info = self._extract_class_info(node, file_path)
                    module_info.classes.append(class_info)
                elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    function_info = self._extract_function_info(node, file_path)
                    module_info.functions.append(function_info)

            self.modules[module_name] = module_info

        except Exception as e:
            print(f"Error analyzing {file_path}: {e}")

    def _get_annotation_name(self, node: ast.AST) -> str:
        """Return a readable name for annotation nodes (simple best-effort)."""
        if node is None:
            return "Any"
        if isinstance(node, ast.Name):
            return node.id
        if isinstance(node, ast.Attribute):
            # e.g., typing.List -> Attribute(value=Name('typing'), attr='List')
            parts = []
            cur: Optional[ast.AST] = node
            while isinstance(cur, ast.Attribute):
                parts.append(cur.attr)
                cur = cur.value
            if isinstance(cur, ast.Name):
                parts.append(cur.id)
            return ".".join(reversed(parts))
        if isinstance(node, ast.Subscript):
            # e.g., List[int]
            value = self._get_annotation_name(node.value)
            # Try to get slice name
            try:
                slice_name = self._get_annotation_name(node.slice) if not isinstance(node.slice, ast.Tuple) else ",".join(
                    self._get_annotation_name(elt) for elt in node.slice.elts
                )
            except Exception:
                slice_name = "..."
            return f"{value}[{slice_name}]"
        if isinstance(node, ast.Constant):
            return repr(node.value)
        return "Any"

    def _extract_class_info(self, class_node: ast.ClassDef, file_path: str) -> ClassInfo:
        """Extract methods, attributes, base classes, docstring, etc."""
        methods: List[str] = []
        attributes: List[str] = []
        base_classes: List[str] = []

        # Base classes (handle Name and Attribute)
        for base in class_node.bases:
            if isinstance(base, ast.Name):
                base_classes.append(base.id)
            elif isinstance(base, ast.Attribute):
                base_classes.append(self._get_annotation_name(base))
            else:
                # Fallback for complex expressions
                base_classes.append(ast.unparse(base) if hasattr(ast, "unparse") else "UnknownBase")

        # Extract methods and class-level attributes (AnnAssign/Assign)
        for node in class_node.body:
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                methods.append(node.name)
            elif isinstance(node, ast.Assign):
                for target in node.targets:
                    if isinstance(target, ast.Name):
                        attributes.append(target.id)
                    elif isinstance(target, ast.Attribute):
                        # e.g., some_module.CONST = ...
                        attributes.append(ast.unparse(target) if hasattr(ast, "unparse") else "attr")
            elif isinstance(node, ast.AnnAssign):
                target = node.target
                if isinstance(target, ast.Name):
                    attributes.append(target.id)

        docstring = ast.get_docstring(class_node) or ""

        return ClassInfo(
            name=class_node.name,
            methods=methods,
            attributes=attributes,
            base_classes=base_classes,
            docstring=docstring,
            file_path=file_path,
            line_number=class_node.lineno,
        )

    def _extract_function_info(self, function_node: ast.AST, file_path: str) -> FunctionInfo:
        """Extract function name, parameters, return type and docstring."""
        # function_node may be FunctionDef or AsyncFunctionDef
        name = getattr(function_node, "name", "<lambda>")
        parameters: List[str] = []

        # args: positional and kwonly; skip vararg/kwarg names for brevity
        args_obj = function_node.args
        for arg in getattr(args_obj, "args", []) + getattr(args_obj, "kwonlyargs", []):
            parameters.append(arg.arg)
        if getattr(args_obj, "vararg", None):
            parameters.append("*" + args_obj.vararg.arg)
        if getattr(args_obj, "kwarg", None):
            parameters.append("**" + args_obj.kwarg.arg)

        # Return type best-effort
        return_type = "Any"
        if getattr(function_node, "returns", None):
            try:
                return_type = self._get_annotation_name(function_node.returns)
            except Exception:
                return_type = "Any"

        docstring = ast.get_docstring(function_node) or ""
        lineno = getattr(function_node, "lineno", 0)

        return FunctionInfo(
            name=name,
            parameters=parameters,
            return_type=return_type,
            docstring=docstring,
            file_path=file_path,
            line_number=lineno,
        )

    def _build_dependencies(self):
        """Build dependency graph between modules and classes."""
        self.dependencies.clear()
        # Module import-based dependencies
        for module_name, module_info in self.modules.items():
            seen = set()
            for import_name in module_info.imports:
                imported_module = import_name.split(".")[0]
                if imported_module in self.modules and imported_module != module_name:
                    key = (module_name, imported_module, "dependency")
                    if key not in seen:
                        self.dependencies.append(
                            Dependency(source=module_name, target=imported_module, type="dependency")
                        )
                        seen.add(key)

        # Class inheritance relationships across modules
        for module_name, module_info in self.modules.items():
            for class_info in module_info.classes:
                for base_class in class_info.base_classes:
                    # Try to find which module defines this base class
                    for other_module_name, other_module in self.modules.items():
                        for other_class in other_module.classes:
                            if other_class.name == base_class:
                                self.dependencies.append(
                                    Dependency(
                                        source=f"{module_name}.{class_info.name}",
                                        target=f"{other_module_name}.{base_class}",
                                        type="inheritance",
                                    )
                                )

    def generate_class_diagram(self) -> str:
        """Generate PlantUML class diagram (text)."""
        plantuml = ["@startuml", "skinparam classAttributeIconSize 0"]

        # Add classes
        for module_name, module_info in self.modules.items():
            for class_info in module_info.classes:
                full_name = f"{module_name}.{class_info.name}"
                plantuml.append(f"class \"{full_name}\" as {module_name}_{class_info.name}")
                # attributes and methods (limited for readability)
                for attr in class_info.attributes[:10]:
                    plantuml.append(f"{module_name}_{class_info.name} : +{attr}")
                for method in class_info.methods[:10]:
                    plantuml.append(f"{module_name}_{class_info.name} : +{method}()")

        # Add relationships
        for dep in self.dependencies:
            if dep.type == "inheritance":
                # dep.target is like module.Class - convert to alias used above
                target_alias = dep.target.replace(".", "_")
                source_alias = dep.source.replace(".", "_")
                plantuml.append(f"{target_alias} <|-- {source_alias}")
            elif dep.type == "dependency":
                plantuml.append(f"{dep.source} ..> {dep.target}")

        plantuml.append("@enduml")
        return "\n".join(plantuml)

    def generate_component_diagram(self) -> str:
        """Generate PlantUML component diagram (modules as components)."""
        plantuml = ["@startuml", "skinparam componentStyle uml2"]

        # Components (modules)
        for module_name in self.modules.keys():
            plantuml.append(f"component \"{module_name}\" as {module_name}")

        # Module-level dependencies
        module_deps: Dict[(str, str), int] = {}
        for dep in self.dependencies:
            if dep.type == "dependency":
                key = (dep.source, dep.target)
                module_deps[key] = module_deps.get(key, 0) + 1

        for (source, target), count in module_deps.items():
            plantuml.append(f"{source} --> {target}")

        plantuml.append("@enduml")
        return "\n".join(plantuml)

    def generate_architecture_report(self) -> Dict[str, Any]:
        """Generate a dictionary report of the architecture."""
        report: Dict[str, Any] = {
            "summary": {
                "total_modules": len(self.modules),
                "total_classes": sum(len(m.classes) for m in self.modules.values()),
                "total_functions": sum(len(m.functions) for m in self.modules.values()),
                "total_dependencies": len(self.dependencies),
            },
            "modules": {},
            "dependencies": [],
            "metrics": self._calculate_metrics(),
        }

        for module_name, module_info in self.modules.items():
            report["modules"][module_name] = {
                "file_path": module_info.file_path,
                "class_count": len(module_info.classes),
                "function_count": len(module_info.functions),
                "import_count": len(module_info.imports),
                "classes": [
                    {
                        "name": cls.name,
                        "method_count": len(cls.methods),
                        "attribute_count": len(cls.attributes),
                        "base_classes": cls.base_classes,
                        "has_docstring": bool(cls.docstring and cls.docstring.strip()),
                    }
                    for cls in module_info.classes
                ],
            }

        for dep in self.dependencies:
            report["dependencies"].append({"source": dep.source, "target": dep.target, "type": dep.type})

        return report

    def _calculate_metrics(self) -> Dict[str, float]:
        """Calculate some simple architecture metrics."""
        metrics: Dict[str, float] = {}
        if not self.modules:
            return metrics

        # Methods count across all classes
        total_methods = sum(len(cls.methods) for m in self.modules.values() for cls in m.classes)
        total_functions = sum(len(m.functions) for m in self.modules.values())

        metrics["methods_per_module"] = total_methods / len(self.modules) if self.modules else 0.0
        metrics["functions_per_module"] = total_functions / len(self.modules) if self.modules else 0.0

        # Coupling (average outgoing deps per module)
        outgoing_deps: Dict[str, int] = {}
        incoming_deps: Dict[str, int] = {}
        for dep in self.dependencies:
            outgoing_deps[dep.source] = outgoing_deps.get(dep.source, 0) + 1
            incoming_deps[dep.target] = incoming_deps.get(dep.target, 0) + 1

        if outgoing_deps:
            metrics["average_coupling"] = sum(outgoing_deps.values()) / len(outgoing_deps)
        else:
            metrics["average_coupling"] = 0.0

        metrics["module_cohesion"] = self._calculate_cohesion()

        return metrics

    def _calculate_cohesion(self) -> float:
        """Simplified cohesion: for modules with >1 class, fraction of shared method names."""
        total_cohesion = 0.0
        modules_count = 0

        for module_info in self.modules.values():
            if len(module_info.classes) > 1:
                shared_methods = set()
                all_methods = set()
                for cls in module_info.classes:
                    class_methods = set(cls.methods)
                    shared_methods.update(all_methods.intersection(class_methods))
                    all_methods.update(class_methods)
                if all_methods:
                    module_cohesion = len(shared_methods) / len(all_methods)
                    total_cohesion += module_cohesion
                    modules_count += 1

        return total_cohesion / modules_count if modules_count > 0 else 1.0


class DocumentationGenerator:
    def __init__(self, analyzer: ArchitectureAnalyzer):
        self.analyzer = analyzer

    def generate_markdown_documentation(self, output_path: str):
        """Generate Markdown architecture documentation from the analyzer report."""
        report = self.analyzer.generate_architecture_report()

        with open(output_path, "w", encoding="utf-8") as f:
            f.write("# Architecture Documentation\n\n")

            # Summary
            f.write("## Summary\n\n")
            f.write(f"- **Total Modules**: {report['summary']['total_modules']}\n")
            f.write(f"- **Total Classes**: {report['summary']['total_classes']}\n")
            f.write(f"- **Total Functions**: {report['summary']['total_functions']}\n")
            f.write(f"- **Total Dependencies**: {report['summary']['total_dependencies']}\n\n")

            # Metrics
            f.write("## Architecture Metrics\n\n")
            for metric, value in report.get("metrics", {}).items():
                if isinstance(value, float):
                    f.write(f"- **{metric.replace('_', ' ').title()}**: {value:.2f}\n")
                else:
                    f.write(f"- **{metric.replace('_', ' ').title()}**: {value}\n")
            f.write("\n")

            # Modules
            f.write("## Modules\n\n")
            for module_name, module_data in report["modules"].items():
                f.write(f"### {module_name}\n\n")
                f.write(f"- **File**: {module_data['file_path']}\n")
                f.write(f"- **Classes**: {module_data['class_count']}\n")
                f.write(f"- **Functions**: {module_data['function_count']}\n")
                f.write(f"- **Imports**: {module_data['import_count']}\n\n")

                if module_data["classes"]:
                    f.write("#### Classes\n\n")
                    for cls in module_data["classes"]:
                        f.write(f"- **{cls['name']}**\n")
                        f.write(f"  - Methods: {cls['method_count']}\n")
                        f.write(f"  - Attributes: {cls['attribute_count']}\n")
                        base_str = ", ".join(cls["base_classes"]) if cls["base_classes"] else "None"
                        f.write(f"  - Base Classes: {base_str}\n")
                        f.write(f"  - Documented: {'Yes' if cls['has_docstring'] else 'No'}\n\n")

            # Dependencies
            f.write("## Dependencies\n\n")
            f.write("| Source | Target | Type |\n")
            f.write("|---|---|---|\n")
            for dep in report["dependencies"][:1000]:
                f.write(f"| {dep['source']} | {dep['target']} | {dep['type']} |\n")

        print(f"Markdown documentation generated: {output_path}")

    def generate_plantuml_diagrams(self, output_dir: str):
        """Write PlantUML diagram files for class/component diagrams."""
        os.makedirs(output_dir, exist_ok=True)

        class_diagram = self.analyzer.generate_class_diagram()
        with open(os.path.join(output_dir, "class_diagram.puml"), "w", encoding="utf-8") as f:
            f.write(class_diagram)

        component_diagram = self.analyzer.generate_component_diagram()
        with open(os.path.join(output_dir, "component_diagram.puml"), "w", encoding="utf-8") as f:
            f.write(component_diagram)

        print(f"PlantUML diagrams generated in: {output_dir}")


def demonstrate_analyzer():
    """Demonstration entrypoint for the analyzer."""
    print("=== Architecture Documentation Generator ===")

    analyzer = ArchitectureAnalyzer()

    # Try to use the script directory; fall back to cwd
    try:
        current_dir = str(Path(__file__).parent)
    except NameError:
        current_dir = os.getcwd()

    print(f"Analyzing directory: {current_dir}")
    analyzer.analyze_directory(current_dir)

    doc_generator = DocumentationGenerator(analyzer)

    # Generate Markdown documentation and diagrams
    output_md = os.path.join(current_dir, "architecture_documentation.md")
    diagrams_dir = os.path.join(current_dir, "diagrams")

    doc_generator.generate_markdown_documentation(output_md)
    doc_generator.generate_plantuml_diagrams(diagrams_dir)

    # Print summary
    report = analyzer.generate_architecture_report()
    print("\n=== Analysis Summary ===")
    for key, value in report["summary"].items():
        print(f"{key.replace('_', ' ').title()}: {value}")

    print("\n=== Architecture Metrics ===")
    for metric, value in report["metrics"].items():
        if isinstance(value, float):
            print(f"{metric.replace('_', ' ').title()}: {value:.2f}")
        else:
            print(f"{metric.replace('_', ' ').title()}: {value}")

    print("\n=== Top Module Dependencies ===")
    dependency_count: Dict[(str, str), int] = {}
    for dep in report["dependencies"]:
        if dep["type"] == "dependency":
            key = (dep["source"], dep["target"])
            dependency_count[key] = dependency_count.get(key, 0) + 1

    sorted_deps = sorted(dependency_count.items(), key=lambda x: x[1], reverse=True)[:10]
    for (source, target), count in sorted_deps:
        print(f"{source} -> {target} ({count} dependencies)")


if __name__ == "__main__":
    demonstrate_analyzer()


=== Architecture Documentation Generator ===
Analyzing directory: /content
Markdown documentation generated: /content/architecture_documentation.md
PlantUML diagrams generated in: /content/diagrams

=== Analysis Summary ===
Total Modules: 0
Total Classes: 0
Total Functions: 0
Total Dependencies: 0

=== Architecture Metrics ===

=== Top Module Dependencies ===


## Lab 15: Cloud-Native Patterns
Objective: Implement cloud-native patterns including circuit breaker and retry mechanisms.
Problem Statement: Create resilient microservices with circuit breakers, retries, and fallbacks.


In [None]:
import time
import random
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional, Dict
from enum import Enum
import threading
from dataclasses import dataclass
from datetime import datetime, timedelta


class CircuitState(Enum):
    CLOSED = "CLOSED"       # Normal operation
    OPEN = "OPEN"           # Fail fast, no requests
    HALF_OPEN = "HALF_OPEN" # Testing if service recovered


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5          # Failures before opening circuit
    success_threshold: int = 3          # Successes before closing circuit
    timeout_seconds: int = 60           # Time circuit stays open
    half_open_max_requests: int = 3     # Max concurrent requests in half-open state


class CircuitBreakerError(Exception):
    pass


class CircuitBreaker:
    def __init__(self, name: str, config: CircuitBreakerConfig = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.last_state_change = datetime.now()
        self.lock = threading.Lock()

        # Track concurrent requests when in HALF_OPEN
        self.half_open_current_requests = 0

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        # First check and update state under lock
        with self.lock:
            # If OPEN, check timeout
            if self.state == CircuitState.OPEN:
                elapsed = (datetime.now() - self.last_state_change).total_seconds()
                if elapsed >= self.config.timeout_seconds:
                    self._transition_to_half_open()
                else:
                    raise CircuitBreakerError(f"Circuit breaker '{self.name}' is OPEN")

            # If HALF_OPEN, enforce concurrent request limit
            if self.state == CircuitState.HALF_OPEN:
                if self.half_open_current_requests >= self.config.half_open_max_requests:
                    raise CircuitBreakerError(
                        f"Circuit breaker '{self.name}' is in HALF_OPEN and at request limit"
                    )
                # reserve a slot for this request
                self.half_open_current_requests += 1

        # Execute the function *without* holding the main lock (to avoid blocking)
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            # On failure: update state
            self._on_failure()
            raise CircuitBreakerError(f"Circuit breaker '{self.name}' caught exception: {e}") from e
        else:
            # On success: update state
            self._on_success()
            return result
        finally:
            # If we had reserved a half-open slot, release it
            with self.lock:
                if self.state == CircuitState.HALF_OPEN and self.half_open_current_requests > 0:
                    # If we transitioned away from HALF_OPEN in _on_success/_on_failure,
                    # the counter was reset in the transition methods. Only decrement if still in HALF_OPEN.
                    # But to be safe:
                    if self.half_open_current_requests > 0:
                        self.half_open_current_requests = max(0, self.half_open_current_requests - 1)

    def _on_success(self):
        with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                # If enough successes in half-open, close the circuit
                if self.success_count >= self.config.success_threshold:
                    self._transition_to_closed()
            elif self.state == CircuitState.CLOSED:
                # Reset failure_count on success during normal operation
                self.failure_count = 0
                self.success_count = 0

    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            if self.state == CircuitState.CLOSED and self.failure_count >= self.config.failure_threshold:
                self._transition_to_open()
            elif self.state == CircuitState.HALF_OPEN:
                # Any failure in half-open should re-open
                self._transition_to_open()

    def _transition_to_open(self):
        self.state = CircuitState.OPEN
        self.last_state_change = datetime.now()
        self.success_count = 0
        self.half_open_current_requests = 0
        print(f"Circuit breaker '{self.name}' transitioned to OPEN")

    def _transition_to_half_open(self):
        self.state = CircuitState.HALF_OPEN
        self.last_state_change = datetime.now()
        self.failure_count = 0
        self.success_count = 0
        self.half_open_current_requests = 0
        print(f"Circuit breaker '{self.name}' transitioned to HALF_OPEN")

    def _transition_to_closed(self):
        self.state = CircuitState.CLOSED
        self.last_state_change = datetime.now()
        self.failure_count = 0
        self.success_count = 0
        self.half_open_current_requests = 0
        print(f"Circuit breaker '{self.name}' transitioned to CLOSED")

    def get_status(self) -> Dict:
        with self.lock:
            return {
                "name": self.name,
                "state": self.state.value,
                "failure_count": self.failure_count,
                "success_count": self.success_count,
                "last_state_change": self.last_state_change.isoformat(),
                "is_closed": self.state == CircuitState.CLOSED,
                "is_open": self.state == CircuitState.OPEN,
                "is_half_open": self.state == CircuitState.HALF_OPEN,
                "half_open_current_requests": self.half_open_current_requests,
                "half_open_max_requests": self.config.half_open_max_requests,
            }


class RetryError(Exception):
    pass


class RetryStrategy:
    def __init__(self, max_attempts: int = 3, base_delay: float = 1.0, max_delay: float = 30.0, exponential_base: float = 2.0):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    def get_delay(self, attempt: int) -> float:
        """Calculate delay for retry attempt (attempt is 1-based)."""
        delay = self.base_delay * (self.exponential_base ** (attempt - 1))
        return min(delay, self.max_delay)


class RetryManager:
    def __init__(self, strategy: RetryStrategy = None):
        self.strategy = strategy or RetryStrategy()

    def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        last_exception = None

        for attempt in range(1, self.strategy.max_attempts + 1):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                last_exception = e
                print(f"Attempt {attempt} failed: {e}")

                if attempt == self.strategy.max_attempts:
                    break

                delay = self.strategy.get_delay(attempt)
                print(f"Retrying in {delay:.2f} seconds...")
                time.sleep(delay)

        raise RetryError(f"All {self.strategy.max_attempts} attempts failed") from last_exception


class FallbackStrategy:
    def __init__(self, fallback_func: Callable, fallback_on_exceptions: tuple = (Exception,)):
        self.fallback_func = fallback_func
        self.fallback_on_exceptions = fallback_on_exceptions

    def execute_with_fallback(self, func: Callable, *args, **kwargs) -> Any:
        try:
            return func(*args, **kwargs)
        except self.fallback_on_exceptions as e:
            print(f"Primary operation failed, using fallback: {e}")
            return self.fallback_func(*args, **kwargs)


class BulkheadError(Exception):
    pass


class BulkheadPattern:
    def __init__(self, max_concurrent: int = 10, name: str = "default"):
        self.name = name
        self.max_concurrent = max_concurrent
        self.current_concurrent = 0
        self.lock = threading.Lock()
        self.semaphore = threading.Semaphore(max_concurrent)

    def execute(self, func: Callable, *args, **kwargs) -> Any:
        if not self.semaphore.acquire(blocking=False):
            raise BulkheadError(f"Bulkhead '{self.name}' is at capacity")

        try:
            with self.lock:
                self.current_concurrent += 1
            return func(*args, **kwargs)
        finally:
            with self.lock:
                self.current_concurrent = max(0, self.current_concurrent - 1)
            self.semaphore.release()

    def get_status(self) -> Dict:
        with self.lock:
            return {
                "name": self.name,
                "max_concurrent": self.max_concurrent,
                "current_concurrent": self.current_concurrent,
                "available_capacity": self.max_concurrent - self.current_concurrent,
            }


class ServiceError(Exception):
    pass


class ResilientService:
    """Service with built-in resilience patterns"""

    def __init__(self, name: str):
        self.name = name

        # Initialize resilience patterns
        self.circuit_breaker = CircuitBreaker(f"{name}_cb")
        self.retry_manager = RetryManager()
        self.bulkhead = BulkheadPattern(max_concurrent=5, name=f"{name}_bh")

        # Statistics (not thread-safe counters for simplicity)
        self.request_count = 0
        self.success_count = 0
        self.failure_count = 0
        self._stats_lock = threading.Lock()

    def call_external_service(self, should_fail: bool = False) -> str:
        """Simulate calling an external service"""
        with self._stats_lock:
            self.request_count += 1

        # Simulate random failures (10% chance) or forced failure
        if should_fail or random.random() < 0.1:
            with self._stats_lock:
                self.failure_count += 1
            raise ServiceError(f"External service {self.name} failed")

        # Simulate processing time
        time.sleep(random.uniform(0.1, 0.5))

        with self._stats_lock:
            self.success_count += 1
        return f"Success from {self.name}"

    def fallback_method(self, *args, **kwargs) -> str:
        """Fallback method when primary service fails"""
        return f"Fallback response from {self.name}"

    def resilient_call(self, use_circuit_breaker: bool = True, use_retry: bool = True, use_bulkhead: bool = True, use_fallback: bool = True, should_fail: bool = False) -> str:
        """Make resilient call with configured patterns"""

        def primary_operation():
            return self.call_external_service(should_fail)

        operation = primary_operation

        # Apply fallback (wraps primary)
        if use_fallback:
            fallback_strategy = FallbackStrategy(self.fallback_method)
            prev_op = operation
            operation = lambda: fallback_strategy.execute_with_fallback(prev_op)

        # Apply bulkhead
        if use_bulkhead:
            prev_op = operation
            operation = lambda: self.bulkhead.execute(prev_op)

        # Apply retry
        if use_retry:
            prev_op = operation
            operation = lambda: self.retry_manager.execute_with_retry(prev_op)

        # Apply circuit breaker
        if use_circuit_breaker:
            prev_op = operation
            operation = lambda: self.circuit_breaker.execute(prev_op)

        return operation()

    def get_service_stats(self) -> Dict:
        """Get service statistics"""
        cb_status = self.circuit_breaker.get_status()
        bh_status = self.bulkhead.get_status()
        with self._stats_lock:
            total = self.request_count
            success = self.success_count
            failed = self.failure_count
            success_rate = (success / total * 100) if total > 0 else 0.0

        return {
            "service_name": self.name,
            "requests": {
                "total": total,
                "successful": success,
                "failed": failed,
                "success_rate": success_rate,
            },
            "circuit_breaker": cb_status,
            "bulkhead": bh_status,
        }


class CloudNativeOrchestrator:
    """Orchestrator for demonstrating cloud-native patterns"""

    def __init__(self):
        self.services: Dict[str, ResilientService] = {}
        self.setup_services()

    def setup_services(self):
        """Setup resilient services"""
        service_names = ["user-service", "order-service", "payment-service", "inventory-service"]
        for name in service_names:
            self.services[name] = ResilientService(name)

    def simulate_traffic(self, duration_seconds: int = 60, requests_per_second: int = 10):
        """Simulate traffic to all services"""
        print(f"=== Simulating Traffic for {duration_seconds} seconds ===")

        end_time = time.time() + duration_seconds
        request_count = 0

        while time.time() < end_time:
            threads = []

            # Create multiple concurrent requests
            for _ in range(requests_per_second):
                for service_name, service in self.services.items():
                    # Randomly decide if this request should fail
                    should_fail = random.random() < 0.15  # 15% forced failure rate

                    thread = threading.Thread(
                        target=self._make_resilient_call,
                        args=(service, should_fail),
                    )
                    threads.append(thread)
                    thread.start()
                    request_count += 1

            # Wait for all threads to complete
            for thread in threads:
                thread.join()

            time.sleep(1)  # Wait 1 second between batches

        print(f"Completed {request_count} requests")

    def _make_resilient_call(self, service: ResilientService, should_fail: bool):
        """Make a resilient call to a service"""
        try:
            result = service.resilient_call(
                use_circuit_breaker=True,
                use_retry=True,
                use_bulkhead=True,
                use_fallback=True,
                should_fail=should_fail,
            )
            print(result)
        except Exception as e:
            print(f"Service call failed: {e}")

    def display_dashboard(self):
        """Display current status dashboard"""
        print("\n=== Resilience Patterns Dashboard ===")
        for service_name, service in self.services.items():
            stats = service.get_service_stats()
            print(f"\n{service_name}:")
            print(f"  Requests: {stats['requests']['total']} ({stats['requests']['success_rate']:.1f}% success)")
            cb_state = stats["circuit_breaker"]["state"]
            print(f"  Circuit Breaker: {cb_state} (failures: {stats['circuit_breaker']['failure_count']})")
            bh_usage = stats["bulkhead"]["current_concurrent"]
            bh_max = stats["bulkhead"]["max_concurrent"]
            print(f"  Bulkhead: {bh_usage}/{bh_max} concurrent")


def demonstrate_resilience_patterns():
    """Demonstrate individual resilience patterns"""
    print("=== Individual Resilience Patterns Demo ===")

    service = ResilientService("demo-service")

    # Test circuit breaker by forcing failures
    print("\n1. Testing Circuit Breaker:")
    for i in range(10):
        try:
            result = service.resilient_call(
                use_circuit_breaker=True,
                use_retry=False,
                use_bulkhead=False,
                use_fallback=False,
                should_fail=True,  # Force failures
            )
            print(f"Request {i + 1}: {result}")
        except Exception as e:
            print(f"Request {i + 1}: {e}")

    # Wait for circuit to potentially half-open
    print("\nWaiting for circuit breaker timeout + 5s...")
    time.sleep(service.circuit_breaker.config.timeout_seconds + 5)

    # Test with recovery (no forced failures)
    print("\n2. Testing Recovery:")
    for i in range(5):
        try:
            result = service.resilient_call(
                use_circuit_breaker=True,
                use_retry=False,
                use_bulkhead=False,
                use_fallback=False,
                should_fail=False,  # No forced failures
            )
            print(f"Request {i + 1}: {result}")
        except Exception as e:
            print(f"Request {i + 1}: {e}")


if __name__ == "__main__":
    # Demonstrate individual patterns
    demonstrate_resilience_patterns()

    # Run orchestrated simulation
    orchestrator = CloudNativeOrchestrator()

    print("\n" + "=" * 50)
    orchestrator.display_dashboard()

    print("\nStarting traffic simulation ...")
    orchestrator.simulate_traffic(duration_seconds=10, requests_per_second=3)

    print("\nFinal Status:")
    orchestrator.display_dashboard()


=== Individual Resilience Patterns Demo ===

1. Testing Circuit Breaker:
Request 1: Circuit breaker 'demo-service_cb' caught exception: External service demo-service failed
Request 2: Circuit breaker 'demo-service_cb' caught exception: External service demo-service failed
Request 3: Circuit breaker 'demo-service_cb' caught exception: External service demo-service failed
Request 4: Circuit breaker 'demo-service_cb' caught exception: External service demo-service failed
Circuit breaker 'demo-service_cb' transitioned to OPEN
Request 5: Circuit breaker 'demo-service_cb' caught exception: External service demo-service failed
Request 6: Circuit breaker 'demo-service_cb' is OPEN
Request 7: Circuit breaker 'demo-service_cb' is OPEN
Request 8: Circuit breaker 'demo-service_cb' is OPEN
Request 9: Circuit breaker 'demo-service_cb' is OPEN
Request 10: Circuit breaker 'demo-service_cb' is OPEN

Waiting for circuit breaker timeout + 5s...

2. Testing Recovery:
Circuit breaker 'demo-service_cb' trans

 ## Lab 16: DevOps and CI/CD Pipeline Simulation
Objective: Simulate a complete CI/CD pipeline with automated testing and deployment.
Problem Statement: Create a pipeline simulator that handles code integration, testing, and deployment.

In [None]:
import time
import random
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Callable
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
import hashlib
import json
import statistics


class PipelineStage(Enum):
    SOURCE = "source"
    BUILD = "build"
    TEST = "test"
    SECURITY_SCAN = "security_scan"
    DEPLOY_STAGING = "deploy_staging"
    INTEGRATION_TEST = "integration_test"
    DEPLOY_PRODUCTION = "deploy_production"


class BuildStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class CodeChange:
    commit_hash: str
    author: str
    message: str
    timestamp: datetime
    files_changed: List[str]
    lines_added: int
    lines_removed: int


@dataclass
class BuildResult:
    build_id: str
    status: BuildStatus
    stages: Dict[PipelineStage, BuildStatus]
    duration: float
    logs: List[str]
    artifacts: List[str]
    test_results: Optional[Dict[str, Any]] = None
    security_issues: Optional[List[Dict[str, Any]]] = None


class VersionControlSystem(ABC):
    @abstractmethod
    def get_latest_changes(self) -> List[CodeChange]:
        pass

    @abstractmethod
    def create_branch(self, name: str) -> bool:
        pass

    @abstractmethod
    def merge_branch(self, source: str, target: str) -> bool:
        pass


class GitSimulator(VersionControlSystem):
    def __init__(self):
        self.branches = ["main", "develop"]
        self.commits: List[CodeChange] = []
        self.current_branch = "main"

    def get_latest_changes(self) -> List[CodeChange]:
        # Simulate getting recent commits
        changes: List[CodeChange] = []
        for _ in range(random.randint(1, 3)):
            ch = CodeChange(
                commit_hash=hashlib.md5(str(time.time()).encode()).hexdigest()[:8],
                author=f"developer{random.randint(1, 5)}",
                message=self._generate_commit_message(),
                timestamp=datetime.now(),
                files_changed=[f"file{random.randint(1, 10)}.py"],
                lines_added=random.randint(1, 50),
                lines_removed=random.randint(0, 20),
            )
            changes.append(ch)
            self.commits.append(ch)
        return changes

    def create_branch(self, name: str) -> bool:
        if name not in self.branches:
            self.branches.append(name)
            return True
        return False

    def merge_branch(self, source: str, target: str) -> bool:
        if source in self.branches and target in self.branches:
            print(f"Merged branch '{source}' into '{target}'")
            return True
        return False

    def _generate_commit_message(self) -> str:
        messages = [
            "Fix bug in user authentication",
            "Add new API endpoint for orders",
            "Refactor database layer",
            "Update documentation",
            "Optimize performance",
            "Add unit tests",
            "Fix security vulnerability",
            "Update dependencies",
            "Improve error handling",
            "Add feature flag for new functionality",
        ]
        return random.choice(messages)


class TestRunner(ABC):
    @abstractmethod
    def run_unit_tests(self) -> Dict[str, Any]:
        pass

    @abstractmethod
    def run_integration_tests(self) -> Dict[str, Any]:
        pass

    @abstractmethod
    def run_performance_tests(self) -> Dict[str, Any]:
        pass


class PythonTestRunner(TestRunner):
    def run_unit_tests(self) -> Dict[str, Any]:
        # Simulate running unit tests
        time.sleep(random.uniform(0.1, 0.3))

        total_tests = random.randint(50, 200)
        failed_tests = random.randint(0, 5)
        passed_tests = total_tests - failed_tests

        return {
            "type": "unit",
            "total_tests": total_tests,
            "passed": passed_tests,
            "failed": failed_tests,
            "success_rate": (passed_tests / total_tests) * 100,
            "duration": random.uniform(1, 3),
            "failed_test_cases": [] if failed_tests == 0 else [f"test_user_authentication_{i}" for i in range(failed_tests)],
        }

    def run_integration_tests(self) -> Dict[str, Any]:
        time.sleep(random.uniform(0.1, 0.2))

        total_tests = random.randint(20, 50)
        failed_tests = random.randint(0, 3)
        passed_tests = total_tests - failed_tests

        return {
            "type": "integration",
            "total_tests": total_tests,
            "passed": passed_tests,
            "failed": failed_tests,
            "success_rate": (passed_tests / total_tests) * 100,
            "duration": random.uniform(2, 4),
            "failed_test_cases": [] if failed_tests == 0 else [f"test_order_workflow_{i}" for i in range(failed_tests)],
        }

    def run_performance_tests(self) -> Dict[str, Any]:
        time.sleep(random.uniform(0.1, 0.2))
        return {
            "type": "performance",
            "response_time_avg": random.uniform(100, 500),
            "throughput": random.randint(100, 1000),
            "error_rate": random.uniform(0, 2),
            "memory_usage": random.uniform(50, 200),
            "cpu_usage": random.uniform(10, 80),
        }


class SecurityScanner:
    def scan_code(self) -> List[Dict[str, Any]]:
        # Simulate security scanning
        time.sleep(random.uniform(0.1, 0.3))
        issues: List[Dict[str, Any]] = []
        vulnerability_types = [
            "SQL Injection",
            "XSS",
            "CSRF",
            "Insecure Deserialization",
            "Broken Authentication",
            "Sensitive Data Exposure",
        ]
        for _ in range(random.randint(0, 3)):
            issues.append({
                "type": random.choice(vulnerability_types),
                "severity": random.choice(["low", "medium", "high"]),
                "file": f"file{random.randint(1, 10)}.py",
                "line": random.randint(1, 100),
                "description": f"Potential {random.choice(vulnerability_types)} vulnerability",
            })
        return issues

    def scan_dependencies(self) -> List[Dict[str, Any]]:
        time.sleep(random.uniform(0.1, 0.2))
        issues: List[Dict[str, Any]] = []
        packages = ["requests", "flask", "django", "numpy", "pandas"]
        for _ in range(random.randint(0, 2)):
            issues.append({
                "package": random.choice(packages),
                "version": f"{random.randint(1,3)}.{random.randint(0,9)}.{random.randint(0,9)}",
                "vulnerability": "CVE-2023-" + str(random.randint(1000, 9999)),
                "severity": random.choice(["low", "medium", "high"]),
                "fixed_in": f"{random.randint(1,3)}.{random.randint(0,9)}.{random.randint(1,9)}",
            })
        return issues


class DeploymentManager:
    def __init__(self):
        self.environments = ["staging", "production"]
        self.deployment_history: List[Dict[str, Any]] = []

    def deploy_to_staging(self, build_artifact: str) -> bool:
        print("Deploying to staging environment ...")
        time.sleep(random.uniform(0.1, 0.3))
        success = random.random() > 0.1  # 90% success
        print("Staging deployment successful" if success else "Staging deployment failed")
        self.deployment_history.append({
            "environment": "staging",
            "artifact": build_artifact,
            "timestamp": datetime.now(),
            "success": success,
        })
        return success

    def deploy_to_production(self, build_artifact: str) -> bool:
        print("Deploying to production environment ...")
        time.sleep(random.uniform(0.1, 0.3))
        success = random.random() > 0.05  # 95% success
        print("Production deployment successful" if success else "Production deployment failed")
        self.deployment_history.append({
            "environment": "production",
            "artifact": build_artifact,
            "timestamp": datetime.now(),
            "success": success,
        })
        return success

    def rollback_production(self) -> bool:
        print("Rolling back production deployment ...")
        time.sleep(random.uniform(0.1, 0.2))
        success = random.random() > 0.02
        print("Production rollback successful" if success else "Production rollback failed")
        return success


class CICDPipeline:
    def __init__(self, name: str):
        self.name = name
        self.vcs = GitSimulator()
        self.test_runner = PythonTestRunner()
        self.security_scanner = SecurityScanner()
        self.deployment_manager = DeploymentManager()

        self.build_history: List[BuildResult] = []
        self.current_build: Optional[BuildResult] = None

        # Pipeline configuration
        self.auto_deploy_to_production = False
        self.required_success_rate = 95.0
        self.max_security_issues = 2

    def trigger_build(self, changes: List[CodeChange]) -> BuildResult:
        """Trigger a new build for the given changes"""
        build_id = f"build_{len(self.build_history) + 1:04d}"

        self.current_build = BuildResult(
            build_id=build_id,
            status=BuildStatus.RUNNING,
            stages={},
            duration=0.0,
            logs=[],
            artifacts=[],
            test_results=None,
            security_issues=[],
        )

        print(f"=== Starting Build {build_id} ===")
        start_time = time.time()

        try:
            # Execute pipeline stages
            self._execute_stage(PipelineStage.SOURCE, self._source_stage, changes)
            self._execute_stage(PipelineStage.BUILD, self._build_stage)
            self._execute_stage(PipelineStage.TEST, self._test_stage)
            self._execute_stage(PipelineStage.SECURITY_SCAN, self._security_scan_stage)

            # Only proceed if previous stages were successful
            if self.current_build.status == BuildStatus.SUCCESS:
                self._execute_stage(PipelineStage.DEPLOY_STAGING, self._deploy_staging_stage)
                self._execute_stage(PipelineStage.INTEGRATION_TEST, self._integration_test_stage)

            # Deploy to production if enabled and all checks pass
            if self.auto_deploy_to_production and self.current_build.status == BuildStatus.SUCCESS:
                self._execute_stage(PipelineStage.DEPLOY_PRODUCTION, self._deploy_production_stage)

        except Exception as e:
            self.current_build.status = BuildStatus.FAILED
            self.current_build.logs.append(f"Pipeline failed with error: {e}")

        # Calculate total duration
        self.current_build.duration = time.time() - start_time

        # Add to build history
        self.build_history.append(self.current_build)

        print(f"=== Build {build_id} completed with status: {self.current_build.status.value} ===")
        return self.current_build

    def _execute_stage(self, stage: PipelineStage, stage_func: Callable, *args):
        """Execute a pipeline stage and update build status"""
        if self.current_build.status != BuildStatus.RUNNING:
            return

        self.current_build.stages[stage] = BuildStatus.RUNNING
        self.current_build.logs.append(f"Starting {stage.value} stage...")

        try:
            result = stage_func(*args) if args else stage_func()
            if result:
                self.current_build.stages[stage] = BuildStatus.SUCCESS
                self.current_build.logs.append(f"{stage.value} stage completed successfully")
                # if all stages so far succeeded, keep build as success; otherwise remains running
                if all(s == BuildStatus.SUCCESS for s in self.current_build.stages.values()):
                    self.current_build.status = BuildStatus.SUCCESS
            else:
                self.current_build.stages[stage] = BuildStatus.FAILED
                self.current_build.status = BuildStatus.FAILED
                self.current_build.logs.append(f"{stage.value} stage failed")
        except Exception as e:
            self.current_build.stages[stage] = BuildStatus.FAILED
            self.current_build.status = BuildStatus.FAILED
            self.current_build.logs.append(f"{stage.value} stage failed with error: {e}")

    def _source_stage(self, changes: List[CodeChange]) -> bool:
        """Source stage: fetch and validate code changes"""
        time.sleep(0.05)
        self.current_build.logs.extend([
            f"Processing {len(changes)} commit(s)",
            f"Latest commit: {changes[0].message if changes else 'No changes'}",
        ])
        return True

    def _build_stage(self) -> bool:
        """Build stage: compile and package application"""
        time.sleep(random.uniform(0.05, 0.2))
        success = random.random() > 0.05  # 95% success rate

        if success:
            artifact = f"app_{self.current_build.build_id}.zip"
            self.current_build.artifacts.append(artifact)
            self.current_build.logs.append(f"Created build artifact: {artifact}")
        else:
            self.current_build.logs.append("Build failed: compilation errors")

        return success

    def _test_stage(self) -> bool:
        """Test stage: run automated tests"""
        test_results = self.test_runner.run_unit_tests()
        self.current_build.test_results = test_results

        self.current_build.logs.extend([
            f"Unit tests: {test_results['passed']}/{test_results['total_tests']} passed",
            f"Success rate: {test_results['success_rate']:.1f}%",
        ])

        # Check if success rate meets requirement
        if test_results['success_rate'] < self.required_success_rate:
            self.current_build.logs.append(
                f"Test success rate {test_results['success_rate']:.1f}% below required {self.required_success_rate}%"
            )
            return False

        return True

    def _security_scan_stage(self) -> bool:
        """Security scan stage: check for vulnerabilities"""
        code_issues = self.security_scanner.scan_code()
        dependency_issues = self.security_scanner.scan_dependencies()

        all_issues = code_issues + dependency_issues
        self.current_build.security_issues = all_issues

        high_severity_issues = [issue for issue in all_issues if issue.get("severity") == "high"]

        self.current_build.logs.append(
            f"Security scan: {len(all_issues)} issues found ({len(high_severity_issues)} high severity)"
        )

        # Fail if too many high severity security issues
        if len(high_severity_issues) > self.max_security_issues:
            self.current_build.logs.append(f"Too many high severity security issues: {len(high_severity_issues)}")
            return False

        return True

    def _deploy_staging_stage(self) -> bool:
        """Deploy to staging environment"""
        if not self.current_build.artifacts:
            return False
        artifact = self.current_build.artifacts[0]
        return self.deployment_manager.deploy_to_staging(artifact)

    def _integration_test_stage(self) -> bool:
        """Run integration tests in staging"""
        test_results = self.test_runner.run_integration_tests()
        if self.current_build.test_results is None:
            self.current_build.test_results = {}
        self.current_build.test_results["integration"] = test_results

        self.current_build.logs.extend([
            f"Integration tests: {test_results['passed']}/{test_results['total_tests']} passed",
            f"Success rate: {test_results['success_rate']:.1f}%",
        ])

        return test_results['success_rate'] >= self.required_success_rate

    def _deploy_production_stage(self) -> bool:
        """Deploy to production environment"""
        if not self.current_build.artifacts:
            return False
        artifact = self.current_build.artifacts[0]
        return self.deployment_manager.deploy_to_production(artifact)

    def get_pipeline_metrics(self) -> Dict[str, Any]:
        """Get pipeline performance metrics"""
        if not self.build_history:
            return {}

        successful_builds = [b for b in self.build_history if b.status == BuildStatus.SUCCESS]
        failed_builds = [b for b in self.build_history if b.status == BuildStatus.FAILED]

        total_duration = sum(b.duration for b in self.build_history)
        avg_duration = total_duration / len(self.build_history) if self.build_history else 0

        success_rate = (len(successful_builds) / len(self.build_history)) * 100

        # Stage success rates
        stage_success_rates: Dict[str, float] = {}
        for stage in PipelineStage:
            stage_builds = [b for b in self.build_history if stage in b.stages]
            if stage_builds:
                successful_stage = [b for b in stage_builds if b.stages[stage] == BuildStatus.SUCCESS]
                stage_success_rates[stage.value] = (len(successful_stage) / len(stage_builds)) * 100

        return {
            "total_builds": len(self.build_history),
            "successful_builds": len(successful_builds),
            "failed_builds": len(failed_builds),
            "success_rate": success_rate,
            "average_duration": avg_duration,
            "stage_success_rates": stage_success_rates,
        }

    def print_build_report(self, build: BuildResult):
        """Print detailed build report"""
        print(f"\n=== Build Report: {build.build_id} ===")
        print(f"Status: {build.status.value}")
        print(f"Duration: {build.duration:.2f} seconds")

        print("\nStages:")
        for stage, status in build.stages.items():
            print(f"  {stage.value}: {status.value}")

        if build.test_results:
            print("\nTest Results:")
            # If a single suite returned as dict with 'type' key
            if isinstance(build.test_results, dict) and "type" in build.test_results:
                tr = build.test_results
                print(f"  {tr['type']}: {tr['passed']}/{tr['total_tests']} ({tr['success_rate']:.1f}%)")
            else:
                for test_type, results in build.test_results.items():
                    if isinstance(results, dict):
                        print(f"  {test_type}: {results['passed']}/{results['total_tests']} ({results['success_rate']:.1f}%)")

        if build.security_issues:
            print(f"\nSecurity Issues: {len(build.security_issues)}")
            for issue in build.security_issues[:3]:
                print(f"  {issue.get('severity','').upper()}: {issue.get('type')} - {issue.get('description')}")

        print("\nLogs (last 5):")
        for log in build.logs[-5:]:
            print(f"  {log}")


class DevOpsMonitor:
    """Monitor for tracking DevOps metrics and trends"""

    def __init__(self, pipelines: List[CICDPipeline]):
        self.pipelines = pipelines
        self.metrics_history: List[Dict[str, Any]] = []

    def collect_metrics(self):
        """Collect metrics from all pipelines"""
        timestamp = datetime.now()
        metrics_snapshot: Dict[str, Any] = {
            "timestamp": timestamp,
            "pipelines": {},
        }

        for pipeline in self.pipelines:
            metrics = pipeline.get_pipeline_metrics()
            metrics_snapshot["pipelines"][pipeline.name] = metrics

        self.metrics_history.append(metrics_snapshot)

        # Keep only last 100 snapshots
        if len(self.metrics_history) > 100:
            self.metrics_history.pop(0)

    def generate_devops_report(self) -> Dict[str, Any]:
        """Generate comprehensive DevOps report"""
        if not self.metrics_history:
            return {}

        latest_snapshot = self.metrics_history[-1]

        report = {
            "report_time": datetime.now().isoformat(),
            "summary": {
                "total_pipelines": len(self.pipelines),
                "monitoring_period_hours": len(self.metrics_history),
            },
            "pipeline_performance": {},
            "trends": self._calculate_trends(),
        }

        for pipeline_name, metrics in latest_snapshot["pipelines"].items():
            report["pipeline_performance"][pipeline_name] = {
                "success_rate": metrics.get("success_rate", 0),
                "average_duration": metrics.get("average_duration", 0),
                "throughput": metrics.get("total_builds", 0) / max(len(self.metrics_history), 1),
                "reliability": self._calculate_reliability_score(pipeline_name),
            }

        return report

    def _calculate_trends(self) -> Dict[str, Any]:
        """Calculate performance trends over time"""
        if len(self.metrics_history) < 2:
            return {}

        first_snapshot = self.metrics_history[0]
        last_snapshot = self.metrics_history[-1]

        trends: Dict[str, Any] = {}
        for pipeline_name in last_snapshot["pipelines"]:
            if pipeline_name in first_snapshot["pipelines"]:
                first_metrics = first_snapshot["pipelines"][pipeline_name]
                last_metrics = last_snapshot["pipelines"][pipeline_name]

                success_rate_change = (last_metrics.get("success_rate", 0) - first_metrics.get("success_rate", 0))
                duration_change = (last_metrics.get("average_duration", 0) - first_metrics.get("average_duration", 0))

                trends[pipeline_name] = {
                    "success_rate_trend": "improving" if success_rate_change > 0 else "declining",
                    "success_rate_change": success_rate_change,
                    "duration_trend": "faster" if duration_change < 0 else "slower",
                    "duration_change": abs(duration_change),
                }
        return trends

    def _calculate_reliability_score(self, pipeline_name: str) -> float:
        """Calculate reliability score for a pipeline (0-100)"""
        relevant_snapshots = [
            snapshot for snapshot in self.metrics_history
            if pipeline_name in snapshot["pipelines"]
        ]

        if not relevant_snapshots:
            return 0.0

        success_rates = [
            snapshot["pipelines"][pipeline_name].get("success_rate", 0)
            for snapshot in relevant_snapshots
        ]

        avg_success_rate = sum(success_rates) / len(success_rates)

        # Consider stability (consistency of success rates)
        if len(success_rates) > 1:
            stability = 1 - (statistics.stdev(success_rates) / 100)
            reliability = avg_success_rate * stability
        else:
            reliability = avg_success_rate

        return min(100, max(0, reliability))


# Demonstration
def simulate_ci_cd_workflow():
    """Simulate a complete CI/CD workflow"""
    print("=== CI/CD Pipeline Simulation ===")

    # Create pipelines
    main_pipeline = CICDPipeline("main-pipeline")
    feature_pipeline = CICDPipeline("feature-pipeline")

    # Enable auto-deploy for main pipeline
    main_pipeline.auto_deploy_to_production = True

    # Create monitor
    monitor = DevOpsMonitor([main_pipeline, feature_pipeline])

    # Simulate multiple builds
    print("\n=== Running Builds ===")
    for i in range(5):
        print(f"\n--- Build Cycle {i + 1} ---")
        # Get code changes
        changes = main_pipeline.vcs.get_latest_changes()

        # Trigger builds
        main_build = main_pipeline.trigger_build(changes)
        feature_build = feature_pipeline.trigger_build(changes)

        # Print build reports
        main_pipeline.print_build_report(main_build)

        # Collect metrics
        monitor.collect_metrics()

        # Wait between builds
        time.sleep(0.2)

    # Generate DevOps report
    print("\n=== DevOps Metrics Report ===")
    devops_report = monitor.generate_devops_report()

    print(f"Monitoring Period: {devops_report['summary']['monitoring_period_hours']} snapshots")
    print(f"Total Pipelines: {devops_report['summary']['total_pipelines']}")

    print("\nPipeline Performance:")
    for pipeline_name, performance in devops_report["pipeline_performance"].items():
        print(f"\n{pipeline_name}:")
        print(f"  Success Rate: {performance['success_rate']:.1f}%")
        print(f"  Average Duration: {performance['average_duration']:.2f}s")
        print(f"  Reliability Score: {performance['reliability']:.1f}/100")

    print("\nTrends:")
    for pipeline_name, trend in devops_report["trends"].items():
        print(f"\n{pipeline_name}:")
        print(f"  Success Rate: {trend['success_rate_trend']} ({trend['success_rate_change']:+.1f}%)")
        print(f"  Duration: {trend['duration_trend']} ({trend['duration_change']:.2f}s change)")


if __name__ == "__main__":
    simulate_ci_cd_workflow()
