# Module 1 Lab: SQLite Fundamentals with Python

This comprehensive lab explores SQLite fundamentals including connection management, data types, transactions, and error handling.

## Learning Objectives
- Master connection management patterns
- Implement robust error handling
- Work with different data types and conversions
- Understand transaction management
- Apply performance optimizations

**Estimated Time**: 90-120 minutes

In [None]:
# Import required modules
import sqlite3
import json
import time
import threading
from datetime import datetime, date
from decimal import Decimal
from contextlib import contextmanager
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print(f"SQLite version: {sqlite3.sqlite_version}")
print(f"Python sqlite3 module version: {sqlite3.version}")

## Section 1: Connection Management Patterns

In [None]:
# Exercise 1.1: Basic Connection Patterns

class DatabaseManager:
    """Professional database management class"""
    
    def __init__(self, db_path):
        self.db_path = db_path
        self.connection = None
    
    def connect(self):
        """Establish optimized database connection"""
        try:
            self.connection = sqlite3.connect(
                self.db_path,
                timeout=30.0,
                detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
                check_same_thread=False
            )
            
            # Apply performance optimizations
            optimizations = [
                "PRAGMA journal_mode = WAL",
                "PRAGMA synchronous = NORMAL", 
                "PRAGMA cache_size = 10000",
                "PRAGMA temp_store = MEMORY",
                "PRAGMA foreign_keys = ON"
            ]
            
            for pragma in optimizations:
                self.connection.execute(pragma)
            
            # Set row factory for named access
            self.connection.row_factory = sqlite3.Row
            
            logger.info(f"Connected to database: {self.db_path}")
            return self.connection
            
        except sqlite3.Error as e:
            logger.error(f"Failed to connect to database: {e}")
            raise
    
    def close(self):
        """Close database connection"""
        if self.connection:
            self.connection.close()
            logger.info("Database connection closed")
    
    @contextmanager
    def get_cursor(self):
        """Context manager for cursor operations"""
        if not self.connection:
            self.connect()
        
        cursor = self.connection.cursor()
        try:
            yield cursor
        finally:
            cursor.close()

# Test the DatabaseManager
db_manager = DatabaseManager('lab1_fundamentals.db')
db_manager.connect()

print("✅ Database connection established with optimizations")

In [None]:
# Exercise 1.2: Create comprehensive schema

def create_schema(db_manager):
    """Create a comprehensive database schema"""
    
    with db_manager.get_cursor() as cursor:
        
        # Customers table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS customers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                phone TEXT,
                address JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                is_active BOOLEAN DEFAULT 1,
                credit_limit DECIMAL(10,2) DEFAULT 1000.00
            )
        """)
        
        # Products table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                description TEXT,
                price DECIMAL(10,2) NOT NULL CHECK(price >= 0),
                category TEXT NOT NULL,
                stock_quantity INTEGER DEFAULT 0 CHECK(stock_quantity >= 0),
                metadata JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        
        # Orders table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS orders (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                customer_id INTEGER NOT NULL,
                order_date DATETIME NOT NULL,
                total_amount DECIMAL(10,2) NOT NULL CHECK(total_amount >= 0),
                status TEXT DEFAULT 'pending' CHECK(status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')),
                shipping_address JSON,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (customer_id) REFERENCES customers(id)
            )
        """)
        
        # Order items table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS order_items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                order_id INTEGER NOT NULL,
                product_id INTEGER NOT NULL,
                quantity INTEGER NOT NULL CHECK(quantity > 0),
                unit_price DECIMAL(10,2) NOT NULL CHECK(unit_price >= 0),
                total_price DECIMAL(10,2) GENERATED ALWAYS AS (quantity * unit_price) STORED,
                FOREIGN KEY (order_id) REFERENCES orders(id),
                FOREIGN KEY (product_id) REFERENCES products(id)
            )
        """)
        
        # Create indexes for performance
        indexes = [
            "CREATE INDEX IF NOT EXISTS idx_customers_email ON customers(email)",
            "CREATE INDEX IF NOT EXISTS idx_products_category ON products(category)",
            "CREATE INDEX IF NOT EXISTS idx_orders_customer ON orders(customer_id)",
            "CREATE INDEX IF NOT EXISTS idx_orders_date ON orders(order_date)",
            "CREATE INDEX IF NOT EXISTS idx_order_items_order ON order_items(order_id)",
            "CREATE INDEX IF NOT EXISTS idx_order_items_product ON order_items(product_id)"
        ]
        
        for index_sql in indexes:
            cursor.execute(index_sql)
        
        db_manager.connection.commit()
        logger.info("Schema created successfully with indexes")

create_schema(db_manager)
print("✅ Comprehensive schema created")

## Section 2: Data Type Handling and Conversions

In [None]:
# Exercise 2.1: Custom Type Adapters and Converters

# Register custom type adapters
def json_adapter(obj):
    """Convert Python object to JSON string"""
    return json.dumps(obj)

def json_converter(s):
    """Convert JSON string back to Python object"""
    return json.loads(s.decode())

def decimal_adapter(decimal_obj):
    """Convert Decimal to string for storage"""
    return str(decimal_obj)

def decimal_converter(s):
    """Convert string back to Decimal"""
    return Decimal(s.decode())

def bool_adapter(bool_obj):
    """Convert boolean to integer for storage"""
    return 1 if bool_obj else 0

def bool_converter(s):
    """Convert integer back to boolean"""
    return bool(int(s))

# Register all adapters and converters
sqlite3.register_adapter(dict, json_adapter)
sqlite3.register_adapter(list, json_adapter)
sqlite3.register_converter("JSON", json_converter)

sqlite3.register_adapter(Decimal, decimal_adapter)
sqlite3.register_converter("DECIMAL", decimal_converter)

sqlite3.register_adapter(bool, bool_adapter)
sqlite3.register_converter("BOOLEAN", bool_converter)

print("✅ Custom type adapters and converters registered")

In [None]:
# Exercise 2.2: Insert data with various types

def insert_sample_data(db_manager):
    """Insert sample data demonstrating various data types"""
    
    with db_manager.get_cursor() as cursor:
        
        # Insert customers with JSON addresses
        customers_data = [
            ('John Doe', 'john@example.com', '+1-555-0101', 
             {'street': '123 Main St', 'city': 'New York', 'zip': '10001'}, 
             True, Decimal('5000.00')),
            ('Jane Smith', 'jane@example.com', '+1-555-0102',
             {'street': '456 Oak Ave', 'city': 'Los Angeles', 'zip': '90210'},
             True, Decimal('3000.00')),
            ('Bob Johnson', 'bob@example.com', '+1-555-0103',
             {'street': '789 Pine St', 'city': 'Chicago', 'zip': '60601'},
             False, Decimal('1500.00'))
        ]
        
        cursor.executemany("""
            INSERT INTO customers (name, email, phone, address, is_active, credit_limit) 
            VALUES (?, ?, ?, ?, ?, ?)
        """, customers_data)
        
        # Insert products with JSON metadata
        products_data = [
            ('Gaming Laptop', 'High-performance gaming laptop', Decimal('1299.99'), 'Electronics',
             10, {'brand': 'TechCorp', 'specs': {'ram': '16GB', 'storage': '512GB SSD', 'gpu': 'RTX 3070'}}),
            ('Wireless Mouse', 'Ergonomic wireless mouse', Decimal('29.99'), 'Electronics',
             50, {'brand': 'MouseCorp', 'specs': {'dpi': 3200, 'battery_life': '2 years'}}),
            ('Programming Book', 'Learn Python programming', Decimal('39.99'), 'Books',
             25, {'author': 'Tech Author', 'pages': 500, 'level': 'intermediate'}),
            ('Coffee Mug', 'Programmer\'s coffee mug', Decimal('12.99'), 'Office',
             100, {'material': 'ceramic', 'capacity': '350ml', 'dishwasher_safe': True})
        ]
        
        cursor.executemany("""
            INSERT INTO products (name, description, price, category, stock_quantity, metadata)
            VALUES (?, ?, ?, ?, ?, ?)
        """, products_data)
        
        db_manager.connection.commit()
        logger.info(f"Inserted {len(customers_data)} customers and {len(products_data)} products")

insert_sample_data(db_manager)
print("✅ Sample data inserted with various data types")

In [None]:
# Exercise 2.3: Query and verify data type conversions

def verify_data_types(db_manager):
    """Verify that data types are correctly converted"""
    
    with db_manager.get_cursor() as cursor:
        
        # Query customers with JSON and other types
        cursor.execute("""
            SELECT name, email, address, is_active, credit_limit, created_at
            FROM customers 
            ORDER BY id
        """)
        
        print("Customer Data Type Verification:")
        print("-" * 60)
        
        for row in cursor.fetchall():
            print(f"Name: {row['name']} (type: {type(row['name'])})")
            print(f"Address: {row['address']} (type: {type(row['address'])})")
            print(f"Active: {row['is_active']} (type: {type(row['is_active'])})")
            print(f"Credit Limit: {row['credit_limit']} (type: {type(row['credit_limit'])})")
            print(f"Created: {row['created_at']} (type: {type(row['created_at'])})")
            print("-" * 30)
        
        # Query products with JSON metadata
        cursor.execute("""
            SELECT name, price, metadata
            FROM products 
            WHERE category = 'Electronics'
        """)
        
        print("\nProduct Metadata Examples:")
        print("-" * 40)
        
        for row in cursor.fetchall():
            print(f"Product: {row['name']}")
            print(f"Price: ${row['price']} (type: {type(row['price'])})")
            print(f"Metadata: {row['metadata']}")
            if isinstance(row['metadata'], dict) and 'specs' in row['metadata']:
                print(f"Specs: {row['metadata']['specs']}")
            print("-" * 20)

verify_data_types(db_manager)
print("\n✅ Data type conversions verified")

## Section 3: Transaction Management

In [None]:
# Exercise 3.1: Manual Transaction Management

def process_order_transaction(db_manager, customer_id, order_items):
    """Process an order using manual transaction management"""
    
    cursor = db_manager.connection.cursor()
    
    try:
        # Start transaction
        cursor.execute("BEGIN TRANSACTION")
        
        # Create order
        order_date = datetime.now()
        total_amount = Decimal('0.00')
        
        cursor.execute("""
            INSERT INTO orders (customer_id, order_date, total_amount, status)
            VALUES (?, ?, ?, 'pending')
        """, (customer_id, order_date, total_amount))
        
        order_id = cursor.lastrowid
        
        # Process each order item
        for product_id, quantity in order_items:
            # Check stock availability
            cursor.execute("""
                SELECT stock_quantity, price FROM products WHERE id = ?
            """, (product_id,))
            
            result = cursor.fetchone()
            if not result:
                raise ValueError(f"Product {product_id} not found")
            
            stock_quantity, unit_price = result
            
            if stock_quantity < quantity:
                raise ValueError(f"Insufficient stock for product {product_id}. Available: {stock_quantity}, Requested: {quantity}")
            
            # Update stock
            cursor.execute("""
                UPDATE products SET stock_quantity = stock_quantity - ? WHERE id = ?
            """, (quantity, product_id))
            
            # Add order item
            cursor.execute("""
                INSERT INTO order_items (order_id, product_id, quantity, unit_price)
                VALUES (?, ?, ?, ?)
            """, (order_id, product_id, quantity, unit_price))
            
            # Update total amount
            total_amount += unit_price * quantity
        
        # Update order total
        cursor.execute("""
            UPDATE orders SET total_amount = ? WHERE id = ?
        """, (total_amount, order_id))
        
        # Check customer credit limit
        cursor.execute("""
            SELECT credit_limit FROM customers WHERE id = ?
        """, (customer_id,))
        
        credit_limit = cursor.fetchone()[0]
        
        if total_amount > credit_limit:
            raise ValueError(f"Order total ${total_amount} exceeds credit limit ${credit_limit}")
        
        # Commit transaction
        cursor.execute("COMMIT")
        logger.info(f"Order {order_id} processed successfully. Total: ${total_amount}")
        
        return order_id, total_amount
        
    except Exception as e:
        # Rollback on any error
        cursor.execute("ROLLBACK")
        logger.error(f"Order processing failed: {e}")
        raise
    
    finally:
        cursor.close()

# Test successful order
try:
    order_items = [(1, 1), (2, 2)]  # 1 laptop, 2 mice
    order_id, total = process_order_transaction(db_manager, 1, order_items)
    print(f"✅ Order {order_id} processed successfully. Total: ${total}")
except Exception as e:
    print(f"❌ Order failed: {e}")

# Test failed order (insufficient stock)
try:
    order_items = [(1, 20)]  # 20 laptops (not enough stock)
    order_id, total = process_order_transaction(db_manager, 2, order_items)
    print(f"✅ Order {order_id} processed successfully. Total: ${total}")
except Exception as e:
    print(f"❌ Order failed as expected: {e}")

In [None]:
# Exercise 3.2: Context Manager Transaction

@contextmanager
def transaction_manager(db_manager):
    """Context manager for automatic transaction handling"""
    cursor = db_manager.connection.cursor()
    
    try:
        cursor.execute("BEGIN TRANSACTION")
        yield cursor
        cursor.execute("COMMIT")
        logger.info("Transaction committed successfully")
        
    except Exception as e:
        cursor.execute("ROLLBACK")
        logger.error(f"Transaction rolled back: {e}")
        raise
        
    finally:
        cursor.close()

def update_customer_info(db_manager, customer_id, new_credit_limit, new_address):
    """Update customer information using transaction context manager"""
    
    with transaction_manager(db_manager) as cursor:
        # Update credit limit
        cursor.execute("""
            UPDATE customers SET credit_limit = ? WHERE id = ?
        """, (new_credit_limit, customer_id))
        
        # Update address
        cursor.execute("""
            UPDATE customers SET address = ? WHERE id = ?
        """, (new_address, customer_id))
        
        # Verify updates
        cursor.execute("""
            SELECT name, credit_limit, address FROM customers WHERE id = ?
        """, (customer_id,))
        
        result = cursor.fetchone()
        if not result:
            raise ValueError(f"Customer {customer_id} not found")
        
        logger.info(f"Updated customer {result[0]}: Credit limit = ${result[1]}, Address = {result[2]}")

# Test context manager transaction
try:
    new_address = {'street': '999 Updated St', 'city': 'New City', 'zip': '12345'}
    update_customer_info(db_manager, 1, Decimal('7500.00'), new_address)
    print("✅ Customer updated successfully using context manager")
except Exception as e:
    print(f"❌ Customer update failed: {e}")

## Section 4: Error Handling and Recovery

In [None]:
# Exercise 4.1: Comprehensive Error Handling

def robust_database_operation(db_manager, operation_func, max_retries=3, *args, **kwargs):
    """Execute database operation with comprehensive error handling and retry logic"""
    
    for attempt in range(max_retries):
        try:
            return operation_func(db_manager, *args, **kwargs)
            
        except sqlite3.IntegrityError as e:
            logger.error(f"Integrity constraint violation: {e}")
            raise  # Don't retry integrity errors
            
        except sqlite3.OperationalError as e:
            if "database is locked" in str(e).lower() and attempt < max_retries - 1:
                wait_time = 0.1 * (2 ** attempt)  # Exponential backoff
                logger.warning(f"Database locked, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})")
                time.sleep(wait_time)
                continue
            else:
                logger.error(f"Operational error: {e}")
                raise
                
        except sqlite3.DataError as e:
            logger.error(f"Data error: {e}")
            raise  # Don't retry data errors
            
        except sqlite3.Error as e:
            logger.error(f"General SQLite error: {e}")
            if attempt < max_retries - 1:
                time.sleep(0.1)
                continue
            else:
                raise
                
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise
    
    raise RuntimeError(f"Operation failed after {max_retries} attempts")

def insert_duplicate_customer(db_manager):
    """Function that will cause integrity error (duplicate email)"""
    with db_manager.get_cursor() as cursor:
        cursor.execute("""
            INSERT INTO customers (name, email, phone, address, is_active, credit_limit)
            VALUES (?, ?, ?, ?, ?, ?)
        """, ('Duplicate User', 'john@example.com', '+1-555-9999', {}, True, Decimal('1000.00')))
        db_manager.connection.commit()

def insert_invalid_data(db_manager):
    """Function that will cause check constraint violation"""
    with db_manager.get_cursor() as cursor:
        cursor.execute("""
            INSERT INTO products (name, description, price, category, stock_quantity)
            VALUES (?, ?, ?, ?, ?)
        """, ('Invalid Product', 'Test product', Decimal('-10.00'), 'Test', -5))
        db_manager.connection.commit()

# Test error handling
print("Testing Error Handling:")
print("-" * 30)

# Test integrity error
try:
    robust_database_operation(db_manager, insert_duplicate_customer)
    print("❌ Should have failed with integrity error")
except sqlite3.IntegrityError:
    print("✅ Correctly handled integrity error (duplicate email)")

# Test check constraint error
try:
    robust_database_operation(db_manager, insert_invalid_data)
    print("❌ Should have failed with check constraint error")
except sqlite3.IntegrityError:
    print("✅ Correctly handled check constraint error (negative price/stock)")

## Section 5: Performance Analysis and Optimization

In [None]:
# Exercise 5.1: Query Performance Analysis

def analyze_query_performance(db_manager):
    """Analyze query performance with EXPLAIN QUERY PLAN"""
    
    queries = [
        ("Simple SELECT", "SELECT * FROM customers WHERE email = 'john@example.com'"),
        ("JOIN query", """
            SELECT c.name, o.order_date, o.total_amount 
            FROM customers c 
            JOIN orders o ON c.id = o.customer_id
        """),
        ("Aggregate query", """
            SELECT category, COUNT(*) as product_count, AVG(price) as avg_price
            FROM products 
            GROUP BY category
        """),
        ("Complex JOIN with aggregation", """
            SELECT c.name, COUNT(o.id) as order_count, SUM(o.total_amount) as total_spent
            FROM customers c
            LEFT JOIN orders o ON c.id = o.customer_id
            GROUP BY c.id, c.name
            ORDER BY total_spent DESC
        """)
    ]
    
    with db_manager.get_cursor() as cursor:
        print("Query Performance Analysis:")
        print("=" * 60)
        
        for query_name, query in queries:
            print(f"\n{query_name}:")
            print("-" * 30)
            
            # Analyze query plan
            cursor.execute(f"EXPLAIN QUERY PLAN {query}")
            plan = cursor.fetchall()
            
            for row in plan:
                print(f"  {row[0]}|{row[1]}|{row[2]}|{row[3]}")
            
            # Time the actual query
            start_time = time.time()
            cursor.execute(query)
            results = cursor.fetchall()
            execution_time = time.time() - start_time
            
            print(f"  Execution time: {execution_time:.4f} seconds")
            print(f"  Rows returned: {len(results)}")

analyze_query_performance(db_manager)
print("\n✅ Query performance analysis completed")

In [None]:
# Exercise 5.2: Database Statistics and Monitoring

def get_database_statistics(db_manager):
    """Get comprehensive database statistics"""
    
    with db_manager.get_cursor() as cursor:
        stats = {}
        
        # Database file information
        cursor.execute("PRAGMA page_count")
        page_count = cursor.fetchone()[0]
        
        cursor.execute("PRAGMA page_size")
        page_size = cursor.fetchone()[0]
        
        stats['file_size_bytes'] = page_count * page_size
        stats['file_size_mb'] = stats['file_size_bytes'] / (1024 * 1024)
        stats['page_count'] = page_count
        stats['page_size'] = page_size
        
        # Cache statistics
        cursor.execute("PRAGMA cache_size")
        stats['cache_size'] = cursor.fetchone()[0]
        
        # Journal mode
        cursor.execute("PRAGMA journal_mode")
        stats['journal_mode'] = cursor.fetchone()[0]
        
        # Table row counts
        tables = ['customers', 'products', 'orders', 'order_items']
        table_stats = {}
        
        for table in tables:
            cursor.execute(f"SELECT COUNT(*) FROM {table}")
            table_stats[table] = cursor.fetchone()[0]
        
        stats['table_row_counts'] = table_stats
        
        # Index information
        cursor.execute("""
            SELECT name, tbl_name FROM sqlite_master 
            WHERE type = 'index' AND name NOT LIKE 'sqlite_%'
        """)
        indexes = cursor.fetchall()
        stats['custom_indexes'] = len(indexes)
        
        return stats

def display_database_statistics(stats):
    """Display database statistics in a formatted way"""
    
    print("Database Statistics:")
    print("=" * 40)
    
    print(f"File Size: {stats['file_size_mb']:.2f} MB ({stats['file_size_bytes']:,} bytes)")
    print(f"Page Count: {stats['page_count']:,}")
    print(f"Page Size: {stats['page_size']:,} bytes")
    print(f"Cache Size: {stats['cache_size']:,} pages")
    print(f"Journal Mode: {stats['journal_mode']}")
    print(f"Custom Indexes: {stats['custom_indexes']}")
    
    print("\nTable Row Counts:")
    print("-" * 20)
    for table, count in stats['table_row_counts'].items():
        print(f"  {table}: {count:,}")

# Get and display statistics
stats = get_database_statistics(db_manager)
display_database_statistics(stats)
print("\n✅ Database statistics generated")

## Section 6: Advanced Features

In [None]:
# Exercise 6.1: Custom SQL Functions

def create_custom_functions(connection):
    """Create custom SQL functions for business logic"""
    
    def calculate_discount(price, discount_percent):
        """Calculate discounted price"""
        if price is None or discount_percent is None:
            return None
        return float(price) * (1 - float(discount_percent) / 100)
    
    def format_phone(phone):
        """Format phone number consistently"""
        if not phone:
            return None
        # Remove all non-digit characters
        digits = ''.join(filter(str.isdigit, phone))
        if len(digits) == 10:
            return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
        elif len(digits) == 11 and digits[0] == '1':
            return f"+1 ({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
        return phone  # Return original if format not recognized
    
    def json_get(json_text, key):
        """Extract value from JSON by key"""
        try:
            data = json.loads(json_text)
            return data.get(key)
        except:
            return None
    
    # Register functions
    connection.create_function("DISCOUNT", 2, calculate_discount)
    connection.create_function("FORMAT_PHONE", 1, format_phone)
    connection.create_function("JSON_GET", 2, json_get)
    
    print("✅ Custom SQL functions registered")

# Register custom functions
create_custom_functions(db_manager.connection)

# Test custom functions
with db_manager.get_cursor() as cursor:
    print("\nTesting Custom SQL Functions:")
    print("-" * 35)
    
    # Test discount function
    cursor.execute("""
        SELECT name, price, DISCOUNT(price, 15) as discounted_price
        FROM products 
        WHERE category = 'Electronics'
    """)
    
    print("Products with 15% discount:")
    for row in cursor.fetchall():
        print(f"  {row[0]}: ${row[1]} → ${row[2]:.2f}")
    
    # Test phone formatting
    cursor.execute("""
        SELECT name, phone, FORMAT_PHONE(phone) as formatted_phone
        FROM customers
    """)
    
    print("\nFormatted phone numbers:")
    for row in cursor.fetchall():
        print(f"  {row[0]}: {row[1]} → {row[2]}")
    
    # Test JSON extraction
    cursor.execute("""
        SELECT name, JSON_GET(address, 'city') as city
        FROM customers
    """)
    
    print("\nExtracted cities from JSON:")
    for row in cursor.fetchall():
        print(f"  {row[0]}: {row[1]}")

print("\n✅ Custom functions tested successfully")

## Section 7: Lab Summary and Cleanup

In [None]:
# Final verification and cleanup

def final_database_summary(db_manager):
    """Generate final summary of what was accomplished"""
    
    with db_manager.get_cursor() as cursor:
        print("=" * 60)
        print("LAB 1 FINAL SUMMARY")
        print("=" * 60)
        
        # Schema summary
        cursor.execute("""
            SELECT type, COUNT(*) as count
            FROM sqlite_master 
            WHERE name NOT LIKE 'sqlite_%'
            GROUP BY type
        """)
        
        print("\n📊 Database Schema:")
        for row in cursor.fetchall():
            print(f"  {row[0].title()}s: {row[1]}")
        
        # Data summary
        tables = ['customers', 'products', 'orders', 'order_items']
        print("\n📈 Data Summary:")
        total_records = 0
        for table in tables:
            cursor.execute(f"SELECT COUNT(*) FROM {table}")
            count = cursor.fetchone()[0]
            print(f"  {table}: {count} records")
            total_records += count
        
        print(f"  Total records: {total_records}")
        
        # Business metrics
        cursor.execute("SELECT COUNT(*) FROM orders WHERE status != 'cancelled'")
        successful_orders = cursor.fetchone()[0]
        
        cursor.execute("SELECT SUM(total_amount) FROM orders WHERE status != 'cancelled'")
        total_revenue = cursor.fetchone()[0] or 0
        
        print("\n💰 Business Metrics:")
        print(f"  Successful orders: {successful_orders}")
        print(f"  Total revenue: ${total_revenue}")
        
        # Technical achievements
        print("\n✅ Technical Achievements:")
        achievements = [
            "Connection management with optimization",
            "Custom type adapters and converters",
            "JSON data storage and querying",
            "Manual and automatic transaction management",
            "Comprehensive error handling",
            "Query performance analysis",
            "Custom SQL functions",
            "Database statistics monitoring"
        ]
        
        for achievement in achievements:
            print(f"  ✓ {achievement}")

# Generate final summary
final_database_summary(db_manager)

# Close database connection
db_manager.close()

print("\n" + "=" * 60)
print("🎉 LAB 1 COMPLETED SUCCESSFULLY!")
print("=" * 60)
print("\nYou have successfully mastered:")
print("• SQLite database architecture and concepts")
print("• Python sqlite3 module advanced usage")
print("• Professional connection management patterns")
print("• Data type handling and custom conversions")
print("• Transaction management and error recovery")
print("• Performance optimization techniques")
print("• Custom SQL functions and business logic")
print("\nReady for Module 2: Big-6 SQL Statements! 🚀")