In [2]:
import getpass
import os
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine

# MySQL connection string.
# The password is no longer stored in the notebook: it is taken from the
# MYSQL_PASSWORD environment variable, falling back to an interactive prompt.
# It is percent-encoded so characters such as '@', '/' or ':' in the password
# cannot corrupt the URL.
_mysql_password = os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: ")
engine = create_engine(
    f"mysql+mysqlconnector://root:{quote(_mysql_password, safe='')}@localhost/amazon_india"
)

In [None]:
# Add cleaned transaction details to MySQL (plain append, chunk by chunk).
#
# NOTE: this cell is NOT idempotent -- every run appends the whole CSV again.
# Run it once against an empty `transactions` table.
import getpass
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError  # Import for better error handling
import os  # Used to read connection settings from the environment
from urllib.parse import quote

# ==========================
# MySQL Connection Settings
# ==========================
# Settings come from the environment so that no secret lives in the notebook.
# The password falls back to an interactive prompt when MYSQL_PASSWORD is unset.
USER = os.environ.get("MYSQL_USER", "root")
PASSWORD = os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: ")
HOST = os.environ.get("MYSQL_HOST", "localhost")
PORT = int(os.environ.get("MYSQL_PORT", "3306"))
DB = os.environ.get("MYSQL_DATABASE", "amazon_india")

# Create SQLAlchemy engine.
# User and password are percent-encoded so special characters cannot break the URL.
# pool_pre_ping=True makes the pool test a connection before handing it out.
engine = create_engine(
    f"mysql+mysqlconnector://{quote(USER, safe='')}:{quote(PASSWORD, safe='')}@{HOST}:{PORT}/{DB}",
    pool_pre_ping=True,
)

# ==========================
# Path to your cleaned transactions CSV
# ==========================
transactions_csv_path = r"D:\amazon_india_project\data\Cleaned-dataset\cleaned_amazon_india_2015_2025.csv"

print(f"📂 Attempting to load transactions data from '{transactions_csv_path}'...")

# Boolean columns are converted to int (0 or 1) before insertion
# (intended for TINYINT(1) columns -- confirm against the table schema).
bool_cols = ['is_prime_member', 'is_festival_sale', 'is_prime_eligible']

# Rows per chunk; adjust to the memory available on your system.
chunksize = 10000

try:
    # The CSV is streamed in chunks only. The earlier version also read the
    # whole file into memory first and never used it for loading, which
    # defeated the purpose of chunking.
    #
    # --- IMPORTANT: the `transactions` table schema must match the CSV columns,
    # because to_sql(if_exists='append') inserts by column name. ---
    for i, chunk in enumerate(pd.read_csv(transactions_csv_path, chunksize=chunksize, parse_dates=['order_date'])):
        if i == 0:
            # Report missing boolean columns once, using the first chunk's header.
            for col in bool_cols:
                if col not in chunk.columns:
                    print(f"⚠️ Warning: Boolean column '{col}' not found in initial DataFrame.")

        # Apply boolean conversion to each chunk.
        # astype(int) raises if a column contains missing values.
        for col in bool_cols:
            if col in chunk.columns:
                chunk[col] = chunk[col].astype(int)

        # If numeric columns are NOT NULL in SQL but contain NaN here, the
        # insert will fail; fill them before loading, for example:
        #   chunk['delivery_days'] = chunk['delivery_days'].fillna(0)
        #   chunk['customer_rating'] = chunk['customer_rating'].fillna(0.0)

        # Each to_sql call runs in its own transaction: a failing chunk is
        # rolled back by pandas, while earlier chunks stay committed.
        chunk.to_sql('transactions', con=engine, if_exists='append', index=False)
        print(f"  Loaded chunk {i+1} into 'transactions' table...")

    print(f"✅ Successfully loaded all data into 'transactions' table.")

    # Verify row count. Raw SQL must be wrapped in text(); SQLAlchemy 2.x
    # rejects plain strings passed to Connection.execute().
    with engine.connect() as conn:
        trans_count = conn.execute(text("SELECT COUNT(*) FROM transactions")).scalar()
        print(f"📊 Final row count in 'transactions' table: {trans_count}")

except SQLAlchemyError as e:
    print(f"❌ Database error during loading: {e}")
    # No manual rollback here: opening a *new* connection and calling
    # rollback() on it cannot undo work done on other connections, so the old
    # "Rolled back transaction" message was misleading.
    print("⚠️ Chunks loaded before the failure remain committed; clean up before re-running.")
except Exception as e:
    print(f"❌ An unexpected error occurred: {e}")

# Printed on every run (it was previously indented into the except branch and
# therefore only appeared after an unexpected error).
print("\n🚀 Transactions data loading process completed.")

📂 Attempting to load transactions data from 'D:\amazon_india_project\data\Cleaned-dataset\cleaned_amazon_india_2015_2025.csv'...
  Loaded chunk 1 into 'transactions' table...
  Loaded chunk 2 into 'transactions' table...
  Loaded chunk 3 into 'transactions' table...
  Loaded chunk 4 into 'transactions' table...
  Loaded chunk 5 into 'transactions' table...
  Loaded chunk 6 into 'transactions' table...
  Loaded chunk 7 into 'transactions' table...
  Loaded chunk 8 into 'transactions' table...
  Loaded chunk 9 into 'transactions' table...
  Loaded chunk 10 into 'transactions' table...
  Loaded chunk 11 into 'transactions' table...
  Loaded chunk 12 into 'transactions' table...
  Loaded chunk 13 into 'transactions' table...
  Loaded chunk 14 into 'transactions' table...
  Loaded chunk 15 into 'transactions' table...
  Loaded chunk 16 into 'transactions' table...
  Loaded chunk 17 into 'transactions' table...
  Loaded chunk 18 into 'transactions' table...
  Loaded chunk 19 into 'transactio

In [None]:
# Resume-able load of the cleaned transactions CSV into MySQL using UPSERTs.
import getpass
import os
from urllib.parse import quote

import pandas as pd
import numpy as np
from sqlalchemy import create_engine, MetaData, Table, text, inspect
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.dialects.mysql import insert as mysql_insert

# ==========================
# MySQL Connection Settings
# ==========================
# Settings come from the environment so that no secret lives in the notebook.
# The password falls back to an interactive prompt when MYSQL_PASSWORD is unset.
USER = os.environ.get("MYSQL_USER", "root")
PASSWORD = os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: ")
HOST = os.environ.get("MYSQL_HOST", "localhost")
PORT = int(os.environ.get("MYSQL_PORT", "3306"))
DB = os.environ.get("MYSQL_DATABASE", "amazon_india")

# Create SQLAlchemy engine (credentials percent-encoded for URL safety).
engine = create_engine(
    f"mysql+mysqlconnector://{quote(USER, safe='')}:{quote(PASSWORD, safe='')}@{HOST}:{PORT}/{DB}",
    pool_pre_ping=True
)

# ==========================
# Path to cleaned transactions CSV
# ==========================
transactions_csv_path = r"D:\amazon_india_project\data\Cleaned-dataset\cleaned_amazon_india_2015_2025.csv"

print(f"📂 Attempting to load transactions data from '{transactions_csv_path}'...")

# Define columns that should be integers, including boolean types
int_cols = [
    'is_prime_member', 'is_festival_sale', 'is_prime_eligible',
    'quantity', 'delivery_days', 'order_month', 'order_year', 'order_quarter'
]

# Define columns that should be floats/decimals
decimal_cols = [
    'original_price_inr', 'discount_percent', 'discounted_price_inr',
    'subtotal_inr', 'delivery_charges', 'final_amount_inr',
    'product_weight_kg', 'customer_rating', 'product_rating'
]


def _prepare_chunk(raw_chunk):
    """Return a copy of ``raw_chunk`` with numeric columns coerced and NaN replaced by None.

    Columns listed in ``int_cols`` are coerced to numeric (unparseable values
    become missing), rounded, and stored as nullable ``Int64``. Columns listed
    in ``decimal_cols`` are coerced to ``float64``. Finally every NaN is
    replaced with ``None`` so it is sent to MySQL as NULL. Columns absent from
    the chunk are skipped.
    """
    prepared = raw_chunk.copy()  # never mutate a slice of the reader's chunk

    for col in int_cols:
        if col in prepared.columns:
            as_number = pd.to_numeric(prepared[col], errors='coerce')
            prepared[col] = as_number.round().astype('Int64')

    for col in decimal_cols:
        if col in prepared.columns:
            prepared[col] = pd.to_numeric(prepared[col], errors='coerce').astype('float64')

    return prepared.replace({np.nan: None})


try:
    # Reflect only the table we write to instead of the whole schema.
    metadata = MetaData()
    metadata.reflect(bind=engine, only=['transactions'])
    transactions_table = metadata.tables['transactions']

    # Detect the resume marker: the greatest transaction_id currently stored.
    # NOTE(review): resuming assumes the CSV rows appear in the same order as
    # the sorted transaction_id values -- confirm for this dataset.
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT transaction_id FROM transactions ORDER BY transaction_id DESC LIMIT 1")
        ).fetchone()
        last_id_in_db = result[0] if result else None

    if last_id_in_db:
        print(f"🆔 Last loaded transaction_id in DB: {last_id_in_db}")
    else:
        print("📅 No existing data found in DB. Starting from the beginning.")

    chunksize = 10000
    # False while we are still scanning for the resume marker.
    start_loading = not last_id_in_db

    # Read CSV in chunks
    for i, chunk in enumerate(pd.read_csv(transactions_csv_path, chunksize=chunksize, parse_dates=['order_date'])):

        # Resume logic runs BEFORE any type coercion so that chunks which are
        # skipped anyway do not pay for the conversion work.
        if not start_loading:
            marker_labels = chunk.index[chunk['transaction_id'] == last_id_in_db]
            if len(marker_labels) == 0:
                print(f"⏩ Skipping chunk {i+1} (already loaded)")
                continue

            # The chunk index continues across chunks, so label-based slicing
            # keeps exactly the rows after the first occurrence of the marker.
            chunk = chunk.loc[marker_labels[0] + 1:]
            start_loading = True
            if chunk.empty:
                print(f"⏩ Skipping chunk {i+1} (all rows already loaded)")
                continue
            print(f"▶️ Resuming from chunk {i+1} after transaction_id {last_id_in_db}")

        rows = _prepare_chunk(chunk).to_dict(orient='records')

        # Bulk UPSERT into MySQL; engine.begin() commits on success and rolls
        # back the chunk on error.
        try:
            with engine.begin() as conn:
                if rows:
                    stmt = mysql_insert(transactions_table).values(rows)
                    stmt = stmt.on_duplicate_key_update(
                        **{col.name: stmt.inserted[col.name] for col in transactions_table.columns}
                    )
                    conn.execute(stmt)
            print(f"✅ Loaded chunk {i+1} into 'transactions' table...")
        except SQLAlchemyError as e:
            print(f"❌ Error loading chunk {i+1}: {e}")
            break

    if not start_loading:
        # The marker from the database never appeared in the file, so every
        # chunk was skipped; say so instead of finishing silently.
        print(f"⚠️ transaction_id {last_id_in_db} was not found in the CSV; no rows were loaded.")

    # Final row count
    with engine.connect() as conn:
        trans_count = conn.execute(text("SELECT COUNT(*) FROM transactions")).fetchone()[0]
        print(f"📊 Final row count in 'transactions' table: {trans_count}")

except SQLAlchemyError as e:
    print(f"❌ Database error during loading: {e}")
except Exception as e:
    print(f"❌ An unexpected error occurred: {e}")

print("\n🚀 Transactions data loading process completed.")

📂 Attempting to load transactions data from 'D:\amazon_india_project\data\Cleaned-dataset\cleaned_amazon_india_2015_2025.csv'...
🆔 Last loaded transaction_id in DB: TXN_2025_00076999
⏩ Skipping chunk 1 (already loaded)
⏩ Skipping chunk 2 (already loaded)
⏩ Skipping chunk 3 (already loaded)
⏩ Skipping chunk 4 (already loaded)
⏩ Skipping chunk 5 (already loaded)
⏩ Skipping chunk 6 (already loaded)
⏩ Skipping chunk 7 (already loaded)
⏩ Skipping chunk 8 (already loaded)
⏩ Skipping chunk 9 (already loaded)
⏩ Skipping chunk 10 (already loaded)
⏩ Skipping chunk 11 (already loaded)
⏩ Skipping chunk 12 (already loaded)
⏩ Skipping chunk 13 (already loaded)
⏩ Skipping chunk 14 (already loaded)
⏩ Skipping chunk 15 (already loaded)
⏩ Skipping chunk 16 (already loaded)
⏩ Skipping chunk 17 (already loaded)
⏩ Skipping chunk 18 (already loaded)
▶️ Resuming from chunk 19 after transaction_id TXN_2025_00076999
✅ Loaded chunk 19 into 'transactions' table...
✅ Loaded chunk 20 into 'transactions' table...
✅

In [None]:
# Add product catalog details to MySQL (plain append, chunk by chunk).
#
# NOTE: this cell is NOT idempotent -- every run appends the whole CSV again.
import getpass
import os
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

# ==========================
# MySQL Connection Settings
# ==========================
# Settings come from the environment so that no secret lives in the notebook.
# The password falls back to an interactive prompt when MYSQL_PASSWORD is unset.
USER = os.environ.get("MYSQL_USER", "root")
PASSWORD = os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: ")
HOST = os.environ.get("MYSQL_HOST", "localhost")
PORT = int(os.environ.get("MYSQL_PORT", "3306"))
DB = os.environ.get("MYSQL_DATABASE", "amazon_india")

# Create SQLAlchemy engine (credentials percent-encoded for URL safety).
engine = create_engine(
    f"mysql+mysqlconnector://{quote(USER, safe='')}:{quote(PASSWORD, safe='')}@{HOST}:{PORT}/{DB}",
    pool_pre_ping=True,
)

# ==========================
# Path to your product catalog CSV
# ==========================
products_csv_path = r"D:\amazon_india_project\data\Dataset\amazon_india_products_catalog.csv" # Adjust path as needed

print(f"📂 Attempting to load product catalog from '{products_csv_path}'...")

# Boolean columns are converted to int (0 or 1) before insertion
# (intended for TINYINT(1) columns -- confirm against the table schema).
bool_cols = ['is_prime_eligible']

# Rows per chunk; the catalog is small, so a small chunk is sufficient.
chunksize = 500

try:
    # The CSV is streamed in chunks only; the earlier extra full read of the
    # file was never used for loading.
    #
    # --- IMPORTANT: the `products` table schema must match the CSV columns,
    # because to_sql(if_exists='append') inserts by column name. ---
    for i, chunk in enumerate(pd.read_csv(products_csv_path, chunksize=chunksize)):
        if i == 0:
            # Report missing boolean columns once, using the first chunk's header.
            for col in bool_cols:
                if col not in chunk.columns:
                    print(f"⚠️ Warning: Boolean column '{col}' not found in products DataFrame.")

        # Apply boolean conversion to each chunk.
        # astype(int) raises if a column contains missing values.
        for col in bool_cols:
            if col in chunk.columns:
                chunk[col] = chunk[col].astype(int)

        # Each to_sql call runs in its own transaction: a failing chunk is
        # rolled back by pandas, while earlier chunks stay committed.
        chunk.to_sql('products', con=engine, if_exists='append', index=False)
        print(f"  Loaded chunk {i+1} into 'products' table...")

    print(f"✅ Successfully loaded all data into 'products' table.")

    # Verify row count. Raw SQL must be wrapped in text(); SQLAlchemy 2.x
    # rejects plain strings passed to Connection.execute().
    with engine.connect() as conn:
        prod_count = conn.execute(text("SELECT COUNT(*) FROM products")).scalar()
        print(f"📊 Final row count in 'products' table: {prod_count}")

except SQLAlchemyError as e:
    print(f"❌ Database error during loading products: {e}")
    # No manual rollback here: rollback() on a freshly opened connection cannot
    # undo work done on other connections, so the old message was misleading.
    print("⚠️ Chunks loaded before the failure remain committed; clean up before re-running.")
except Exception as e:
    print(f"❌ An unexpected error occurred loading products: {e}")

# Printed on every run (it was previously indented into the except branch and
# therefore only appeared after an unexpected error).
print("\n🚀 Product catalog loading process completed.")

In [1]:
# Add unique customer details to MySQL.
#
# NOTE: this cell appends; re-running it inserts the same customers again
# (or fails, if `customers` enforces a unique key on customer_id).
import getpass
import os
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine

# Settings come from the environment so that no secret lives in the notebook.
# The password falls back to an interactive prompt when MYSQL_PASSWORD is unset.
USER = os.environ.get("MYSQL_USER", "root")
PASSWORD = os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: ")
HOST = os.environ.get("MYSQL_HOST", "localhost")
PORT = int(os.environ.get("MYSQL_PORT", "3306"))
DB = os.environ.get("MYSQL_DATABASE", "amazon_india")
engine = create_engine(
    f"mysql+mysqlconnector://{quote(USER, safe='')}:{quote(PASSWORD, safe='')}@{HOST}:{PORT}/{DB}",
    pool_pre_ping=True,
)

# Columns that make up one customer record.
customer_cols = [
    'customer_id', 'customer_city', 'customer_state',
    'customer_tier', 'customer_spending_tier', 'customer_age_group'
]

# Read only the customer columns instead of the whole transactions file.
# NOTE(review): this path differs from the one used by the transaction-loading
# cells (no "Cleaned-dataset" folder, "complete" in the file name) -- confirm
# which file is the intended source.
transactions_df = pd.read_csv(
    r"D:\amazon_india_project\data\cleaned_amazon_india_complete_2015_2025.csv",
    usecols=customer_cols,
)

# One row per customer_id; when a customer has several transactions, the
# attributes of the first one in the file are kept.
customers_df = transactions_df[customer_cols].drop_duplicates(subset=['customer_id'])

try:
    # chunksize batches the INSERTs so one statement does not carry every row.
    customers_df.to_sql('customers', con=engine, if_exists='append', index=False, chunksize=10000)
    print(f"✅ Loaded {len(customers_df)} unique customers into 'customers' table.")
except Exception as e:
    # to_sql wraps the insert in its own transaction, which is rolled back when
    # it fails; rollback() on a new connection would undo nothing.
    print(f"❌ Error loading customers table: {e}")

✅ Loaded 354969 unique customers into 'customers' table.


In [8]:
# Populate the `time_dimension` table with the distinct order dates.
import getpass
import os
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine
import mysql.connector

# Database connection setup.
# Settings come from the environment so that no secret lives in the notebook.
# The password falls back to an interactive prompt when MYSQL_PASSWORD is unset.
USER = os.environ.get('MYSQL_USER', 'root')
PASSWORD = os.environ.get('MYSQL_PASSWORD') or getpass.getpass('MySQL password: ')
HOST = os.environ.get('MYSQL_HOST', 'localhost')
PORT = int(os.environ.get('MYSQL_PORT', '3306'))
DB = os.environ.get('MYSQL_DATABASE', 'amazon_india')
engine = create_engine(
    f"mysql+mysqlconnector://{quote(USER, safe='')}:{quote(PASSWORD, safe='')}@{HOST}:{PORT}/{DB}",
    pool_pre_ping=True,
)

# Only the order_date column is needed, so only that column is read.
transactions_df = pd.read_csv(
    r'D:\amazon_india_project\data\cleaned_amazon_india_complete_2015_2025.csv',
    usecols=['order_date'],
    parse_dates=['order_date']
)

# Distinct calendar dates:
#  - dropna() removes missing dates (NaT), which would otherwise cause an
#    IntegrityError when inserted as NULL;
#  - normalize() sets the time of day to midnight so two timestamps on the
#    same day count as one date.
order_dates = (
    pd.to_datetime(transactions_df['order_date'])
    .dropna()
    .dt.normalize()
    .drop_duplicates()
)

# Create time dimension attributes, including only the columns that exist in your table
time_df = pd.DataFrame({
    'date': order_dates,
    'year': order_dates.dt.year,
    'month': order_dates.dt.month,
    'quarter': order_dates.dt.quarter,
})

# Filter out dates that already exist in the database before inserting,
# which makes this cell safe to re-run.
try:
    existing_dates = pd.read_sql_table('time_dimension', con=engine, columns=['date'])
    # Bring the stored dates to the same datetime dtype as time_df['date'] so
    # the membership test compares like with like.
    stored_dates = pd.to_datetime(existing_dates['date']).dt.normalize()
    new_dates_df = time_df[~time_df['date'].isin(stored_dates)]

    if not new_dates_df.empty:
        new_dates_df.to_sql('time_dimension', con=engine, if_exists='append', index=False)
        print(f'✅ Loaded {len(new_dates_df)} new rows into time_dimension table.')
    else:
        print('✅ No new unique dates to load.')
except Exception as e:
    # to_sql wraps the insert in its own transaction, which is rolled back when
    # it fails; rollback() on a new connection would undo nothing.
    print(f'❌ Error loading time_dimension table: {e}')


✅ Loaded 4015 new rows into time_dimension table.
