In [1]:
pip install sqlalchemy psycopg2

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [None]:
import logging
import os
import time
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine

# Set up logging
# basicConfig() does not create missing directories, so make sure Logs/ exists
# before the file handler tries to open the log file.
os.makedirs('Logs', exist_ok=True)
logging.basicConfig(
    filename='Logs/ingestion_db.log',
    level=logging.DEBUG,
    format="%(asctime)s-%(levelname)s-%(message)s",
    filemode="a"
)

# PostgreSQL connection info
# Each setting can be overridden through the matching libpq-style environment
# variable; the literals are only fallbacks that keep the previous behaviour.
# NOTE(review): the fallback password is still stored in source control --
# prefer supplying PGPASSWORD from the environment.
username = os.environ.get('PGUSER', 'postgres')
password = os.environ.get('PGPASSWORD', '1542')
host = os.environ.get('PGHOST', 'localhost')
port = os.environ.get('PGPORT', '5432')
database = os.environ.get('PGDATABASE', 'Project')

# Create SQLAlchemy engine
# User name and password are percent-encoded so characters such as '@', ':'
# or '/' cannot corrupt the URL.
engine = create_engine(
    f'postgresql+psycopg2://{quote(username, safe="")}:{quote(password, safe="")}'
    f'@{host}:{port}/{database}'
)

# create_engine() is lazy and does not contact the server, so open (and
# immediately release) one connection before claiming success.
try:
    with engine.connect():
        print("Connection established successfully!")
except Exception as exc:
    # Top-level boundary: report the problem but keep the module importable,
    # as it was before this check existed.
    logging.exception('Database connection check failed')
    print(f"Connection failed: {exc}")


def ingest_db(df, table_name, engine, chunk_size=100000):
    '''Write a whole DataFrame to a database table, replacing the table if it exists.

    Args:
        df: DataFrame to store.
        table_name: Name of the destination table.
        engine: Connection or engine handed to ``DataFrame.to_sql`` as ``con``.
        chunk_size: Number of rows written per batch.
    '''
    write_options = {
        'con': engine,
        'if_exists': 'replace',   # drop and recreate any existing table
        'index': False,           # the DataFrame index is not stored
        'chunksize': chunk_size,
        'method': 'multi',        # multi-row INSERT statements
    }
    df.to_sql(table_name, **write_options)


def _ingest_csv_in_chunks(file_path, table_name, db_engine, chunk_size):
    '''Stream one CSV file into ``table_name`` chunk by chunk.

    The first chunk replaces any existing table (creating it with dtypes
    inferred from a full chunk); every later chunk is appended. Each row is
    therefore written exactly once.
    '''
    table_created = False
    for i, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size)):
        chunk.to_sql(
            table_name,
            con=db_engine,
            if_exists='append' if table_created else 'replace',
            index=False,
            method='multi',
        )
        table_created = True
        logging.info('%s: Inserted chunk %s with shape %s', table_name, i, chunk.shape)

    if not table_created:
        # The reader produced no chunk at all: still (re)create an empty
        # table from the header so the table exists afterwards.
        pd.read_csv(file_path, nrows=0).to_sql(
            table_name, con=db_engine, if_exists='replace', index=False
        )
        logging.info('%s: Table created with headers.', table_name)


def load_raw_data(data_dir='data', db_engine=None, chunk_size=100000):
    '''Load every CSV in ``data_dir`` into a database table named after the file.

    Files whose name contains 'sales', 'inventory' or 'purchases' are streamed
    in chunks; all other CSV files are read in one go and passed to
    ``ingest_db``. A failure on one file is logged (with traceback) and does
    not stop the remaining files.

    Args:
        data_dir: Directory scanned (non-recursively) for ``*.csv`` files.
        db_engine: Connection/engine to write to; defaults to the module-level
            ``engine``.
        chunk_size: Rows per chunk when reading and writing.

    Raises:
        OSError: If ``data_dir`` cannot be listed.
    '''
    if db_engine is None:
        db_engine = engine

    chunked_keywords = ('sales', 'inventory', 'purchases')
    start = time.time()

    # sorted() makes the processing (and log) order deterministic
    for file in sorted(os.listdir(data_dir)):
        if not file.endswith('.csv'):
            continue

        table_name = os.path.splitext(file)[0]  # remove .csv extension
        file_path = os.path.join(data_dir, file)
        try:
            if any(keyword in file for keyword in chunked_keywords):
                _ingest_csv_in_chunks(file_path, table_name, db_engine, chunk_size)
            else:
                df = pd.read_csv(file_path)
                ingest_db(df, table_name, db_engine, chunk_size)
                logging.info('%s: Ingested successfully!', table_name)
        except Exception as e:
            # Best effort per file: record the failure and move on
            logging.exception('%s: Error ingesting - %s', table_name, e)

    total_time = (time.time() - start) / 60
    logging.info('Ingestion complete.')
    logging.info('Total time taken: %.2f minutes', total_time)


# Run the ingestion function only when this file is executed directly,
# not when it is imported as a module.
if __name__ == '__main__':
    load_raw_data()


Connection established successfully!
