In [276]:
import datetime
import os

import pandas as pd
import psycopg2

# config
# Database password: read from the PGPASSWORD environment variable when it is
# set, so the secret does not have to be stored in the notebook itself.
# NOTE(review): the literal fallback only keeps existing runs working; remove
# it once PGPASSWORD is exported wherever this notebook is executed.
password = os.environ.get("PGPASSWORD", "31415926")
# Table to load; also selects the CSV file and the INSERT statement.
table = 'order_lines'  # items, customers, order_headers, order_lines
# Day number; selects the `day<N>_files` input directory.
day = 2 # 1, 2

# SQL statements, one per target table.
# Each name follows the pattern `sql_insert_<table>` because the statement for
# the configured table is looked up by that name in sql_insert_table().
# The four data-table statements are upserts: on a primary-key conflict the
# existing row is overwritten with the incoming values (EXCLUDED.*).
# LEFT(%s, n) truncates a text value to n characters before it is stored
# (presumably to fit the column widths — TODO confirm against the table DDL).

# items: 3 parameters (item_id, item_description, item_status)
sql_insert_items = """INSERT INTO items (item_id, item_description, item_status) VALUES (%s, LEFT(%s,50), LEFT(%s,1)) 
    ON CONFLICT (item_id) DO UPDATE 
    SET item_description = EXCLUDED.item_description, item_status = EXCLUDED.item_status;"""

# customers: 9 parameters, in the column order listed in the statement
sql_insert_customers = """INSERT INTO customers (customer_id, customer_number, customer_name, address, 
    postal_code, city, country, country_code, telephone) VALUES (%s, LEFT(%s,50), LEFT(%s,50), LEFT(%s,100), LEFT(%s,10), LEFT(%s,50), LEFT(%s,50), LEFT(%s,10), LEFT(%s,50)) 
    ON CONFLICT (customer_id) DO UPDATE 
    SET customer_number = EXCLUDED.customer_number, customer_name = EXCLUDED.customer_name, address = EXCLUDED.address,
    postal_code = EXCLUDED.postal_code, city = EXCLUDED.city, country = EXCLUDED.country,
    country_code = EXCLUDED.country_code, telephone = EXCLUDED.telephone;"""

# order_headers: 6 parameters
sql_insert_order_headers = """INSERT INTO order_headers (order_id, order_number, order_date, customer_id, order_status, currency) VALUES (%s, LEFT(%s,20), %s, %s, LEFT(%s,10), LEFT(%s,10)) 
    ON CONFLICT (order_id) DO UPDATE 
    SET order_number = EXCLUDED.order_number, order_date = EXCLUDED.order_date, customer_id = EXCLUDED.customer_id,
    order_status = EXCLUDED.order_status, currency = EXCLUDED.currency;"""

# order_lines: 6 parameters
sql_insert_order_lines = """INSERT INTO order_lines (orderline_id, order_id, item_id, ship_date, promise_date, ordered_quantity) VALUES (%s, %s, %s, %s, %s, %s) 
    ON CONFLICT (orderline_id) DO UPDATE 
    SET order_id = EXCLUDED.order_id, item_id = EXCLUDED.item_id, ship_date = EXCLUDED.ship_date,
    promise_date = EXCLUDED.promise_date, ordered_quantity = EXCLUDED.ordered_quantity;"""

# audits: plain insert of one run record (4 parameters). A regular string, not
# an f-string: nothing is interpolated, and braces must never be interpreted.
sql_insert_audits = """INSERT INTO audits (start_time, end_time, numberrow_treatment, status) 
VALUES (%s, %s, %s, %s);"""

# Helper functions: load the CSV rows and pick the INSERT statement for a table
def data_table(table, day):
    """Load one day's CSV export for ``table`` as a list of row lists.

    Reads ``day{day}_files/{table}.csv`` relative to the working directory.
    For the ``customers`` table, rows with a missing value in any of
    ``customer_number``, ``customer_name``, ``address``, ``postal_code``,
    ``city`` or ``country`` are dropped before conversion; every other table
    is returned as read.

    Args:
        table: Table name; also the CSV file stem.
        day: Day number used to build the input directory name.

    Returns:
        list[list]: One inner list per remaining CSV row, in column order.
    """
    df = pd.read_csv(f'day{day}_files/{table}.csv')
    if table == 'customers':
        # Customer rows lacking any of these fields are discarded, not loaded.
        required = ['customer_number', 'customer_name', 'address',
                    'postal_code', 'city', 'country']
        df = df.dropna(subset=required)
    # Direct method call; no eval() is needed to convert the frame.
    return df.values.tolist()
    
def sql_insert_table(table):
    """Return the module-level INSERT statement registered for ``table``.

    The statement is the global variable named ``sql_insert_<table>``.
    ``table`` is used only as part of that variable name; it is never
    evaluated as code.

    Args:
        table: Table name, e.g. ``'items'`` or ``'order_lines'``.

    Returns:
        The object bound to ``sql_insert_<table>`` (an SQL string).

    Raises:
        NameError: If no global ``sql_insert_<table>`` is defined.
    """
    statement_name = f"sql_insert_{table}"
    try:
        # Plain namespace lookup instead of eval(): an unexpected `table`
        # value can only miss, it cannot execute an expression.
        return globals()[statement_name]
    except KeyError:
        # Same exception type an undefined variable name would produce.
        raise NameError(f"name '{statement_name}' is not defined") from None


# Rows to load and the matching INSERT statement, both selected by the
# `table` / `day` values set in the config section above.
data = data_table(table, day)
sql_insert = sql_insert_table(table)



# Bulk insert into PostgreSQL, with an audit record for every run
def insert_date(sql_insert, data, password):
    """Bulk-load ``data`` with ``sql_insert`` and record the run in ``audits``.

    Connects to PostgreSQL on ``localhost:5432`` (database ``postgres``, user
    ``postgres``), passes ``sql_insert`` and ``data`` to ``cursor.executemany``
    and commits. Afterwards one audit row (start time, end time, number of
    rows, status) is written with the module-level ``sql_insert_audits``
    statement and committed.

    If the load raises, it is rolled back, the error is printed, and the audit
    row records status ``'False'`` with a row count of 0; the load error is
    not re-raised. An error while writing the audit row does propagate.
    The connection is closed in every case.

    Args:
        sql_insert: Parameterised SQL statement executed once per row.
        data: Sequence of parameter sequences, one per row.
        password: Password for the ``postgres`` database user.

    Returns:
        None.
    """
    # Outcome recorded in the audit row; overwritten only after a successful commit.
    status = 'False'
    rows_treated = 0

    # Replace these values with your PostgreSQL connection details
    conn = psycopg2.connect(dbname='postgres', user='postgres', password=password,
                            host='localhost', port=5432)
    try:
        cursor = conn.cursor()

        start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        try:
            # Use executemany to insert the data in bulk
            cursor.executemany(sql_insert, data)

            # Commit the changes to the database
            conn.commit()

            print("Bulk insertion successful!")
            status = 'Success'
            rows_treated = len(data)
        except Exception as e:
            # If an error occurs, rollback the changes
            conn.rollback()
            print(f"Error: {e}")
        finally:
            end_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            # Audit log: exactly one row per run, so a single execute() is enough.
            cursor.execute(sql_insert_audits, (start_time, end_time, rows_treated, status))
            conn.commit()

            cursor.close()
    finally:
        # Always release the connection, even when the audit insert itself fails.
        conn.close()
    

# Run the load for the configured table; insert_date prints the outcome and
# writes one row to the audits table.
insert_date(sql_insert, data, password)

Bulk insertion successful!


In [268]:
# Create VIEW: Star Schema: SalesHistory
# The view joins each order line to its item, its order header and the
# header's customer, exposing the keys plus ship/promise dates and quantity.
# The connection reuses the `password` from the config section instead of
# repeating the literal here.
conn = psycopg2.connect(host="localhost", dbname="postgres", user="postgres", password=password, port=5432)

try:
    cur = conn.cursor()

    # CREATE OR REPLACE keeps this cell re-runnable: a plain CREATE VIEW fails
    # with "already exists" on every run after the first.
    cur.execute("""
    CREATE OR REPLACE VIEW SalesHistory AS
    SELECT 
        order_lines.orderline_id,
        order_headers.order_id,
        items.item_id,
        customers.customer_id,
        order_lines.ship_date,
        order_lines.promise_date,
        order_lines.ordered_quantity
    FROM order_lines
    JOIN items ON items.item_id = order_lines.item_id
    JOIN order_headers ON order_headers.order_id = order_lines.order_id
    JOIN customers ON customers.customer_id = order_headers.customer_id
""")

    conn.commit()

    cur.close()
finally:
    # Release the connection even if the statement fails.
    conn.close()

In [280]:
# Read the SalesHistory view and export it as a Parquet file.
# One column list drives both the query and the DataFrame labels, so the two
# cannot drift apart if the view ever gains or reorders columns.
sales_history_columns = ['orderline_id', 'order_id', 'item_id', 'customer_id',
                         'ship_date', 'promise_date', 'ordered_quantity']

# The connection reuses the `password` from the config section instead of
# repeating the literal here.
conn = psycopg2.connect(host="localhost", dbname="postgres", user="postgres", password=password, port=5432)

try:
    cur = conn.cursor()

    # The interpolated names are the fixed identifiers above, not external input.
    cur.execute(f"SELECT {', '.join(sales_history_columns)} FROM saleshistory")

    # Fetch all rows from the query
    rows = cur.fetchall()

    cur.close()
finally:
    # Release the connection even if the query fails.
    conn.close()

# Create Parquet File
data_mart = pd.DataFrame(rows, columns=sales_history_columns)
data_mart.to_parquet('SalesHistory.parquet', index=False)