In [4]:
import os
import time
import logging
import urllib.parse
import pandas as pd
from sqlalchemy import create_engine, text
import pymysql

# -------------------------------------------------
# Logging Configuration
# -------------------------------------------------
logging.basicConfig(
    filename="logging.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filemode="a"
)

# -------------------------------------------------
# Database Configuration
# -------------------------------------------------
password = urllib.parse.quote_plus("Abak@990")

engine = create_engine(
    f"mysql+pymysql://root:{password}@127.0.0.1/vendor_data?local_infile=1",
    pool_pre_ping=True,
    echo=False
)

# -------------------------------------------------
# Utility Functions
# -------------------------------------------------
def sanitize_table_name(filename: str) -> str:
    return (
        filename.replace(".csv", "")
        .lower()
        .replace(" ", "_")
        .replace("-", "_")
    )

# -------------------------------------------------
# Step 1: Create Table Schema (Sample Only)
# -------------------------------------------------
def create_table_from_csv(csv_path: str, table_name: str, engine):
    """
    Reads only a small sample to infer schema
    """
    df_sample = pd.read_csv(csv_path, nrows=1000)

    df_sample.to_sql(
        name=table_name,
        con=engine,
        if_exists="replace",
        index=False
    )

    logging.info(f"Table '{table_name}' schema created")

# -------------------------------------------------
# Step 2: Ultra-Fast Bulk Load
# -------------------------------------------------
def bulk_load_csv(csv_path: str, table_name: str, engine):
    """
    Uses MySQL LOAD DATA LOCAL INFILE (FASTEST)
    """
    load_sql = f"""
    LOAD DATA LOCAL INFILE :file
    INTO TABLE {table_name}
    FIELDS TERMINATED BY ','
    ENCLOSED BY '"'
    LINES TERMINATED BY '\\n'
    IGNORE 1 LINES
    """

    with engine.begin() as conn:
        conn.execute(
            text(load_sql),
            {"file": csv_path}
        )

    logging.info(f"Bulk loaded data into '{table_name}'")

# -------------------------------------------------
# Main Loader
# -------------------------------------------------
def load_data(data_path: str = os.path.join(os.getcwd(), "data")):
    start_time = time.time()
    success = 0
    failed = 0

    if not os.path.isdir(data_path):
        logging.error(f"Data directory not found: {data_path}")
        return

    for filename in os.listdir(data_path):
        if not filename.lower().endswith(".csv"):
            continue

        file_path = os.path.join(data_path, filename)
        table_name = sanitize_table_name(filename)

        logging.info(f"Processing file: {filename}")

        try:
            create_table_from_csv(file_path, table_name, engine)
            bulk_load_csv(file_path, table_name, engine)
            success += 1

        except Exception as e:
            logging.exception(f"Failed processing {filename}: {e}")
            failed += 1

    total_minutes = (time.time() - start_time) / 60

    logging.info(
        f"INGESTION COMPLETE | "
        f"Time: {total_minutes:.2f} min | "
        f"Success: {success} | "
        f"Failed: {failed}"
    )

    engine.dispose()

# -------------------------------------------------
# Entry Point
# -------------------------------------------------
if __name__ == "__main__":
    load_data()