In [None]:
import mysql.connector
from mysql.connector import Error
from datetime import datetime
from typing import List, Dict, Optional, Union

class OrderManager:
    """Data-access helper for the ``orders`` and ``order_items`` MySQL tables.

    A single ``mysql.connector`` connection is opened on ``with`` entry (or
    lazily by the first method that needs it) and reused afterwards. Database
    errors raised inside the CRUD methods are printed and reported through the
    return value (``None`` / ``False`` / ``0`` / an empty result); only
    :meth:`connect` re-raises.
    """

    def __init__(self, host: str, database: str, user: str, password: str, port: int = 3307):
        """Store the connection parameters; no connection is opened yet."""
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.connection = None

    def __enter__(self):
        """Connect to the database when entering the context."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection when exiting the context."""
        self.close()

    def connect(self):
        """Establish a connection to the MySQL database.

        Raises:
            mysql.connector.Error: re-raised after being printed when the
                connection attempt fails.
        """
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port
            )
            if self.connection.is_connected():
                print(f"Connected to MySQL database at {self.host}:{self.port}")
        except Error as e:
            print(f"Error connecting to MySQL at {self.host}:{self.port}: {e}")
            raise

    def close(self):
        """Close the database connection if it is currently open."""
        if self.connection and self.connection.is_connected():
            self.connection.close()
            print("MySQL connection closed")

    def _ensure_connection(self) -> None:
        """(Re)connect when there is no live connection."""
        if not self.connection or not self.connection.is_connected():
            self.connect()

    @staticmethod
    def _build_set_clause(update_data: Dict) -> tuple:
        """Turn ``update_data`` into a ``SET`` fragment and its parameters.

        Entries whose value is ``None`` are skipped so they cannot overwrite
        existing data with NULL. Column names cannot be sent as bound
        parameters, so each one must be a plain ASCII identifier and is
        back-tick quoted before being placed in the SQL text.

        Returns:
            ``(set_sql, params)``; ``set_sql`` is ``""`` when nothing is left
            to update.

        Raises:
            ValueError: if a column name is not a plain ASCII identifier.
        """
        clauses = []
        params = []
        for field, value in update_data.items():
            if value is None:
                continue
            if not (isinstance(field, str) and field.isascii() and field.isidentifier()):
                raise ValueError(f"invalid column name {field!r}")
            clauses.append(f"`{field}` = %s")
            params.append(value)
        return ", ".join(clauses), params

    def _fetch_order(self, lookup_column: str, lookup_value, include_items: bool,
                     error_label: str) -> Optional[Dict]:
        """Load one order row (optionally with its items) by a single column.

        ``lookup_column`` is always a literal supplied by this class, never
        caller input, so formatting it into the query is safe.
        """
        self._ensure_connection()
        cursor = self.connection.cursor(dictionary=True)

        try:
            cursor.execute(
                f"SELECT * FROM orders WHERE {lookup_column} = %s",
                (lookup_value,)
            )
            order = cursor.fetchone()
            if not order:
                return None

            if include_items:
                cursor.execute(
                    "SELECT * FROM order_items WHERE order_id = %s",
                    (order['order_id'],)
                )
                order['items'] = cursor.fetchall()

            return order

        except Error as e:
            print(f"{error_label}: {e}")
            return None
        finally:
            cursor.close()

    def create_order(self, order_data: Dict, items: List[Dict]) -> Optional[int]:
        """
        Create a new order and its items in a single transaction.

        Args:
            order_data: Dictionary containing order details. Missing keys are
                inserted as NULL, except ``status`` ('pending'), ``currency``
                ('USD') and the discount/tax/shipping amounts (0.00).
            items: List of dictionaries containing order items; may be empty.

        Returns:
            The order_id of the created order, or None if the insert failed
            (in which case the transaction is rolled back).
        """
        self._ensure_connection()
        cursor = self.connection.cursor()

        try:
            order_query = """
            INSERT INTO orders (
                customer_id, order_key, order_number, status, currency, 
                total_amount, discount_amount, tax_amount, shipping_amount,
                payment_method, payment_method_title, transaction_id, customer_note,
                date_paid, date_completed
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """

            order_values = (
                order_data.get('customer_id'),
                order_data.get('order_key'),
                order_data.get('order_number'),
                order_data.get('status', 'pending'),
                order_data.get('currency', 'USD'),
                order_data.get('total_amount'),
                order_data.get('discount_amount', 0.00),
                order_data.get('tax_amount', 0.00),
                order_data.get('shipping_amount', 0.00),
                order_data.get('payment_method'),
                order_data.get('payment_method_title'),
                order_data.get('transaction_id'),
                order_data.get('customer_note'),
                order_data.get('date_paid'),
                order_data.get('date_completed')
            )

            cursor.execute(order_query, order_values)
            order_id = cursor.lastrowid

            # Items are only written when the order insert produced an id.
            if items and order_id:
                item_query = """
                INSERT INTO order_items (
                    order_id, product_id, name, quantity, price, 
                    subtotal, total, tax_amount, tax_class, sku, meta_data
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """

                item_values = [
                    (
                        order_id,
                        item.get('product_id'),
                        item.get('name'),
                        item.get('quantity', 1),
                        item.get('price'),
                        item.get('subtotal'),
                        item.get('total'),
                        item.get('tax_amount', 0.00),
                        item.get('tax_class'),
                        item.get('sku'),
                        item.get('meta_data')
                    )
                    for item in items
                ]

                cursor.executemany(item_query, item_values)

            # Order and items are committed together.
            self.connection.commit()
            return order_id

        except Error as e:
            print(f"Error creating order: {e}")
            self.connection.rollback()
            return None
        finally:
            cursor.close()

    def retrieve_order(self, order_id: int, include_items: bool = True) -> Optional[Dict]:
        """
        Retrieve an order by its ID.

        Args:
            order_id: The ID of the order to retrieve
            include_items: Whether to include order items in the result

        Returns:
            A dictionary containing order details (with an ``items`` list if
            requested), or None if not found or on a database error
        """
        return self._fetch_order(
            "order_id", order_id, include_items, "Error retrieving order"
        )

    def list_orders(self, 
                   page: int = 1, 
                   per_page: int = 10,
                   customer_id: Optional[int] = None,
                   status: Optional[str] = None,
                   start_date: Optional[str] = None,
                   end_date: Optional[str] = None) -> Dict:
        """
        List orders with pagination and filtering options.

        Args:
            page: Page number (1-based); values below 1 are treated as 1
            per_page: Number of items per page; values below 1 are treated as 1
            customer_id: Filter by customer ID (0 is a valid ID)
            status: Filter by order status
            start_date: Only orders with date_created >= this value (YYYY-MM-DD)
            end_date: Only orders with date_created <= this value (YYYY-MM-DD).
                NOTE(review): if date_created stores a time component, a bare
                date here presumably excludes most of that day - confirm
                against the schema.

        Returns:
            ``{'orders': [...], 'pagination': {'total', 'page', 'per_page',
            'total_pages'}}``; on a database error ``{'orders': [],
            'pagination': {}}``
        """
        self._ensure_connection()

        # Guard against a negative OFFSET and a division by zero below.
        page = max(page, 1)
        per_page = max(per_page, 1)

        # The row query and the count query must share the same filters,
        # so the WHERE fragment is built exactly once.
        conditions = []
        filter_params = []

        if customer_id is not None:
            conditions.append("customer_id = %s")
            filter_params.append(customer_id)

        if status:
            conditions.append("status = %s")
            filter_params.append(status)

        if start_date:
            conditions.append("date_created >= %s")
            filter_params.append(start_date)

        if end_date:
            conditions.append("date_created <= %s")
            filter_params.append(end_date)

        where_sql = "".join(f" AND {condition}" for condition in conditions)
        offset = (page - 1) * per_page

        query = (
            f"SELECT * FROM orders WHERE 1=1{where_sql}"
            " ORDER BY date_created DESC LIMIT %s OFFSET %s"
        )
        count_query = f"SELECT COUNT(*) as total FROM orders WHERE 1=1{where_sql}"

        cursor = self.connection.cursor(dictionary=True)

        try:
            cursor.execute(query, filter_params + [per_page, offset])
            orders = cursor.fetchall()

            cursor.execute(count_query, filter_params)
            total = cursor.fetchone()['total']

            return {
                'orders': orders,
                'pagination': {
                    'total': total,
                    'page': page,
                    'per_page': per_page,
                    # Ceiling division without floats.
                    'total_pages': (total + per_page - 1) // per_page
                }
            }

        except Error as e:
            print(f"Error listing orders: {e}")
            return {'orders': [], 'pagination': {}}
        finally:
            cursor.close()

    def update_order(self, order_id: int, update_data: Dict) -> bool:
        """
        Update an existing order.

        Args:
            order_id: The ID of the order to update
            update_data: Mapping of column name to new value. ``None`` values
                are skipped; column names must be plain ASCII identifiers.

        Returns:
            True if at least one row was changed; False if nothing was
            changed, there was nothing to update, a column name was rejected,
            or a database error occurred
        """
        self._ensure_connection()

        try:
            set_sql, params = self._build_set_clause(update_data)
        except ValueError as e:
            print(f"Error updating order: {e}")
            return False

        if not set_sql:
            return False  # Nothing to update

        cursor = self.connection.cursor()

        try:
            query = f"UPDATE orders SET {set_sql} WHERE order_id = %s"
            cursor.execute(query, params + [order_id])
            self.connection.commit()

            return cursor.rowcount > 0

        except Error as e:
            print(f"Error updating order: {e}")
            self.connection.rollback()
            return False
        finally:
            cursor.close()

    def delete_order(self, order_id: int) -> bool:
        """
        Delete an order (this will cascade to order_items due to the foreign key constraint).

        Args:
            order_id: The ID of the order to delete

        Returns:
            True if a row was deleted, False otherwise (including on error)
        """
        self._ensure_connection()
        cursor = self.connection.cursor()

        try:
            cursor.execute("DELETE FROM orders WHERE order_id = %s", (order_id,))
            self.connection.commit()

            return cursor.rowcount > 0

        except Error as e:
            print(f"Error deleting order: {e}")
            self.connection.rollback()
            return False
        finally:
            cursor.close()

    def batch_update_orders(self, order_ids: List[int], update_data: Dict) -> int:
        """
        Update multiple orders at once with the same changes.

        Args:
            order_ids: List of order IDs to update; an empty list is a no-op
            update_data: Mapping of column name to new value. ``None`` values
                are skipped; column names must be plain ASCII identifiers.

        Returns:
            The cursor's affected-row count, or 0 when there was nothing to
            do, a column name was rejected, or a database error occurred
        """
        self._ensure_connection()

        order_ids = list(order_ids or [])
        if not order_ids:
            return 0  # "IN ()" is not valid SQL

        try:
            set_sql, params = self._build_set_clause(update_data)
        except ValueError as e:
            print(f"Error in batch updating orders: {e}")
            return 0

        if not set_sql:
            return 0  # Nothing to update

        cursor = self.connection.cursor()

        try:
            # One placeholder per ID for the IN clause.
            placeholders = ', '.join(['%s'] * len(order_ids))
            query = f"UPDATE orders SET {set_sql} WHERE order_id IN ({placeholders})"

            cursor.execute(query, params + order_ids)
            self.connection.commit()

            return cursor.rowcount

        except Error as e:
            print(f"Error in batch updating orders: {e}")
            self.connection.rollback()
            return 0
        finally:
            cursor.close()

    def update_order_status(self, order_id: int, new_status: str) -> bool:
        """
        Helper method to specifically update an order's status.

        Args:
            order_id: The ID of the order to update
            new_status: The new status to set

        Returns:
            True if update was successful, False otherwise
        """
        return self.update_order(order_id, {'status': new_status})

    def get_order_by_number(self, order_number: str, include_items: bool = True) -> Optional[Dict]:
        """
        Retrieve an order by its order number.

        Args:
            order_number: The order number to search for
            include_items: Whether to include order items in the result

        Returns:
            A dictionary containing order details (with an ``items`` list if
            requested), or None if not found or on a database error
        """
        return self._fetch_order(
            "order_number", order_number, include_items,
            "Error retrieving order by number"
        )

In [None]:
# Example usage
#
# Credentials are read from the environment instead of being committed to the
# notebook; if MYSQL_PASSWORD is not set the password is prompted for.
import getpass
import os

with OrderManager(
    host=os.environ.get("MYSQL_HOST", "localhost"),
    database=os.environ.get("MYSQL_DATABASE", "ecommerce_orders"),
    user=os.environ.get("MYSQL_USER", "root"),
    password=os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: "),
    port=int(os.environ.get("MYSQL_PORT", "3307"))  # explicitly specifying port
) as order_manager:
    # Sample payload for create_order()
    new_order = {
        "customer_id": 1,
        "order_key": "wc_order_abc1238",
        "order_number": "1008",
        "status": "pending",
        "total_amount": 99.99,
        "payment_method": "credit_card",
        "payment_method_title": "Credit Card"
    }
    
    order_items = [
        {
            "product_id": 1,
            "name": "Premium Widget",
            "quantity": 2,
            "price": 45.00,
            "subtotal": 90.00,
            "total": 99.00,  # includes tax
            "tax_amount": 9.00
        }
    ]
    
    # Uncomment the calls below one at a time to exercise each operation.

    # Create an order
    # order_id = order_manager.create_order(new_order, order_items)
    # print(f"Created order with ID: {order_id}")

    # ID of an existing order, used by the remaining examples
    order_id = 14

    # Retrieve an order
    # order = order_manager.retrieve_order(order_id)
    # print(f"Retrieved order: {order}")
    
    # List orders
    # orders = order_manager.list_orders(page=1, per_page=10)
    # print(f"First page of orders: {orders}")
    
    # Update an order
    # update_success = order_manager.update_order(order_id, {"status": "processing"})
    # print(f"Order update successful: {update_success}")
    
    # Batch update orders
    # updated_count = order_manager.batch_update_orders(
    #     [order_id, 2, 3], 
    #     {"status": "completed"}
    # )
    # print(f"Batch updated {updated_count} orders")
    
    # Delete an order
    # delete_success = order_manager.delete_order(order_id)
    # print(f"Order deletion successful: {delete_success}")

In [None]:
import mysql.connector
from mysql.connector import Error
from datetime import datetime
from typing import List, Dict, Optional, Union

class ProductManager:
    """Data-access helper for the ``products`` MySQL table.

    A single ``mysql.connector`` connection is opened on ``with`` entry (or
    lazily by the first method that needs it) and reused afterwards. Database
    errors raised inside the CRUD methods are printed and reported through the
    return value (``None`` / ``False`` / ``0`` / an empty result); only
    :meth:`connect` re-raises.
    """

    def __init__(self, host: str, database: str, user: str, password: str, port: int = 3306):
        """Store the connection parameters; no connection is opened yet."""
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.connection = None

    def __enter__(self):
        """Connect to the database when entering the context."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection when exiting the context."""
        self.close()

    def connect(self):
        """Establish a connection to the MySQL database.

        Raises:
            mysql.connector.Error: re-raised after being printed when the
                connection attempt fails.
        """
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port
            )
            if self.connection.is_connected():
                print(f"Connected to MySQL database at {self.host}:{self.port}")
        except Error as e:
            print(f"Error connecting to MySQL at {self.host}:{self.port}: {e}")
            raise

    def close(self):
        """Close the database connection if it is currently open."""
        if self.connection and self.connection.is_connected():
            self.connection.close()
            print("MySQL connection closed")

    def _ensure_connection(self) -> None:
        """(Re)connect when there is no live connection."""
        if not self.connection or not self.connection.is_connected():
            self.connect()

    @staticmethod
    def _build_set_clause(update_data: Dict) -> tuple:
        """Turn ``update_data`` into a ``SET`` fragment and its parameters.

        Entries whose value is ``None`` are skipped so they cannot overwrite
        existing data with NULL. Column names cannot be sent as bound
        parameters, so each one must be a plain ASCII identifier and is
        back-tick quoted before being placed in the SQL text.

        Returns:
            ``(set_sql, params)``; ``set_sql`` is ``""`` when nothing is left
            to update.

        Raises:
            ValueError: if a column name is not a plain ASCII identifier.
        """
        clauses = []
        params = []
        for field, value in update_data.items():
            if value is None:
                continue
            if not (isinstance(field, str) and field.isascii() and field.isidentifier()):
                raise ValueError(f"invalid column name {field!r}")
            clauses.append(f"`{field}` = %s")
            params.append(value)
        return ", ".join(clauses), params

    def create_product(self, product_data: Dict) -> Optional[int]:
        """
        Create a new product in the database.

        Args:
            product_data: Dictionary containing product details. Missing keys
                are inserted as NULL, except ``stock_quantity`` (0),
                ``stock_status`` ('instock') and ``status`` ('published').
                Keys that are not inserted columns are ignored.

        Returns:
            The product_id of the created product or None if failed
        """
        self._ensure_connection()
        cursor = self.connection.cursor()

        try:
            query = """
            INSERT INTO products (
                category, description, short_description, sku, price, regular_price, sale_price,
                stock_quantity, stock_status, weight, length, width, height, status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """

            values = (
                product_data.get('category'),
                product_data.get('description'),
                product_data.get('short_description'),
                product_data.get('sku'),
                product_data.get('price'),
                product_data.get('regular_price'),
                product_data.get('sale_price'),
                product_data.get('stock_quantity', 0),
                product_data.get('stock_status', 'instock'),
                product_data.get('weight'),
                product_data.get('length'),
                product_data.get('width'),
                product_data.get('height'),
                product_data.get('status', 'published')
            )

            cursor.execute(query, values)
            product_id = cursor.lastrowid
            self.connection.commit()
            return product_id

        except Error as e:
            print(f"Error creating product: {e}")
            self.connection.rollback()
            return None
        finally:
            cursor.close()

    def retrieve_product(self, product_id: int) -> Optional[Dict]:
        """
        Retrieve a product by its ID.

        Args:
            product_id: The ID of the product to retrieve

        Returns:
            A dictionary containing product details, or None if not found or
            on a database error
        """
        self._ensure_connection()
        cursor = self.connection.cursor(dictionary=True)

        try:
            cursor.execute("SELECT * FROM products WHERE product_id = %s", (product_id,))
            return cursor.fetchone()

        except Error as e:
            print(f"Error retrieving product: {e}")
            return None
        finally:
            cursor.close()

    def list_products(self, 
                     page: int = 1, 
                     per_page: int = 10,
                     status: Optional[str] = None,
                     stock_status: Optional[str] = None,
                     min_price: Optional[float] = None,
                     max_price: Optional[float] = None,
                     search_query: Optional[str] = None) -> Dict:
        """
        List products with pagination and filtering options.

        Args:
            page: Page number (1-based); values below 1 are treated as 1
            per_page: Number of items per page; values below 1 are treated as 1
            status: Filter by product status
            stock_status: Filter by stock status
            min_price: Minimum price filter (inclusive; 0 is honoured)
            max_price: Maximum price filter (inclusive; 0 is honoured)
            search_query: Natural-language MATCH ... AGAINST search over
                category, description and short_description.
                NOTE(review): presumably needs a FULLTEXT index on those
                columns - confirm against the schema.

        Returns:
            ``{'products': [...], 'pagination': {'total', 'page', 'per_page',
            'total_pages'}}``; on a database error ``{'products': [],
            'pagination': {}}``
        """
        self._ensure_connection()

        # Guard against a negative OFFSET and a division by zero below.
        page = max(page, 1)
        per_page = max(per_page, 1)

        # The row query and the count query must share the same filters,
        # so the WHERE fragment is built exactly once.
        conditions = []
        filter_params = []

        if status:
            conditions.append("status = %s")
            filter_params.append(status)

        if stock_status:
            conditions.append("stock_status = %s")
            filter_params.append(stock_status)

        if min_price is not None:
            conditions.append("price >= %s")
            filter_params.append(min_price)

        if max_price is not None:
            conditions.append("price <= %s")
            filter_params.append(max_price)

        if search_query:
            conditions.append(
                "MATCH(category, description, short_description) "
                "AGAINST(%s IN NATURAL LANGUAGE MODE)"
            )
            filter_params.append(search_query)

        where_sql = "".join(f" AND {condition}" for condition in conditions)
        offset = (page - 1) * per_page

        count_query = f"SELECT COUNT(*) as total FROM products WHERE 1=1{where_sql}"
        query = (
            f"SELECT * FROM products WHERE 1=1{where_sql}"
            " ORDER BY date_created DESC LIMIT %s OFFSET %s"
        )

        cursor = self.connection.cursor(dictionary=True)

        try:
            cursor.execute(count_query, filter_params)
            total = cursor.fetchone()['total']

            cursor.execute(query, filter_params + [per_page, offset])
            products = cursor.fetchall()

            return {
                'products': products,
                'pagination': {
                    'total': total,
                    'page': page,
                    'per_page': per_page,
                    # Ceiling division without floats.
                    'total_pages': (total + per_page - 1) // per_page
                }
            }

        except Error as e:
            print(f"Error listing products: {e}")
            return {'products': [], 'pagination': {}}
        finally:
            cursor.close()

    def duplicate_product(self, original_id: int, new_name: Optional[str] = None, new_sku: Optional[str] = None) -> Optional[int]:
        """
        Duplicate a product with all its attributes.

        Args:
            original_id: ID of the product to duplicate
            new_name: Optional new category for the duplicate (it is written
                to the ``category`` column). When omitted, the copy gets
                "Copy of <original category>", or keeps an empty/NULL
                category unchanged.
            new_sku: Optional new SKU for the duplicate. When omitted, the
                copy gets "<original sku>-COPY", or NULL if the original has
                no SKU.

        Returns:
            The product_id of the new duplicate product, or None if the
            original was not found or the insert failed
        """
        original = self.retrieve_product(original_id)
        if not original:
            return None

        duplicate_data = dict(original)
        # Identity and timestamps belong to the original row only.
        for key in ('product_id', 'date_created', 'date_modified'):
            duplicate_data.pop(key, None)

        if new_name:
            duplicate_data['category'] = new_name
        else:
            original_category = duplicate_data.get('category')
            # Avoid producing the literal text "Copy of None".
            duplicate_data['category'] = (
                f"Copy of {original_category}" if original_category else original_category
            )

        if new_sku:
            duplicate_data['sku'] = new_sku
        else:
            original_sku = duplicate_data.get('sku')
            duplicate_data['sku'] = f"{original_sku}-COPY" if original_sku else None

        return self.create_product(duplicate_data)

    def update_product(self, product_id: int, update_data: Dict) -> bool:
        """
        Update an existing product.

        Args:
            product_id: The ID of the product to update
            update_data: Mapping of column name to new value. ``None`` values
                are skipped; column names must be plain ASCII identifiers.

        Returns:
            True if at least one row was changed; False if nothing was
            changed, there was nothing to update, a column name was rejected,
            or a database error occurred
        """
        self._ensure_connection()

        try:
            set_sql, params = self._build_set_clause(update_data)
        except ValueError as e:
            print(f"Error updating product: {e}")
            return False

        if not set_sql:
            return False  # Nothing to update

        cursor = self.connection.cursor()

        try:
            query = f"UPDATE products SET {set_sql} WHERE product_id = %s"
            cursor.execute(query, params + [product_id])
            self.connection.commit()

            return cursor.rowcount > 0

        except Error as e:
            print(f"Error updating product: {e}")
            self.connection.rollback()
            return False
        finally:
            cursor.close()

    def delete_product(self, product_id: int) -> bool:
        """
        Delete a product.

        Args:
            product_id: The ID of the product to delete

        Returns:
            True if a row was deleted, False otherwise (including on error)
        """
        self._ensure_connection()
        cursor = self.connection.cursor()

        try:
            cursor.execute("DELETE FROM products WHERE product_id = %s", (product_id,))
            self.connection.commit()

            return cursor.rowcount > 0

        except Error as e:
            print(f"Error deleting product: {e}")
            self.connection.rollback()
            return False
        finally:
            cursor.close()

    def batch_update_products(self, product_ids: List[int], update_data: Dict) -> int:
        """
        Update multiple products at once with the same changes.

        Args:
            product_ids: List of product IDs to update; an empty list is a no-op
            update_data: Mapping of column name to new value. ``None`` values
                are skipped; column names must be plain ASCII identifiers.

        Returns:
            The cursor's affected-row count, or 0 when there was nothing to
            do, a column name was rejected, or a database error occurred
        """
        self._ensure_connection()

        product_ids = list(product_ids or [])
        if not product_ids:
            return 0  # "IN ()" is not valid SQL

        try:
            set_sql, params = self._build_set_clause(update_data)
        except ValueError as e:
            print(f"Error in batch updating products: {e}")
            return 0

        if not set_sql:
            return 0  # Nothing to update

        cursor = self.connection.cursor()

        try:
            # One placeholder per ID for the IN clause.
            placeholders = ', '.join(['%s'] * len(product_ids))
            query = f"UPDATE products SET {set_sql} WHERE product_id IN ({placeholders})"

            cursor.execute(query, params + product_ids)
            self.connection.commit()

            return cursor.rowcount

        except Error as e:
            print(f"Error in batch updating products: {e}")
            self.connection.rollback()
            return 0
        finally:
            cursor.close()