In [None]:
import pandas as pd
import sqlite3
from sqlite3 import Error
import requests  # For fetching API data
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import numpy as np
import warnings
import os # To check for file

# Suppress warnings
warnings.filterwarnings('ignore', category=UserWarning, module='sklearn')
warnings.filterwarnings('ignore', category=FutureWarning)

print("--- Libraries Imported Successfully ---")

In [None]:
import getpass
import os
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine

# Aiven PostgreSQL connection info.
# SECURITY: credentials must not be stored in the notebook. The password comes from the
# AIVEN_PG_PASSWORD environment variable, or is prompted for interactively. The password
# that was previously hard-coded here has been exposed and should be rotated.
username = os.environ.get("AIVEN_PG_USER", "avnadmin")
password = os.environ.get("AIVEN_PG_PASSWORD") or getpass.getpass("Aiven PostgreSQL password: ")
host = os.environ.get("AIVEN_PG_HOST", "pg-29b815f1-khuzaimahassan52-cd49.i.aivencloud.com")
port = int(os.environ.get("AIVEN_PG_PORT", "27193"))
database = os.environ.get("AIVEN_PG_DB", "defaultdb")

# Percent-encode the user and password so characters such as '@', ':', '/' or '+'
# cannot corrupt the connection URL.
engine = create_engine(
    f"postgresql+psycopg2://{quote(username, safe='')}:{quote(password, safe='')}"
    f"@{host}:{port}/{database}?sslmode=require"
)

# Extract the Northwind source tables in full.
df_customers = pd.read_sql("SELECT * FROM customers;", engine)
df_orders = pd.read_sql("SELECT * FROM orders;", engine)
df_products = pd.read_sql("SELECT * FROM products;", engine)
df_order_details = pd.read_sql("SELECT * FROM order_details;", engine)

print("✅ Data extracted successfully")



In [None]:
# Quick look at the extracted Northwind tables (same print order as before).
for _frame in (df_customers, df_orders, df_products):
    print(_frame.shape)
for _frame in (df_orders, df_customers, df_products, df_order_details):
    print(_frame.columns)
print(df_order_details.shape)

In [None]:


# --- Extract 1: Kaggle e-commerce sales CSV ---
# Defaults to the Kaggle input path; set SALES_CSV_PATH to run the notebook elsewhere.
csv_file_name = os.environ.get(
    'SALES_CSV_PATH',
    '/kaggle/input/e-commerce-sales-transactions-dataset/ecommerce_sales_34500.csv',
)
if not os.path.exists(csv_file_name):
    print(f"[ETL Extract] ERROR: File '{csv_file_name}' not found.")
    df_sales_source = pd.DataFrame()
else:
    try:
        df_sales_source = pd.read_csv(csv_file_name)
        print(f"[ETL Extract] Successfully read '{csv_file_name}'.")
        print(f"  - Sales source data has {len(df_sales_source)} rows.")
    except Exception as e:
        # Best-effort extract: report the failure and continue with an empty frame.
        print(f"An error occurred: {e}")
        df_sales_source = pd.DataFrame()

# --- Extract 2: DummyJSON products API ---
api_url = "https://dummyjson.com/products?limit=100"
# Without a timeout, requests.get can block the kernel indefinitely on a stalled connection.
API_TIMEOUT_SECONDS = 30
print(f"\n[ETL Extract] Fetching data from API: {api_url}...")
try:
    response = requests.get(api_url, timeout=API_TIMEOUT_SECONDS)
    response.raise_for_status()
    api_data = response.json()
    product_list = api_data['products']
    df_api_source = pd.DataFrame(product_list)
    print(f"  - API data fetched successfully. Received {len(df_api_source)} products.")
except (requests.exceptions.RequestException, ValueError, KeyError) as e:
    # ValueError: the body is not valid JSON. KeyError: the payload has no 'products' key.
    print(f"Error fetching API data: {e}")
    df_api_source = pd.DataFrame()

In [None]:
# Column names of the API and CSV sources, API first.
for _source in (df_api_source, df_sales_source):
    print(_source.columns)

In [None]:
# Harmonise column names across the three sources so they can be joined later.
# Only columns whose name actually changes are listed; all others keep their name.

# --- CSV: suffix the columns that clash with the other sources ---
df_sales_source = df_sales_source.rename(columns={
    'category': 'category_csv',
    'price': 'price_csv',
    'discount': 'discount_csv',
    'region': 'region_csv',
})

# --- API ---
df_api_source = df_api_source.rename(columns={
    'id': 'product_id',
    'title': 'product_name_api',
    'price': 'price_api',
    'category': 'category_api',
})

# --- Northwind products (already loaded as df_products) ---
df_products = df_products.rename(columns={
    'product_name': 'product_name_nw',
    'unit_price': 'price_nw',
})


# Merge Northwind Orders + Customers + Products + order_details

In [None]:
# Attach customer attributes to each order (left join keeps every order).
df_orders_customers = pd.merge(df_orders, df_customers, how='left', on='customer_id')

# Expand to one row per order line. Only these order_details columns are carried forward.
_detail_cols = ['order_id', 'product_id', 'quantity', 'unit_price']
df_orders_customers = pd.merge(
    df_orders_customers,
    df_order_details[_detail_cols],
    how='left',
    on='order_id',
)


In [None]:
# Attach Northwind product attributes to every order line.
df_orders_full = pd.merge(df_orders_customers, df_products, how='left', on='product_id')

# Display the first rows of the joined frame for inspection.
df_orders_full.head()

In [None]:
# DataFrame.keys() is the column Index; displayed as the cell output.
df_orders_full.keys()

In [None]:
# Column names after renaming: CSV source first, then API source.
for _source in (df_sales_source, df_api_source):
    print(_source.columns)

In [None]:
def _to_join_key(col: pd.Series) -> pd.Series:
    """Return ``col`` as strings suitable for use as a merge key.

    A plain ``astype(str)`` turns an integral float column into '11.0', which
    silently fails to match the '11' produced from an integer column on the
    other side of the join. Such a column typically becomes float64 because a
    left merge or the CSV introduced NaN. Integral float columns are therefore
    cast to nullable ``Int64`` before stringification. Missing values remain
    non-matching placeholders rather than real IDs.
    """
    if pd.api.types.is_float_dtype(col):
        non_null = col.dropna()
        if (non_null % 1 == 0).all():
            col = col.astype('Int64')
    return col.astype(str)


# Normalise both sides of the (order_id, product_id) join to the same string representation.
df_orders_full['order_id'] = _to_join_key(df_orders_full['order_id'])
df_orders_full['product_id'] = _to_join_key(df_orders_full['product_id'])

df_sales_source['order_id'] = _to_join_key(df_sales_source['order_id'])
df_sales_source['product_id'] = _to_join_key(df_sales_source['product_id'])


In [None]:
# Left-join the CSV sales lines on (order_id, product_id). Both frames carry a
# 'quantity' column, so pandas' default suffixes yield quantity_x (Northwind) and
# quantity_y (CSV). Later cells rely on those names.
_csv_cols = ['order_id', 'product_id', 'price_csv', 'discount_csv', 'quantity', 'total_amount', 'region_csv']
df_orders_full = pd.merge(
    df_orders_full,
    df_sales_source[_csv_cols],
    how='left',
    on=['order_id', 'product_id'],
)


In [None]:
# Match the string-typed product_id used in df_orders_full.
df_api_source = df_api_source.astype({'product_id': str})


In [None]:
# Enrich each order line with API product attributes (left join on product_id).
_api_cols = ['product_id', 'product_name_api', 'price_api', 'category_api', 'stock', 'brand', 'rating']
df_orders_full = pd.merge(df_orders_full, df_api_source[_api_cols], how='left', on='product_id')


In [None]:
# Preview, column list and shape of the fully merged frame.
for _summary in (df_orders_full.head(), df_orders_full.columns, df_orders_full.shape):
    print(_summary)

In [None]:
# Customer dimension: the distinct customer attribute rows seen on orders.
_customer_cols = [
    'customer_id', 'company_name', 'contact_name', 'contact_title',
    'address', 'city', 'region', 'postal_code', 'country', 'phone', 'fax',
]
dim_customers = (
    df_orders_full.loc[:, _customer_cols]
    .drop_duplicates()
    .reset_index(drop=True)
)


In [None]:
# Product dimension: exactly one row per product_id.
# The price is the Northwind catalogue price (products.unit_price, renamed to price_nw
# earlier). Previously this used `unit_price` from order_details, which is a per-order-line
# price: a product sold at several prices yielded several rows with the same product_id,
# so the "dimension" was not keyed by product.
dim_products = (
    df_orders_full[['product_id', 'product_name_nw', 'category_api', 'brand', 'price_nw', 'stock']]
    .drop_duplicates(subset='product_id')
    .rename(columns={
        'product_name_nw': 'product_name',
        'category_api': 'category',
        'price_nw': 'price',
        'stock': 'units_in_stock',
    })
    .reset_index(drop=True)
)


In [None]:
# Parse order_date into datetime64 so the .dt accessor works downstream.
df_orders_full = df_orders_full.assign(order_date=pd.to_datetime(df_orders_full['order_date']))


In [None]:
# Date dimension: one row per distinct order_date, with a YYYYMMDD integer key
# and calendar parts.
_distinct_dates = df_orders_full[['order_date']].drop_duplicates().reset_index(drop=True)
_date_parts = _distinct_dates['order_date'].dt
dim_dates = _distinct_dates.assign(
    date_key=_date_parts.strftime('%Y%m%d').astype(int),
    day=_date_parts.day,
    month=_date_parts.month,
    quarter=_date_parts.quarter,
    year=_date_parts.year,
)


In [None]:
# Region dimension: distinct (CSV region, customer country) pairs with a 1-based surrogate key.
dim_regions = (
    df_orders_full[['region_csv', 'country']]
    .drop_duplicates()
    .reset_index(drop=True)
    .rename(columns={'region_csv': 'region_name'})
    .assign(region_key=lambda frame: range(1, len(frame) + 1))
)


In [None]:
# Unit price net of the CSV discount; rows without a matching CSV line get NaN.
# NOTE(review): the "Final Price" cell below also derives final_price from discount_csv,
# and the two cells disagreed on its unit: this one used a fraction, that one a percentage.
# The scale is now taken from the data: a maximum above 1 is read as percentages.
# Confirm against the CSV's documentation.
_discount = df_orders_full['discount_csv']
_discount_frac = _discount / 100 if _discount.max() > 1 else _discount
df_orders_full['final_price'] = df_orders_full['price_csv'] * (1 - _discount_frac)

# Line revenue = net unit price x CSV quantity (quantity_y is the CSV-side column
# produced by the CSV merge).
df_orders_full['order_revenue'] = df_orders_full['final_price'] * df_orders_full['quantity_y']

fact_sales = df_orders_full[[
    'order_id', 'customer_id', 'product_id', 'quantity_y', 'final_price',
    'order_revenue', 'order_date', 'region_csv'
]].rename(columns={
    'quantity_y': 'quantity',
    'region_csv': 'region'
})


In [None]:
# Load the star schema into PostgreSQL. if_exists='replace' drops and recreates each
# table, so every run overwrites the previous load.
_star_schema_tables = [
    ('dim_customers', dim_customers),
    ('dim_products', dim_products),
    ('dim_dates', dim_dates),
    ('dim_regions', dim_regions),
    ('fact_sales', fact_sales),
]
for _table_name, _table_df in _star_schema_tables:
    _table_df.to_sql(_table_name, engine, if_exists='replace', index=False)
    print(f"{_table_name} loaded successfully")


In [None]:
# Sanity check: read the first five rows back from the freshly loaded fact table.
_check_query = "SELECT * FROM fact_sales LIMIT 5;"
df_check = pd.read_sql(_check_query, engine)
print(df_check)


# Transformations

In [None]:
# Give the two merge-suffixed quantity columns explicit source names.
_quantity_names = {
    'quantity_x': 'quantity_nw',   # Northwind order_details
    'quantity_y': 'quantity_csv',  # Kaggle CSV
}
df_orders_full = df_orders_full.rename(columns=_quantity_names)

# Single quantity for calculations: the CSV value where present, else the Northwind one.
_qty_csv, _qty_nw = df_orders_full['quantity_csv'], df_orders_full['quantity_nw']
df_orders_full['quantity'] = _qty_csv.combine_first(_qty_nw)


# Final Price 

In [None]:
# One unit price per order line, by priority:
#   1. CSV price net of the CSV discount (only when both are present),
#   2. DummyJSON API price,
#   3. Northwind catalogue price (price_nw).
# This vectorised form keeps the precedence of the former row-wise apply. It is much faster
# and also works on an empty frame, where apply(axis=1) returns a DataFrame.
_discount = df_orders_full['discount_csv']
# NOTE(review): the fact_sales cell above also derives final_price from discount_csv, and
# the two cells disagreed on its unit: that one used a fraction, this one a percentage.
# The scale is now taken from the data: a maximum above 1 is read as percentages.
# Confirm against the CSV's documentation.
_discount_frac = _discount / 100 if _discount.max() > 1 else _discount
_csv_net_price = df_orders_full['price_csv'] * (1 - _discount_frac)  # NaN unless both inputs present
df_orders_full['final_price'] = (
    _csv_net_price
    .combine_first(df_orders_full['price_api'])
    .combine_first(df_orders_full['price_nw'])
)


# Total Revenue per Order Line

In [None]:
# Revenue per order line = unit price x consolidated quantity.
df_orders_full['order_revenue'] = df_orders_full['final_price'].mul(df_orders_full['quantity'])
print(df_orders_full['order_revenue'])

# Consolidate Category Column

In [None]:
# Consolidated product category: the API category, gap-filled from the CSV category when
# that column has been merged in. The previous version combined category_api with itself,
# which is a no-op and never filled anything.
df_orders_full['category'] = df_orders_full['category_api'].copy()
if 'category_csv' in df_orders_full.columns:
    df_orders_full['category'] = df_orders_full['category'].combine_first(df_orders_full['category_csv'])


# Select Analytics-Ready Columns

In [None]:
# Analytics-ready view: identifiers, descriptive attributes and computed revenue measures.
_analytics_cols = [
    'order_id', 'order_date', 'customer_id', 'company_name', 'contact_name',
    'product_id', 'product_name_nw', 'product_name_api', 'category',
    'quantity', 'final_price', 'order_revenue', 'region_csv', 'brand', 'stock', 'rating',
]
df_analytics = df_orders_full.loc[:, _analytics_cols]


# Summary Tables

In [None]:
# Total revenue per product.
df_sales_by_product = df_analytics.groupby('product_id', as_index=False)['order_revenue'].sum()

# Align key dtypes (string) before joining product names back on.
df_sales_by_product = df_sales_by_product.astype({'product_id': str})
df_products['product_id'] = df_products['product_id'].astype(str)


In [None]:
# Attach the Northwind product name to each product revenue total.
_product_names = df_products[['product_id', 'product_name_nw']]
df_sales_by_product = pd.merge(df_sales_by_product, _product_names, how='left', on='product_id')

In [None]:
# Total revenue per customer, labelled with the customer's company name.
_revenue_per_customer = df_analytics.groupby('customer_id', as_index=False)['order_revenue'].sum()
df_sales_by_customer = pd.merge(
    _revenue_per_customer,
    df_customers[['customer_id', 'company_name']],
    how='left',
    on='customer_id',
)


In [None]:
# Total revenue per CSV region.
df_sales_by_region = df_analytics.groupby('region_csv', as_index=False)['order_revenue'].sum()


In [None]:
# Write the analytics table and the three revenue summaries to CSV in the working directory.
_csv_exports = {
    'northwind_merged_analytics.csv': df_analytics,
    'sales_by_product.csv': df_sales_by_product,
    'sales_by_customer.csv': df_sales_by_customer,
    'sales_by_region.csv': df_sales_by_region,
}
for _path, _frame in _csv_exports.items():
    _frame.to_csv(_path, index=False)


In [None]:
# Publish the analytics-ready table to PostgreSQL (replacing any previous version).
# This reuses the `engine` created in the connection cell. Rebuilding a second engine here
# duplicated that setup and re-interpolated the raw credentials into a new URL.
df_analytics.to_sql('sales_analytics', engine, if_exists='replace', index=False)
