In [2]:
!pip install pandas sqlalchemy





[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [5]:
import pandas as pd
import os
from sqlalchemy import create_engine
import logging
import time

# Configure logging
logging.basicConfig(
    filename="logs/ingestion_db.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filemode="a"
)

# Create SQLite database engine
engine = create_engine('sqlite:///inventory.db')


In [6]:
def ingest_db(df, table_name, engine, chunksize=50000):
    """Ingests DataFrame into database"""
    df.to_sql(table_name, con=engine, if_exists='replace', index=False, chunksize=chunksize)
    logging.info(f"Inserted {df.shape[0]} rows into '{table_name}' table.")
    print(f"Inserted {df.shape[0]} rows into '{table_name}' table.")

def load_raw_data():
    """Loads CSV files as DataFrames and ingests them into the database"""
    start = time.time()

    for file in os.listdir('data'):
        if file.endswith('.csv'):
            file_path = os.path.join('data', file)
            logging.info(f"Processing {file}...")
            print(f"Processing {file}...")

            try:
                # Check file size in MB
                file_size = os.path.getsize(file_path) / (1024 * 1024)

                if file_size > 100:
                    logging.info(f"{file} is large ({file_size:.2f} MB). Ingesting in chunks...")
                    print(f"{file} is large ({file_size:.2f} MB). Ingesting in chunks...")
                    chunk_size = 50000
                    for chunk in pd.read_csv(file_path, chunksize=chunk_size, low_memory=False):
                        ingest_db(chunk, file[:-4], engine)
                    logging.info(f"{file} ingested successfully in chunks.")
                else:
                    df = pd.read_csv(file_path, low_memory=False)
                    ingest_db(df, file[:-4], engine)
                    logging.info(f"{file} ingested successfully.")

            except Exception as e:
                logging.error(f"Failed to ingest {file}. Error: {e}")
                print(f"⚠️ Error while ingesting {file}: {e}")

    end = time.time()
    total_time = (end - start) / 60
    logging.info("-----Ingestion Complete-----")
    logging.info(f"Total Time Taken: {total_time:.2f} minutes")
    print(f"✅ All ingestion complete. Total time: {total_time:.2f} minutes")


In [7]:
load_raw_data()


Processing begin_inventory.csv...
Inserted 206529 rows into 'begin_inventory' table.
Processing end_inventory.csv...
Inserted 224489 rows into 'end_inventory' table.
Processing purchases.csv...
purchases.csv is large (344.83 MB). Ingesting in chunks...
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.
Inserted 50000 rows into 'purchases' table.

In [9]:
from sqlalchemy import inspect

# Create inspector object
inspector = inspect(engine)

# Get all table names
tables = inspector.get_table_names()

print("Tables in database:", tables)


Tables in database: ['begin_inventory', 'end_inventory', 'purchase_prices', 'purchases', 'sales', 'vendor_invoice']
