In [None]:
import sqlalchemy

import os
from dotenv import load_dotenv
# Load variables from a local .env file into the process environment so the
# os.getenv("DATABASE_URL_*") lookups in later cells can resolve.
load_dotenv()

In [None]:
# Connect to PostgreSQL - Staging
# The connection string is read from the environment so that credentials are
# never written into the notebook itself.
DATABASE_URL_STAGING = os.getenv("DATABASE_URL_STAGING")
if not DATABASE_URL_STAGING:
    # os.getenv() returns None for a missing variable; fail fast with an
    # actionable message instead of letting create_engine() reject None.
    raise RuntimeError(
        "DATABASE_URL_STAGING is not set; define it in the environment or the .env file."
    )

# client_encoding asks the PostgreSQL dialect to exchange text as UTF-8.
engine = sqlalchemy.create_engine(DATABASE_URL_STAGING, client_encoding="utf8")
connection = engine.connect()

In [None]:
# Sanity check: list the tables visible through the staging engine.
asd = sqlalchemy.inspect(engine)
staging_tables = asd.get_table_names()
print(staging_tables)

In [None]:
import pandas as pd

# Pull rows from the staging sales table into a DataFrame and display them.
# NOTE(review): LIMIT 5 means every later cell (cleaning, split, final load)
# only ever sees at most five rows — confirm this preview cap is intentional.
query = "SELECT * FROM sales_data LIMIT 5;"
result_df = pd.read_sql(sql=query, con=connection)
result_df

In [None]:
# Count of missing values in each column.
result_df.isna().sum()

In [None]:
# Discard every row that has at least one missing value, then re-count the
# missing values per column to confirm none remain.
result_df = result_df.dropna(axis="index", how="any")
result_df.isna().sum()

In [None]:
# Number of rows that exactly repeat an earlier row.
result_df.duplicated(keep="first").sum()

In [None]:
# Keep the first occurrence of each repeated row, then re-count duplicates to
# confirm the frame is now free of them.
result_df = result_df.drop_duplicates(keep="first")
result_df.duplicated(keep="first").sum()

In [None]:
# Keep only rows whose Order ID consists of numeric characters, so the later
# cast to int cannot hit non-numeric text.
# Strip before testing: str.isnumeric() is False for padded values such as
# " 123 ", so without this a valid order would be discarded here, before the
# whitespace-cleanup cell ever gets a chance to trim it.
order_id_is_numeric = result_df["Order ID"].str.strip().str.isnumeric()
result_df = result_df[order_id_is_numeric]
result_df.shape

In [None]:
# Print the frame summary (columns, non-null counts, dtypes) before the
# type-fixing cell changes the column dtypes.
result_df.info()

In [None]:
# Trim leading/trailing whitespace from each text column.
# Note: the .str accessor only works on string data, so this cell cannot be
# re-run after the casts below without reloading result_df first.
for column in (
    "Order ID",
    "Product",
    "Quantity Ordered",
    "Price Each",
    "Order Date",
    "Purchase Address",
):
    result_df[column] = result_df[column].str.strip()

# Convert the cleaned strings to their proper types.
for column, dtype in (
    ("Order ID", int),
    ("Quantity Ordered", int),
    ("Price Each", float),
):
    result_df[column] = result_df[column].astype(dtype)
result_df["Order Date"] = pd.to_datetime(result_df["Order Date"])

In [None]:
# Split the cleaned rows by the country named in the FROM column.
# Rows whose FROM value mentions neither country end up in neither subset.
origin = result_df["FROM"]
usa_result_df = result_df.loc[origin.str.contains("USA")]
canada_result_df = result_df.loc[origin.str.contains("Canada")]

In [None]:
# Scale the Canadian prices by a fixed factor.
# NOTE(review): 0.72 is presumably a CAD -> USD exchange rate — confirm the
# meaning and keep the value current.
CANADA_PRICE_FACTOR = 0.72

# canada_result_df is a filtered slice of result_df, so assigning a column on
# it in place raises pandas' SettingWithCopyWarning; .assign() returns a new
# frame instead. Re-running this cell applies the factor again, so re-run the
# split cell first if this one must be repeated.
canada_result_df = canada_result_df.assign(
    **{"Price Each": canada_result_df["Price Each"] * CANADA_PRICE_FACTOR}
)

In [None]:
# Stack the two country subsets into one frame with a fresh 0..n-1 index and
# expose the FROM column under the clearer name "Country".
country_frames = [usa_result_df, canada_result_df]
final_df = pd.concat(country_frames, ignore_index=True).rename(
    columns={"FROM": "Country"}
)
final_df

In [None]:
# Close the connection
connection.close()
# close() only hands the connection back to the engine's pool; dispose() also
# closes the pooled connections, so nothing stays open against the staging
# database once `engine` is rebound to the final database.
engine.dispose()

In [None]:
# Connect to PostgreSQL - Final
# The connection string is read from the environment so that credentials are
# never written into the notebook itself.
DATABASE_URL_FINAL = os.getenv("DATABASE_URL_FINAL")
if not DATABASE_URL_FINAL:
    # os.getenv() returns None for a missing variable; fail fast with an
    # actionable message instead of letting create_engine() reject None.
    raise RuntimeError(
        "DATABASE_URL_FINAL is not set; define it in the environment or the .env file."
    )

# client_encoding asks the PostgreSQL dialect to exchange text as UTF-8.
# From here on `engine` and `connection` refer to the final database.
engine = sqlalchemy.create_engine(DATABASE_URL_FINAL, client_encoding="utf8")
connection = engine.connect()

In [None]:
# Sanity check: list the tables visible through the current (final) engine.
asd = sqlalchemy.inspect(engine)
final_tables = asd.get_table_names()
print(final_tables)

In [None]:
# Write the cleaned frame to the final database, replacing any existing table
# of the same name and leaving the DataFrame index out of the table.
# NOTE(review): pandas documents that to_sql does not commit when the passed
# Connection is already inside a transaction — confirm the rows persist if
# this cell is re-run after the read-back cells below.
final_df.to_sql(
    name="sales_data_duckdb",
    con=connection,
    if_exists="replace",
    index=False,
)

In [None]:
# Read a few rows back from the freshly written table to confirm the save.
query = "SELECT * FROM sales_data_duckdb LIMIT 5;"
check_df = pd.read_sql(sql=query, con=connection)
check_df

In [None]:
# List the extensions installed in the final database (pg_extension catalog).
query_ext = "SELECT * FROM pg_extension;"
# Run query_ext itself: the previous cell's `query` is still in scope and
# would silently re-read the sales table instead.
pd.read_sql(query_ext, con=connection)