In [14]:
from dotenv import load_dotenv
from mysql.connector import Error
import pandas as pd
import mysql.connector
import logging
import os
import time 

# Load variables from a .env file into os.environ so the os.getenv(...) lookups
# below (DB_HOST, DB_USER, DB_PASSWORD, DB_NAME) can find the credentials.
load_dotenv()  

True

In [15]:
# logging.FileHandler does not create missing directories, so make sure logs/ exists.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    filename='logs/db_operations.log',       # Log file name
    level=logging.INFO,                      # Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='a',                            # append across runs instead of truncating
    # basicConfig is a silent no-op when the root logger already has handlers
    # (e.g. this cell was run before); force=True replaces them so the config applies.
    force=True,
)

In [16]:
def ingest_db(db_name=None):
    """Open a MySQL connection using credentials from the environment.

    Reads DB_HOST, DB_USER and DB_PASSWORD via ``os.getenv``. When ``db_name``
    is truthy the connection selects that database; otherwise it connects at
    the server level (used here to create the database before selecting it).

    Args:
        db_name: Database to select, or None/empty for a server-level connection.

    Returns:
        A ``mysql.connector`` connection on success, or None if connecting
        failed. The failure is logged with its traceback; callers must check
        for None before using the result.
    """
    connect_kwargs = {
        "host": os.getenv("DB_HOST"),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASSWORD"),
        # Needed for the LOAD DATA LOCAL INFILE statements used during ingestion.
        "allow_local_infile": True,
        "auth_plugin": "mysql_native_password",
    }
    if db_name:
        connect_kwargs["database"] = db_name

    try:
        conn = mysql.connector.connect(**connect_kwargs)
    except Exception as e:
        # logging.exception keeps the traceback, which logging.error dropped.
        logging.exception(f"Error connecting to MySQL: {e}")
        return None

    if db_name:
        logging.info(f"Successfully connected to the database: {db_name}")
    else:
        logging.info("Connection with MySQL established without specifying a database.")
    return conn

In [17]:
def create_database(conn, query):
    """Execute a CREATE DATABASE statement on a server-level connection.

    The outcome is both printed and logged. A MySQL ``Error`` is reported and
    swallowed rather than raised, and the cursor is always closed.

    Args:
        conn: An open ``mysql.connector`` connection.
        query: The complete CREATE DATABASE statement to run.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        logging.info(f"Database created with query: {query}")
        print("Database created successfully")

    except Error as e:
        logging.error(f"Failed to create database: {e}")
        print(f"Failed to create database: {e}")

    finally:
        # Release the cursor on both success and failure paths.
        cursor.close()

In [18]:
def execute_query(conn, query, params=None):
    """Execute a single SQL statement and commit it.

    Success and failure are both written to the log. A MySQL ``Error`` is
    logged and swallowed rather than raised, and the cursor is always closed.

    Args:
        conn: An open ``mysql.connector`` connection.
        query: The SQL statement to execute.
        params: Optional sequence/mapping bound by the driver to the query's
            placeholders. The default None executes ``query`` as-is.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(query, params)
        conn.commit()
        logging.info(f"Query executed successfully: {query}")

    except Error as e:
        logging.error(f"Error executing query '{query}': {e}")

    finally:
        # Release the cursor on both success and failure paths.
        cursor.close()

In [None]:
def create_table_from_csv(conn, csv_file, table_name, sample_rows=5):
    """
    Create a MySQL table automatically based on the CSV columns and dtypes.
    Only creates the table if it does not exist.

    Column types are inferred by pandas from the first ``sample_rows`` data
    rows and mapped to MySQL types (int64 -> BIGINT, float64 -> DOUBLE,
    bool -> TINYINT(1), anything else -> VARCHAR(255)). A column that only
    looks integral in the sample is created as BIGINT even if later rows hold
    decimals or text. Pass a larger ``sample_rows``, or None to scan the whole
    file, when that is a risk.

    Spaces and hyphens in column names are replaced with underscores. Every
    identifier is backtick-quoted with embedded backticks doubled, so arbitrary
    CSV headers and file names still produce valid SQL.

    MySQL errors and unexpected errors while executing are printed rather than
    raised, and the cursor is always closed. Errors reading the CSV propagate.

    Args:
        conn: An open ``mysql.connector`` connection. The table name is
            unqualified, so a default database must be selected.
        csv_file: Path to the CSV file; its first line is the header.
        table_name: Name of the table to create.
        sample_rows: Rows pandas reads to infer column types. None reads the
            whole file. Defaults to 5.
    """

    df_sample = pd.read_csv(csv_file, nrows=sample_rows)

    # Map pandas dtypes to MySQL types; unmapped dtypes fall back to VARCHAR(255).
    type_mapping = {
        'int64': 'BIGINT',
        'float64': 'DOUBLE',
        'object': 'VARCHAR(255)',
        'bool': 'TINYINT(1)',
    }

    def quote_ident(name):
        # MySQL escapes a backtick inside a quoted identifier by doubling it.
        return "`" + str(name).replace("`", "``") + "`"

    columns_sql = []
    for col, dtype in df_sample.dtypes.items():
        mysql_type = type_mapping.get(str(dtype), 'VARCHAR(255)')
        col_clean = str(col).replace(" ", "_").replace("-", "_")  # sanitize column names
        columns_sql.append(f"{quote_ident(col_clean)} {mysql_type}")

    create_stmt = f"CREATE TABLE IF NOT EXISTS {quote_ident(table_name)} (\n  "
    create_stmt += ",\n  ".join(columns_sql)
    create_stmt += "\n);"

    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(create_stmt)
        print(f"Table `{table_name}` created successfully.")

    except Error as e:
        print(f"Error creating table `{table_name}`:", e)

    except Exception as e:
        print(f"Unexpected error creating table `{table_name}`:", e)

    finally:
        # Close even when execute() fails, so a failed CREATE does not leak the cursor.
        if cursor is not None:
            cursor.close()

In [20]:
def load_csv_to_mysql(conn, csv_file, table_name, line_terminator="\n"):
    """Bulk-load a CSV file into an existing MySQL table via LOAD DATA LOCAL INFILE.

    Parsing behavior:
    - The first line is skipped as a header.
    - Fields are comma-separated and may be enclosed in double quotes.
    - Columns are matched to the table by position, not by name.

    The file path and line terminator are passed as driver parameters rather
    than pasted into the SQL text, so quotes or backslashes in a path cannot
    break the statement. The table name is backtick-quoted.

    MySQL errors are printed rather than raised, and the cursor is always closed.

    NOTE(review): rows are appended, so re-running ingestion on the same table
    duplicates data.

    Args:
        conn: An open ``mysql.connector`` connection created with
            ``allow_local_infile=True``.
        csv_file: Path to the CSV file on the client machine.
        table_name: Target table, which must already exist.
        line_terminator: Row separator in the file. Use "\\r\\n" for
            Windows-style CSVs; otherwise a trailing "\\r" ends up in the last
            column. Defaults to "\\n".
    """
    # MySQL escapes a backtick inside a quoted identifier by doubling it.
    table_ident = "`" + str(table_name).replace("`", "``") + "`"
    query = (
        "LOAD DATA LOCAL INFILE %s "
        f"INTO TABLE {table_ident} "
        "FIELDS TERMINATED BY ',' "
        "ENCLOSED BY '\"' "
        "LINES TERMINATED BY %s "
        "IGNORE 1 ROWS"
    )
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(query, (str(csv_file), line_terminator))
        conn.commit()
        print(f"Loaded {csv_file} into {table_name}")
    except Error as e:
        print(f"Error loading CSV {csv_file}:", e)
    finally:
        # Close even when the load fails, so errors do not leak cursors.
        if cursor is not None:
            cursor.close()


In [21]:
def load_raw_data(conn, data_dir="csv_data"):
    """Create and populate one MySQL table per CSV file in ``data_dir``.

    Each ``<name>.csv`` goes into a table called ``<name>``:
    1. ``create_table_from_csv`` creates the table from the file's inferred
       schema if it is missing.
    2. ``load_csv_to_mysql`` bulk-loads the rows.

    Other details:
    - Files are processed in sorted order so runs are reproducible.
    - Files without a ``.csv`` extension are ignored.
    - Start, end and per-file timing are written to the log.

    Args:
        conn: An open ``mysql.connector`` connection with the target database
            selected.
        data_dir: Directory to scan for CSV files. Defaults to "csv_data".
    """
    logging.info("-------Starting Ingestion-------")
    for file in sorted(os.listdir(data_dir)):
        table_name, ext = os.path.splitext(file)
        if ext != ".csv":
            continue
        # Forward slash works for pandas and for MySQL on every OS, whereas a
        # Windows backslash could be read as an escape inside a SQL string.
        file_path = f"{data_dir}/{file}"
        start = time.time()
        logging.info(f"Uploading {file} to database...")
        # No full pd.read_csv here: the helpers read only what they need, and
        # loading each whole file into an unused DataFrame wasted memory and time.
        create_table_from_csv(conn, file_path, table_name)
        load_csv_to_mysql(conn, file_path, table_name)
        elapsed_min = (time.time() - start) / 60
        logging.info(f"Time taken to ingest {file}: {elapsed_min:.2f} mins")
    logging.info("-------Ingestion Complete-------")
    

In [22]:
# Fail fast on a missing name: otherwise f"{None}" would create a database
# literally called "None", and the second connection would select no database.
db_name = os.getenv('DB_NAME')
if not db_name:
    raise RuntimeError("DB_NAME is not set; add it to .env or the environment")

# Server-level connection, used only to create the database.
server_conn = ingest_db()
if server_conn is None:
    raise RuntimeError("Could not connect to MySQL; see logs/db_operations.log")
# Backtick-quote the identifier (doubling embedded backticks) so any name is valid SQL.
db_ident = db_name.replace("`", "``")
query = f"CREATE DATABASE IF NOT EXISTS `{db_ident}`;"
create_database(server_conn, query)
server_conn.close()  # no longer needed; avoids leaking it when `conn` is reassigned

# Connection with the new database selected, used by the ingestion cells below.
conn = ingest_db(db_name=db_name)
if conn is None:
    raise RuntimeError(f"Could not connect to database {db_name!r}; see logs/db_operations.log")


Database created successfully


In [75]:
load_raw_data(conn=conn)

Table `begin_inventory` created successfully.
Loaded csv_data/begin_inventory.csv into begin_inventory
Table `end_inventory` created successfully.
Loaded csv_data/end_inventory.csv into end_inventory
Table `purchases` created successfully.
Loaded csv_data/purchases.csv into purchases
Table `purchase_prices` created successfully.
Loaded csv_data/purchase_prices.csv into purchase_prices
Table `sales` created successfully.
Loaded csv_data/sales.csv into sales
Table `vendor_invoice` created successfully.
Loaded csv_data/vendor_invoice.csv into vendor_invoice
