# In [1]:
import pandas as pd
import os
from sqlalchemy import create_engine
import logging
import time

# logging.basicConfig(filename=...) opens the log file immediately, which
# raises FileNotFoundError when the "logs" directory is missing, so create
# it up front.
os.makedirs("logs", exist_ok=True)

# Append INFO-and-above records to logs/ingestion_db.log.
logging.basicConfig(
    filename="logs/ingestion_db.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filemode="a"
)

# Module-level SQLAlchemy engine for the SQLite database file inventory.db.
engine = create_engine('sqlite:///inventory.db')

def ingest_db(df, table_name, engine, chunksize=5000, if_exists='replace'):
    """Write a DataFrame to a database table in batches.

    Args:
        df: DataFrame to write.
        table_name: Name of the destination table.
        engine: Database connectable handed to ``DataFrame.to_sql`` as ``con``.
        chunksize: Number of rows ``to_sql`` writes per batch.
        if_exists: Passed through to ``to_sql``; what to do when the table
            already exists ('fail', 'replace' or 'append'). Defaults to
            'replace', which is what this function always did before the
            parameter existed.
    """
    df.to_sql(
        table_name,
        con=engine,
        if_exists=if_exists,
        index=False,
        chunksize=chunksize,
    )
    logging.info(f"{table_name} ingested successfully.")


def load_raw_data(data_dir='data', read_chunksize=9000000, con=None):
    """Load every ``.csv`` file in a directory into its own database table.

    Each file is read in chunks and written to a table named after the file
    (without the ``.csv`` extension). The first chunk replaces any existing
    table; later chunks are appended, so the table ends up with the whole
    file rather than only its last chunk.

    A failure while processing one file is logged and does not stop the
    remaining files from being processed.

    Args:
        data_dir: Directory scanned (non-recursively) for ``.csv`` files.
        read_chunksize: Number of rows per chunk read by ``pd.read_csv``.
        con: Database connectable to write to. When ``None`` the
            module-level ``engine`` is used.
    """
    target = engine if con is None else con
    start = time.time()

    # Sorted so files are processed (and logged) in a deterministic order.
    for file in sorted(os.listdir(data_dir)):
        if not file.endswith('.csv'):
            continue

        file_path = os.path.join(data_dir, file)
        table_name = os.path.splitext(file)[0]
        logging.info(f"Reading {file_path}")
        try:
            # The context manager closes the underlying file even if a chunk
            # fails part-way through.
            with pd.read_csv(file_path, chunksize=read_chunksize) as reader:
                for chunk_no, chunk in enumerate(reader):
                    # Replacing on every chunk would drop the rows written by
                    # the previous chunks; only the first chunk may replace.
                    mode = 'replace' if chunk_no == 0 else 'append'
                    ingest_db(chunk, table_name, target, if_exists=mode)
        except Exception as e:
            # Best-effort: record the failure (with traceback) and move on.
            logging.exception(f"Error processing {file}: {e}")

    end = time.time()
    total_time_taken = round((end - start) / 60, 2)
    logging.info("------------------Ingestion Complete------------------")
    logging.info(f"Total time taken {total_time_taken} minutes")

# Run the ingestion only when this file is executed as a script, not on import.
if __name__ == '__main__':
    load_raw_data()
