# In [None]:
import logging
import os
import urllib.parse

import pandas as pd
from sqlalchemy import create_engine, inspect, text

# ------------------- Logging Setup -------------------
# Messages go to both a log file and the console through the root logger.
log_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Handlers are tagged with a name so that re-running this code (e.g. re-executing
# a notebook cell) replaces the previous handlers instead of stacking duplicates,
# which would otherwise emit every log line several times.
_FILE_HANDLER_NAME = "csv_to_mysql_file"
_CONSOLE_HANDLER_NAME = "csv_to_mysql_console"

for _stale in [
    h for h in logger.handlers
    if h.get_name() in (_FILE_HANDLER_NAME, _CONSOLE_HANDLER_NAME)
]:
    logger.removeHandler(_stale)
    _stale.close()

# File handler (explicit UTF-8 so non-ASCII text in messages cannot fail on a
# platform whose default locale encoding is narrower)
file_handler = logging.FileHandler("csv_to_mysql.log", encoding="utf-8")
file_handler.set_name(_FILE_HANDLER_NAME)
file_handler.setFormatter(log_formatter)
logger.addHandler(file_handler)

# Console handler
console_handler = logging.StreamHandler()
console_handler.set_name(_CONSOLE_HANDLER_NAME)
console_handler.setFormatter(log_formatter)
logger.addHandler(console_handler)

# ------------------- Config -------------------
# Destination tables, in the order they are loaded.
# NOTE(review): the order is alphabetical; if the target tables enforce foreign
# keys, parent tables may need to be loaded first - confirm against the schema.
_table_names = [
    'album',
    'artist',
    'customer',
    'employee',
    'genre',
    'invoice',
    'invoice_line',
    'media_type',
    'playlist',
    'playlist_track',
    'track',
]

# Tables whose source file is not simply "<table>.csv".
_csv_name_overrides = {'album': 'album2.csv'}

# (csv filename, destination table) pairs.
csv_files = [
    (_csv_name_overrides.get(table, f"{table}.csv"), table)
    for table in _table_names
]

# Directory that contains the CSV files listed above.
folder_path = 'D:/Projects/Music_store_Project/Cvs_files'

# ------------------- Database Engine -------------------
# Connection settings are read from environment variables when present and fall
# back to the previous hard-coded values, so existing runs keep working.
username = os.environ.get("MYSQL_USER", "root")
# NOTE(review): a real password should not live in source control. Set
# MYSQL_PASSWORD in the environment, then remove this literal fallback and
# rotate the credential.
password = os.environ.get("MYSQL_PASSWORD", "Amulya@2002")
database = os.environ.get("MYSQL_DATABASE", "music_database")
db_host = os.environ.get("MYSQL_HOST", "localhost")
db_port = os.environ.get("MYSQL_PORT", "3306")

# URL-encode the credentials: characters such as '@' or ':' would otherwise be
# parsed as URL delimiters and corrupt the connection string.
encoded_username = urllib.parse.quote_plus(username)
encoded_password = urllib.parse.quote_plus(password)

engine = create_engine(
    f"mysql+pymysql://{encoded_username}:{encoded_password}"
    f"@{db_host}:{db_port}/{database}"
)

# ------------------- Helpers -------------------
def auto_convert_dates(df, table_name):
    """Convert text columns that mostly parse as dates into datetime columns.

    A column is a candidate when it has an object/string dtype and every
    non-null value in it is a string. It is converted when more than half of
    its non-null values parse as dates (day-first). Values that fail to parse
    become ``NaT`` in a converted column.

    Args:
        df: DataFrame to inspect; converted columns are replaced in place.
        table_name: Name used only to prefix the log messages.

    Returns:
        The same DataFrame object, with qualifying columns converted.
    """
    for col in df.columns:
        series = df[col]

        if not (
            pd.api.types.is_object_dtype(series)
            or pd.api.types.is_string_dtype(series)
        ):
            continue

        # Only genuine text is a date candidate. Numbers stored in an object
        # column would be read by to_datetime as offsets from the epoch and
        # "succeed" with meaningless 1970 timestamps. With skipna=True an
        # all-null column reports "empty", so it is skipped here as well.
        if pd.api.types.infer_dtype(series, skipna=True) != "string":
            continue

        present = series.notna()
        try:
            converted = pd.to_datetime(series, errors="coerce", dayfirst=True)
            # Judge success on the values that were actually present, so a
            # sparsely populated date column is not rejected for its nulls.
            success_ratio = converted[present].notna().mean()
            if success_ratio > 0.5:
                df[col] = converted
                logging.info(f"[{table_name}] Converted column '{col}' to datetime (success: {success_ratio:.0%})")
        except Exception as e:
            # Best effort: a column that cannot be examined is left untouched.
            logging.warning(f"[{table_name}] Skipped column '{col}' during datetime conversion: {e}")
    return df

# ------------------- Main Loop -------------------
# Every CSV is appended inside its own transaction: engine.begin() commits when
# the block finishes and rolls back if it raises, so a failing file does not
# affect the files that load cleanly.
failed_files = []

for csv_file, table_name in csv_files:
    try:
        file_path = os.path.join(folder_path, csv_file)
        df = pd.read_csv(file_path)

        # Replace NaN with None for SQL compatibility
        df = df.where(pd.notnull(df), None)

        # Clean column names
        df.columns = [
            col.replace(' ', '_').replace('-', '_').replace('.', '_')
            for col in df.columns
        ]

        # Auto-convert datetime-like columns
        df = auto_convert_dates(df, table_name)

        csv_row_count = len(df)
        logging.info(f"Processing {csv_file} -> {table_name}, rows in CSV: {csv_row_count}")

        # table_name comes from the csv_files config, not from external input,
        # so interpolating it into the statement is acceptable here.
        count_query = text(f"SELECT COUNT(*) FROM {table_name}")

        with engine.begin() as conn:
            # if_exists='append' creates a missing table, so the "before"
            # count can only be taken when the table already exists.
            if inspect(conn).has_table(table_name):
                rows_before = conn.execute(count_query).scalar()
            else:
                rows_before = 0

            # Insert into MySQL
            df.to_sql(
                name=table_name,
                con=conn,
                if_exists='append',
                index=False,
                method='multi'
            )

            db_row_count = conn.execute(count_query).scalar()

        # Compare the growth of the table, not its total size, with the CSV:
        # the total also contains rows left by earlier loads.
        inserted_rows = db_row_count - rows_before
        logging.info(f"Loaded {inserted_rows} rows into `{table_name}`. Total rows in table now: {db_row_count}")

        if inserted_rows != csv_row_count:
            logging.warning(
                f"[{table_name}] Row count mismatch! CSV had {csv_row_count} rows "
                f"but the table grew by {inserted_rows}."
            )
        elif rows_before:
            logging.info(f"[{table_name}] Table already held {rows_before} rows (probably from previous loads).")

    except Exception as e:
        # Top-level boundary: record the failure with its traceback and move on.
        logging.error(f"Error processing {csv_file}: {e}", exc_info=True)
        failed_files.append(csv_file)

if failed_files:
    logging.error(
        f"ETL process finished with errors: {len(failed_files)} of {len(csv_files)} "
        f"files failed ({', '.join(failed_files)})."
    )
else:
    logging.info("ETL process completed successfully.")