In [7]:
# etl_script.py

import pandas as pd
from sqlalchemy import create_engine
import sys

# --- 1. EXTRACT ---
# Reads the two yearly sheets of the source workbook and stacks them into a
# single DataFrame named `df` for the transform step.
print("Step 1: Extracting data...")

# Workbook location and the sheets that make up the full dataset.
excel_path = "online_retail_II.xlsx"
sheet_names = ['Year 2009-2010', 'Year 2010-2011']

try:
    # A list of sheet names makes pandas return a {sheet_name: DataFrame}
    # dict from one read_excel call, instead of parsing the workbook once
    # per sheet.
    sheets = pd.read_excel(excel_path, sheet_name=sheet_names)

except FileNotFoundError:
    print("   ❌ ERROR: The file 'online_retail_II.xlsx' was not found...")
    # Exit with a non-zero status so a shell or scheduler sees the failure.
    sys.exit(1)

# Keep the per-sheet frames available under their original names.
df1 = sheets['Year 2009-2010']
df2 = sheets['Year 2010-2011']

# Combine them into a single DataFrame with a fresh 0..n-1 index
df = pd.concat([df1, df2], ignore_index=True)

# Clean up column names by stripping whitespace
df.columns = df.columns.str.strip()

print("   ✅ Data from both sheets extracted and combined successfully.")
print(f"   Total rows in combined dataset: {df.shape[0]}")

# --- 2. TRANSFORM ---
# Cleans the combined frame `df` and produces `df_cleaned`, whose columns are
# renamed to snake_case for the database.
print("\nStep 2: Transforming data...")

# Rows that carry no customer identifier are discarded.
df.dropna(subset=['Customer ID'], inplace=True)
print("   - Dropped rows with missing 'Customer ID'.")

# Normalise types: customer ids become integer-formatted strings and the
# invoice date becomes a datetime column.
customer_ids_as_int = df['Customer ID'].astype(int)
df['Customer ID'] = customer_ids_as_int.astype(str)
df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])
print("   - Converted 'Customer ID' and 'InvoiceDate' to correct data types.")

# Cancellations are identified by an invoice number that starts with 'C'.
initial_rows = len(df)
is_cancelled = df['Invoice'].astype(str).str.startswith('C')
df = df[~is_cancelled]
rows_removed = initial_rows - len(df)
print(f"   - Removed {rows_removed} cancelled orders.")

# Keep only rows whose quantity is strictly positive.
initial_rows = len(df)
has_positive_quantity = df['Quantity'].gt(0)
df = df[has_positive_quantity]
rows_removed = initial_rows - len(df)
print(f"   - Removed {rows_removed} rows with non-positive quantities.")

# Line total = quantity x unit price.
df['TotalPrice'] = df['Quantity'].mul(df['Price'])
print("   - Created 'TotalPrice' column.")

# Source column -> database column. The key order also fixes the column
# order of the output frame.
column_map = {
    'Invoice': 'invoice_id',
    'StockCode': 'stock_code',
    'Description': 'description',
    'Quantity': 'quantity',
    'InvoiceDate': 'invoice_date',
    'Price': 'unit_price',
    'Customer ID': 'customer_id',
    'Country': 'country',
    'TotalPrice': 'total_price',
}
df_cleaned = df[list(column_map)].rename(columns=column_map)
print("   - Selected and renamed columns for the database.")

print(f"   ✅ Data transformation complete. Final dataset has {df_cleaned.shape[0]} rows.")

Step 1: Extracting data...
   ✅ Data from both sheets extracted and combined successfully.
   Total rows in combined dataset: 1067371

Step 2: Transforming data...
   - Dropped rows with missing 'Customer ID'.
   - Converted 'Customer ID' and 'InvoiceDate' to correct data types.
   - Removed 18744 cancelled orders.
   - Removed 0 rows with non-positive quantities.
   - Created 'TotalPrice' column.
   - Selected and renamed columns for the database.
   ✅ Data transformation complete. Final dataset has 805620 rows.


In [11]:
# --- 3. LOAD ---
# Writes `df_cleaned` into a PostgreSQL table, replacing the table if it
# already exists.
print("\nStep 3: Loading data into PostgreSQL...")

# Imported here because only this step needs it: it percent-encodes the
# credentials before they are embedded in the connection URL.
from urllib.parse import quote_plus

# 1. DEFINE YOUR CONNECTION DETAILS
# Set db_password to the password you chose during PostgreSQL installation.
# NOTE(review): this is a hard-coded credential; consider loading it from the
# environment or a secrets store before sharing or committing this file.
db_user = 'postgres'
db_password = '1234' # ⚠️ <--- PUT YOUR PASSWORD HERE
db_host = 'localhost'
db_port = '5432'
db_name = 'retail_sales'
table_name = 'transactions'

# 2. CREATE THE CONNECTION ENGINE
try:
    # User and password are URL-encoded so that characters such as '@', ':'
    # or '/' in them cannot be mistaken for URL delimiters.
    connection_url = (
        f'postgresql://{quote_plus(db_user)}:{quote_plus(db_password)}'
        f'@{db_host}:{db_port}/{db_name}'
    )
    engine = create_engine(connection_url)

    # Test the connection (optional but recommended). The context manager
    # closes the connection even if something inside the block raises.
    with engine.connect():
        print("   ✅ Connection to PostgreSQL database successful.")

except Exception as e:
    print(f"   ❌ An error occurred while connecting to the database: {e}")
    # Exit with a non-zero status so the failure is visible to the caller.
    sys.exit(1)

# 3. LOAD THE DATAFRAME INTO THE DATABASE TABLE
try:
    # if_exists='replace' drops the table if it already exists and creates a
    # new one; index=False keeps the DataFrame index out of the table.
    df_cleaned.to_sql(table_name, engine, if_exists='replace', index=False)

    print(f"   ✅ Data successfully loaded into the '{table_name}' table.")

except Exception as e:
    # Best-effort: report the failure and fall through to the final message.
    print(f"   ❌ An error occurred while loading data: {e}")

print("\nETL process finished.")


Step 3: Loading data into PostgreSQL...
   ✅ Connection to PostgreSQL database successful.
   ✅ Data successfully loaded into the 'transactions' table.

ETL process finished.
