In [25]:
import pandas as pd
import os
import time
from sqlalchemy import create_engine
import logging

# Create the log directory up front. exist_ok=True avoids the
# check-then-create race and makes re-running this cell harmless.
os.makedirs('logs', exist_ok=True)

# Append INFO-and-above records to logs/ingestion_db.log.
# NOTE: logging.basicConfig does nothing when the root logger already has
# handlers, so re-running this cell in the same kernel keeps the first
# configuration.
logging.basicConfig(
    filename="logs/ingestion_db.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filemode="a"
)

# SQLite database referenced by a relative URL; used by load_raw_data()
# as the target of every ingestion.
engine = create_engine('sqlite:///inventory.db')

def ingest_db(file_path, table_name, engine, chunksize=100000):
    '''Load a CSV file into a database table, streaming it in chunks.

    The first chunk is written with ``if_exists='replace'`` so an existing
    table of the same name is replaced; every later chunk is appended.
    The CSV is read ``chunksize`` rows at a time rather than all at once.

    Parameters
    ----------
    file_path : str or path-like
        Location of the CSV file to read.
    table_name : str
        Name of the destination table.
    engine : database connectable
        Passed straight through to ``DataFrame.to_sql`` as ``con``.
    chunksize : int, default 100000
        Number of CSV rows read and written per chunk.

    Notes
    -----
    Each chunk is written by its own ``to_sql`` call; this function does not
    wrap them in a single transaction. Exceptions raised by pandas or the
    database driver propagate to the caller.
    '''
    # Using the reader as a context manager closes the underlying file handle
    # even if a to_sql call raises part-way through the file.
    with pd.read_csv(file_path, chunksize=chunksize) as reader:
        for chunk_no, chunk in enumerate(reader):
            # Only the very first write may drop/recreate the table.
            mode = 'replace' if chunk_no == 0 else 'append'
            chunk.to_sql(table_name, con=engine, if_exists=mode, index=False)

def load_raw_data(data_dir='data'):
    '''Ingest every ``.csv`` file found in ``data_dir`` into the database.

    Each file ``<name>.csv`` is passed by path to ``ingest_db`` and loaded
    into a table called ``<name>`` through the module-level ``engine``.
    A failure on one file is logged with its traceback and the remaining
    files are still processed. Progress and the total elapsed time (in
    minutes) are written to the log.

    Parameters
    ----------
    data_dir : str, default 'data'
        Directory scanned (non-recursively) for CSV files.

    Returns
    -------
    None
        Returns early, after logging an error, when ``data_dir`` is not an
        existing directory.
    '''
    # perf_counter is monotonic, so the duration cannot be skewed by
    # system clock adjustments during a long ingestion.
    start = time.perf_counter()

    # isdir rather than exists: a plain file with this name would pass an
    # existence check and then make os.listdir raise.
    if not os.path.isdir(data_dir):
        logging.error(f"Folder '{data_dir}' not found!")
        return

    # os.listdir order is arbitrary; sort for a reproducible ingestion order.
    for file in sorted(os.listdir(data_dir)):
        if not file.endswith('.csv'):
            continue

        file_path = os.path.join(data_dir, file)
        table_name = file[:-4]  # strip the '.csv' suffix

        logging.info(f'Starting ingestion for {file}...')
        try:
            # Pass the path, not a dataframe: ingest_db streams the file itself.
            ingest_db(file_path, table_name, engine)
            logging.info(f'Successfully ingested {file} into db')
        except Exception as e:
            # Best-effort per file: record the failure (ERROR level plus
            # traceback) and carry on with the next file.
            logging.exception(f'Error ingesting {file}: {e}')

    total_time = (time.perf_counter() - start) / 60
    logging.info('-----------------Ingestion complete-------------')
    logging.info(f'Total Time Taken: {total_time:.2f} minutes')

# Entry point: run the ingestion only when this code is executed directly
# (as a script or a notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    load_raw_data()



In [26]:
import sqlite3
import pandas as pd

# Sanity check: count the rows that ended up in the sales table.
conn = sqlite3.connect('inventory.db')
try:
    count = pd.read_sql_query("SELECT COUNT(*) as total FROM sales", conn)
    # .iloc[0] reads the single result row by position rather than by label.
    print(f"Success! There are {count['total'].iloc[0]} rows in the database.")
finally:
    # Always release the connection, even if the query fails
    # (for example when the sales table does not exist).
    conn.close()


Success! There are 12825363 rows in the database.
