# SQL statement to create the transaction table

-- Target table for the rows appended from the sales CSV files.
-- NOTE(review): the loader in this file treats order_id as unique, but no
-- PRIMARY KEY / UNIQUE constraint enforces that here -- confirm whether one
-- should be added.
CREATE TABLE transaction (
    order_id   VARCHAR(255),
    user_id    VARCHAR(255),
    product_id VARCHAR(255),
    quantity   INTEGER,
    amount     INTEGER,
    order_date DATE
);

In [143]:
def incrementalLoad(counter, pathfile, table_name):
    """Append the rows of one CSV file to a database table, skipping known orders.

    The file is rejected (nothing is written) when its own ``order_id`` values
    are not unique. Otherwise every row whose ``order_id`` is already stored in
    ``table_name`` is dropped and the remaining rows are appended.

    Relies on the module-level names ``pd`` (pandas) and ``engine`` (the
    connection object handed to ``pd.read_sql`` and ``DataFrame.to_sql``).

    Parameters
    ----------
    counter : int
        Index of the current load. Not used by the function; kept so existing
        callers keep working.
    pathfile : str
        Path of the CSV file. It must contain the columns ``order_id``,
        ``quantity``, ``amount`` and ``order_date``.
    table_name : str
        Target table. It is interpolated into the SQL text as an identifier,
        so it must come from trusted code, never from user input.

    Returns
    -------
    None
        Progress and problems are reported with ``print``.
    """
    load_df = pd.read_csv(pathfile)

    # Cast the numeric columns to int. When a column holds values that cannot
    # be cast, report it and fall back to to_numeric, which turns the
    # unparseable entries into NaN instead of aborting the load.
    for column in ("quantity", "amount"):
        try:
            load_df[column] = load_df[column].astype(int)
        except ValueError as e:
            print(f"Error converting '{column}': {e}")
            load_df[column] = pd.to_numeric(load_df[column], errors='coerce')

    # errors='coerce' already maps unparseable dates to NaT, so retrying the
    # same call in an except branch adds nothing.
    load_df["order_date"] = pd.to_datetime(load_df["order_date"], errors='coerce')

    # The file itself must not repeat an order_id (unique count == total count).
    if load_df["order_id"].nunique() != load_df["order_id"].count():
        print(f"Please cleansing order_id to have no duplicate or same number in file {pathfile}")
        return

    # Only the keys are needed for the duplicate check, not the whole table.
    existing_df = pd.read_sql(f"SELECT order_id FROM {table_name};", engine)

    # Compare as text: ids parsed from the CSV can be integers while the values
    # read back from a VARCHAR column are strings, and isin() never matches an
    # int against a str -- the duplicate filter would silently do nothing.
    already_loaded = load_df["order_id"].astype(str).isin(
        existing_df["order_id"].astype(str)
    )

    # Remove the rows that are already stored before appending.
    if already_loaded.any():
        duplicate_order_id = load_df.loc[already_loaded, "order_id"]
        load_df = load_df.loc[~already_loaded]
        print(f"drop order_id {duplicate_order_id} complete")

    load_df.to_sql(table_name, engine, if_exists='append', index=False)

    # Read the single value by position so the result does not depend on how
    # the database cases or names the aggregate column.
    query_count = f"SELECT COUNT(*) AS row_count FROM {table_name};"
    row_count = pd.read_sql(query_count, engine).iloc[0, 0]

    print(f"Load data into PostgreSQL table '{table_name}' completed.")
    print(f"Total rows: '{row_count}'.")

In [144]:
import glob
import time

import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text

# Define database connection parameters
# NOTE(review): the credentials are hard-coded in the source; consider
# loading the URL from the environment or a secrets store instead.
db_url = "postgresql://soravit:wetprasit@localhost:5432/ninfinite"
table_name = "transaction"

# Create a SQLAlchemy engine
engine = create_engine(db_url)

# Collect the CSV files to load. glob returns matches in arbitrary order, so
# sort them to make the load order (and which duplicate wins) reproducible.
csv_files = sorted(glob.glob('ref/sales/*.csv'))

# Start every run from an empty table. Engine.execute() was removed in
# SQLAlchemy 2.0; engine.begin() opens a connection whose transaction is
# committed when the block exits without an error.
with engine.begin() as connection:
    connection.execute(text(f"TRUNCATE TABLE {table_name} CASCADE;"))

for index, csv_file in enumerate(csv_files):
    # Assume time.sleep every 2 seconds is daily
    time.sleep(2)
    print(f"Trigger loop {index}")
    print("***START LOADING***")
    incrementalLoad(index, csv_file, table_name)
    print("***END LOADING*** \n\n")

Trigger loop 0
***START LOADING***
Load data into PostgreSQL table 'transaction' completed.
Total rows: '194'.
***END LOADING*** 


Trigger loop 1
***START LOADING***
Load data into PostgreSQL table 'transaction' completed.
Total rows: '377'.
***END LOADING*** 


Trigger loop 2
***START LOADING***
Load data into PostgreSQL table 'transaction' completed.
Total rows: '550'.
***END LOADING*** 


Trigger loop 3
***START LOADING***
Load data into PostgreSQL table 'transaction' completed.
Total rows: '714'.
***END LOADING*** 


Trigger loop 4
***START LOADING***
Load data into PostgreSQL table 'transaction' completed.
Total rows: '870'.
***END LOADING*** 


Trigger loop 5
***START LOADING***
Load data into PostgreSQL table 'transaction' completed.
Total rows: '1058'.
***END LOADING*** 


Trigger loop 6
***START LOADING***
Load data into PostgreSQL table 'transaction' completed.
Total rows: '1236'.
***END LOADING*** 




In [147]:
# Read the loaded rows back from the database and export them to a CSV file.
# The columns are listed explicitly (same names and order as the CREATE TABLE
# statement) so the layout of the exported file does not change silently if
# columns are later added to or reordered in the table.
select_query = (
    "SELECT order_id, user_id, product_id, quantity, amount, order_date "
    f"FROM {table_name};"
)

# Run the query into a DataFrame and save it without the DataFrame index
database_df = pd.read_sql(select_query, engine)
database_df.to_csv('sale_transactions.csv', index=False)

# Last expression of the cell: displays the DataFrame in the notebook
database_df

Unnamed: 0,order_id,user_id,product_id,quantity,amount,order_date
0,812,482,100081,77,227,2024-03-01
1,813,516,100047,55,226,2024-03-01
2,814,500,100065,46,218,2024-03-01
3,815,500,100082,1,211,2024-03-01
4,816,515,100053,78,194,2024-03-01
...,...,...,...,...,...,...
1231,2043,502,100006,98,134,2024-03-07
1232,2044,496,100043,6,98,2024-03-07
1233,2045,497,100007,94,222,2024-03-07
1234,2046,493,100073,30,48,2024-03-07
