### ** ----- Utility Module ------ **
Helps to create, Manage Database

#### Environment Setup

In [None]:
import subprocess
class EnvironmentSetup:
    """Class to handle environment setup and configuration."""
    def __init__(self) -> None:
        self.__install_mysql()

    def __install_mysql(self) -> None:
        """Installs MySQL server and connector if not already installed."""
        subprocess.run(['pip', 'install', 'mysql-connector-python', '--quiet'])
        subprocess.run(['apt-get', '-y', 'install', 'mysql-server', '--quiet'])
        subprocess.run(['service', 'mysql', 'start'])
        print("MySQL installed and started.")

env = EnvironmentSetup()

import mysql.connector
from mysql.connector import errorcode
from mysql.connector.connection import MySQLConnection
from typing import Optional

#### MySQL Command Executor Class

In [None]:
class MySQLCommandExecutor:
    """Class to handle MySQL command execution via subprocess."""

    def __init__(self, user: str = 'root', password: str ='root') -> None:
        self.user = user
        self.password = password

    def execute_command(self, command: str) -> str | None:
        """Executes a MySQL shell command using subprocess."""
        try:
            full_command = f"mysql -u{self.user} -p{self.password} -e \"{command}\""
            result = subprocess.run(full_command, shell=True, check=True, capture_output=True, text=True)
            print("Command executed successfully.")
            return result.stdout
        except subprocess.CalledProcessError as e:
            print("Error executing command:", e.stderr)
            return None

    def set_root_password(self, new_password: str = 'root') -> str | None:
        """Sets the password for the root MySQL user."""
        alter_command = (
            f"ALTER USER 'root'@'localhost' IDENTIFIED WITH 'mysql_native_password' BY '{new_password}';"
            "FLUSH PRIVILEGES;"
        )
        return self.execute_command(alter_command)

#### Database Manager

In [None]:
class DatabaseManager:
    """Handles database operations like creating tables and executing queries."""

    def __init__(self, connection: MySQLConnection) -> None:
        self.connection = connection
        self.cursor = connection.cursor()

    def execute_script(self, script: str, verbose: bool = False) -> None:
        """Executes a multi-statement SQL script."""
        try:
            for result in self.cursor.execute(script, multi=True):
                if verbose:
                    print(f'Executed query: {script}')
            self.connection.commit()
            print("Script executed successfully.")
        except mysql.connector.Error as err:
            print(f"Error executing script: {err}")

    def close(self) -> None:
        """Closes the database connection."""
        self.cursor.close()
        self.connection.close()

#### Database Connector Factory

In [None]:
class DatabaseConnectionFactory:
    """Factory class to create and manage database connections."""

    @staticmethod
    def get_connection(user='root', password='root', host='localhost') -> Optional[MySQLConnection]:
        """Establishes a MySQL connection."""
        try:
            connection = mysql.connector.connect(
                host=host,
                user=user,
                password=password
            )
            return connection
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Error: Invalid username or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Error: Database does not exist")
            else:
                print(err)
            return None

#### Inventory Database Initializer

In [None]:
class InventoryDBInitializer:
    """Initializes the store inventory database with tables and sample data."""

    CREATE_DB_SCRIPT = '''
    DROP DATABASE IF EXISTS store_inventory;
    CREATE DATABASE IF NOT EXISTS store_inventory;
    USE store_inventory;
    '''

    CREATE_TABLES_SCRIPT = '''
    DROP TABLE IF EXISTS t_shirts;
    CREATE TABLE IF NOT EXISTS t_shirts (
        t_shirt_id INT AUTO_INCREMENT PRIMARY KEY,
        brand ENUM('Peter England', 'Van Huesen', 'Levis', 'Nike', 'Adidas', 'Puma') NOT NULL,
        color ENUM('Red', 'Blue', 'Pink', 'Black', 'White', 'Navy') NOT NULL,
        size ENUM('XS', 'S', 'M', 'L', 'XL') NOT NULL,
        price INT CHECK (price BETWEEN 10 AND 50),
        stock_quantity INT NOT NULL,
        UNIQUE KEY brand_color_size (brand, color, size)
    );

    DROP TABLE IF EXISTS discounts;
    CREATE TABLE IF NOT EXISTS discounts (
        discount_id INT AUTO_INCREMENT PRIMARY KEY,
        t_shirt_id INT NOT NULL,
        pct_discount DECIMAL(5,2) CHECK (pct_discount BETWEEN 0 AND 100),
        FOREIGN KEY (t_shirt_id) REFERENCES t_shirts(t_shirt_id)
    );
    '''

    CREATE_PROCEDURE_SCRIPT = '''
    CREATE PROCEDURE PopulateTShirts()
    BEGIN
        DECLARE counter INT DEFAULT 0;
        DECLARE max_records INT DEFAULT 10000;
        DECLARE brand VARCHAR(50);
        DECLARE color VARCHAR(50);
        DECLARE size VARCHAR(10);
        DECLARE price INT;
        DECLARE stock INT;

        -- Seed the random number generator
        SET SESSION rand_seed1 = UNIX_TIMESTAMP();

        WHILE counter < max_records DO
            -- Generate random values
            SET brand = ELT(FLOOR(1 + RAND() * 6), 'Peter England', 'Van Huesen', 'Levis', 'Nike', 'Adidas', 'Puma');
            SET color = ELT(FLOOR(1 + RAND() * 6), 'Red', 'Blue', 'Pink', 'Black', 'White', 'Navy');
            SET size = ELT(FLOOR(1 + RAND() * 5), 'XS', 'S', 'M', 'L', 'XL');
            SET price = FLOOR(10 + RAND() * 41);
            SET stock = FLOOR(10 + RAND() * 91);

            BEGIN
                DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;  -- Handle any SQL exceptions (such as duplicate key)
                INSERT INTO t_shirts (brand, color, size, price, stock_quantity)
                VALUES (brand, color, size, price, stock);
                SET counter = counter + 1;
            END;
        END WHILE;
    END;
    '''

    POPULATE_DISCOUNTS_SCRIPT = '''
    INSERT INTO discounts (t_shirt_id, pct_discount)
    VALUES
    (1, 10.00), (2, 15.00), (3, 20.00), (4, 5.00),
    (5, 25.00), (6, 10.00), (7, 30.00), (8, 35.00),
    (9, 40.00), (10, 45.00), (11, 55.00);
    '''

    def __init__(self) -> None:
        self.manager = None

    def setup_database(self) -> None:
        """Sets up the inventory database and tables."""
        connection = DatabaseConnectionFactory.get_connection()
        if connection:
            self.manager = DatabaseManager(connection)
            try:
                # Create database and tables
                self.manager.execute_script(self.CREATE_DB_SCRIPT)
                self.manager.execute_script(self.CREATE_TABLES_SCRIPT)

                # Create the procedure
                self.manager.execute_script(self.CREATE_PROCEDURE_SCRIPT)

                # Populate the t_shirts table using the stored procedure
                self.manager.execute_script("CALL PopulateTShirts();")

                # Drop the procedure after use
                self.manager.execute_script("DROP PROCEDURE IF EXISTS PopulateTShirts;")

                # Populate the discounts table with a delay to ensure t_shirts are populated first
                self.manager.execute_script(self.POPULATE_DISCOUNTS_SCRIPT)

                print("Database and tables set up successfully.")

            except Exception as e:
                print(f"An error occurred while setting up the database: {e}")
            finally:
                self.manager.close()
        else:
            print("Failed to establish a database connection.")

#### Test Class

In [None]:
class DatabaseTest:
    """Class to test the database setup and generate a report on its state."""

    def __init__(self, connection: MySQLConnection):
        """Initialize with a database connection."""
        self.connection = connection
        self.cursor = self.connection.cursor(dictionary=True)

    def check_database(self):
        """Verify connection to the database."""
        try:
            self.cursor.execute("USE store_inventory;")
            print("Connected to store_inventory database.")
        except mysql.connector.Error as err:
            print("Error: Unable to connect to store_inventory database:", err)
            return False
        return True

    def check_tables_exist(self):
        """Check if the required tables exist in the database."""
        tables = ["t_shirts", "discounts"]
        missing_tables = []

        for table in tables:
            try:
                self.cursor.execute(f"SHOW TABLES LIKE '{table}';")
                result = self.cursor.fetchone()
                if not result:
                    missing_tables.append(table)
            except mysql.connector.Error as err:
                print(f"Error checking table {table}:", err)
                missing_tables.append(table)

        if missing_tables:
            print("Missing tables:", ", ".join(missing_tables))
            return False
        print("All required tables exist.")
        return True

    def check_record_counts(self):
        """Check the number of records in the tables."""
        record_counts = {}
        tables = ["t_shirts", "discounts"]

        for table in tables:
            try:
                self.cursor.execute(f"SELECT COUNT(*) AS count FROM {table};")
                result = self.cursor.fetchone()
                record_counts[table] = result['count']
            except mysql.connector.Error as err:
                print(f"Error querying table {table}:", err)
                record_counts[table] = "Error"

        print("Record Counts:")
        for table, count in record_counts.items():
            print(f"  {table}: {count}")
        return record_counts

    def query_sample_data(self):
        """Query and print a sample of records from each table."""
        samples = {}
        tables = ["t_shirts", "discounts"]

        for table in tables:
            try:
                self.cursor.execute(f"SELECT * FROM {table} LIMIT 5;")
                samples[table] = self.cursor.fetchall()
            except mysql.connector.Error as err:
                print(f"Error querying table {table}:", err)
                samples[table] = []

        print("Sample Data:")
        for table, rows in samples.items():
            print(f"\nTable: {table}")
            for row in rows:
                print(row)
        return samples

    def generate_report(self):
        """Generate and print a report on database structure and content."""
        print("\n--- Database Setup Report ---")
        if not self.check_database():
            print("Database check failed. Report incomplete.")
            return

        tables_exist = self.check_tables_exist()
        record_counts = self.check_record_counts()
        sample_data = self.query_sample_data()

        print("\n--- Report Summary ---")
        print("Tables Check:", "Pass" if tables_exist else "Fail")
        print("Record Counts:", record_counts)
        print("Sample Data Queried. See details above.")
        print("\n--- End of Report ---")

    def close(self):
        """Close the database connection."""
        self.cursor.close()
        self.connection.close()

#### Driver Code

In [None]:
if __name__ == "__main__":
    executor = MySQLCommandExecutor(user='root', password='root')
    executor.set_root_password('root')  # Change the root password if needed

    initializer = InventoryDBInitializer()
    initializer.setup_database()

    # Test
    connection = DatabaseConnectionFactory.get_connection(user='root', password='root', host='localhost')
    if connection:
        db_test = DatabaseTest(connection)
        db_test.generate_report()
        db_test.close()
    else:
        print("Could not establish connection for testing.")