In [3]:
import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account
from google.api_core.exceptions import Conflict
from datetime import datetime
import numpy as np

# 1) Authentication and Setup
PROJECT_ID = "sharedproject2025"
DATASET_ID = "Gold_Layer"
CSV_PATH = r"E:\Supply Chain Data Integration System\Refined and Cleansed_Supply_Chain_Data.csv"
# NOTE(review): machine-specific path to a service-account key file; keep the key
# itself out of version control.
key_path = r"E:\sharedproject2025-22cd4ce0c74f.json"
credentials = service_account.Credentials.from_service_account_file(key_path)
# Bind the client to PROJECT_ID (rather than credentials.project_id): the dataset is
# created as f"{PROJECT_ID}.{DATASET_ID}" while tables are addressed through
# client.project, so both must refer to the same project.
client = bigquery.Client(credentials=credentials, project=PROJECT_ID)

# 2) Helper Function for BigQuery Loading
def create_and_load_table(client, df, table_id, dataset_id):
    """
    Ensure ``<client.project>.<dataset_id>.<table_id>`` exists, then replace its rows with ``df``.

    Args:
        client: BigQuery client; its ``project`` attribute is used to build the
            fully-qualified table name.
        df: DataFrame to upload.
        table_id: Destination table name.
        dataset_id: Dataset holding the destination table.

    An empty table is created first; if BigQuery reports a ``Conflict`` the
    existing table is reused. The load job uses ``WRITE_TRUNCATE``, so any rows
    already present are replaced. The call blocks until the load job finishes,
    and errors raised by the job propagate to the caller.
    """
    full_name = f"{client.project}.{dataset_id}.{table_id}"

    try:
        client.create_table(bigquery.Table(full_name))
    except Conflict:
        # Table already exists -- not an error, the truncating load below refreshes it.
        print(f"‚ÑπÔ∏è Table '{table_id}' already exists.")
    else:
        print(f"‚úÖ Table '{table_id}' created.")

    print(f"üöÄ Loading {len(df)} rows into {full_name}...")
    load_config = bigquery.LoadJobConfig(autodetect=True, write_disposition="WRITE_TRUNCATE")
    # .result() waits for the load job to complete.
    client.load_table_from_dataframe(df, full_name, job_config=load_config).result()
    print(f"‚úÖ Successfully loaded data into {full_name}.")
    print("---")


# 3) Data Preprocessing
def load_and_preprocess(path):
    """
    Read the cleansed supply-chain CSV and add the derived columns used by the aggregations.

    Args:
        path: Path of the CSV file, passed straight to ``pd.read_csv``.

    Returns:
        The DataFrame, with:
          * ``Order Date`` and ``Ship Date`` parsed as datetimes (day-first;
            unparseable values become NaT instead of raising),
          * ``order_year`` as a nullable ``Int64`` column (``<NA>`` where the
            order date is NaT),
          * ``order_month`` as the first day of the order's month,
          * zero ``Sales`` / ``Quantity`` values replaced by NaN.
    """
    print(f"Reading data from {path}...")
    df = pd.read_csv(path)

    # Dates are interpreted day-first (e.g. 31/01/2020); bad values are coerced to NaT.
    for date_col in ('Order Date', 'Ship Date'):
        df[date_col] = pd.to_datetime(df[date_col], dayfirst=True, errors='coerce')

    # Time-based columns for trend analysis.
    # A plain .dt.year becomes float64 as soon as one date is NaT, and the year
    # labels would later stringify as "2014.0"; nullable Int64 keeps them integral.
    df['order_year'] = df['Order Date'].dt.year.astype('Int64')
    df['order_month'] = df['Order Date'].dt.to_period('M').dt.to_timestamp()

    # Zero Sales/Quantity are stored as NaN, which pandas aggregations skip.
    # NOTE(review): this alone does not prevent division by zero in ratios of
    # aggregated totals, because a group's sum can still be 0.
    df['Sales'] = df['Sales'].replace({0: np.nan})
    df['Quantity'] = df['Quantity'].replace({0: np.nan})

    print("‚úÖ Data read and preprocessed.")
    return df

# --- Read and Preprocess Data ---
df = load_and_preprocess(CSV_PATH)

# --- Create Dataset (if not exists) ---
# The dataset is addressed as PROJECT_ID.DATASET_ID. If it already exists,
# BigQuery raises Conflict, which is reported here rather than propagated.
dataset_ref = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
try:
    client.create_dataset(dataset_ref)
except Conflict:
    print(f"‚ÑπÔ∏è Dataset '{DATASET_ID}' already exists.")
else:
    print(f"‚úÖ Dataset '{DATASET_ID}' created.")


# 4) AGGREGATION 1: Yearly & Total Summary (Your Original Table)
# -----------------------------------------------------------------
# One row per order year plus an 'All-Years Total' row computed over the whole
# DataFrame, loaded to Agg_Yearly_Summary_Metrics. Rows with a missing order year
# are dropped by the groupby (pandas' default dropna=True) but still count toward
# the total row.
print("üìä Grouping data by year and calculating aggregates...")
yearly_agg_df = df.groupby('order_year').agg(
    Total_Sales=('Sales', 'sum'),
    Total_Profit=('Profit', 'sum'),
    Total_Units_Sold=('Quantity', 'sum'),
    Total_Shipping_Cost=('Shipping_Cost', 'sum'),
    Average_Discount=('Discount', 'mean'),
    Average_Profit=('Profit', 'mean'),
    Median_Profit=('Profit', 'median'),
    Max_Profit=('Profit', 'max'),
    Loss=('Profit', 'min'),  # lowest single-row Profit value
    Average_Shipping_time_days=('Shipping_Duration_Days', 'mean')
)
# order_year may be float if some dates failed to parse. The group keys carry no NaN,
# so cast them to int; Period then reads "2014" rather than "2014.0".
yearly_agg_df.index = yearly_agg_df.index.astype(int)

total_agg = {
    'Total_Sales': df['Sales'].sum(),
    'Total_Profit': df['Profit'].sum(),
    'Total_Units_Sold': df['Quantity'].sum(),
    'Total_Shipping_Cost': df['Shipping_Cost'].sum(),
    'Average_Discount': df['Discount'].mean(),
    'Average_Profit': df['Profit'].mean(),
    'Median_Profit': df['Profit'].median(),
    'Max_Profit': df['Profit'].max(),
    'Loss': df['Profit'].min(),
    'Average_Shipping_time_days': df['Shipping_Duration_Days'].mean()
}
total_df = pd.DataFrame([total_agg], index=['All-Years Total'])

combined_df = pd.concat([yearly_agg_df, total_df])
combined_df = combined_df.reset_index().rename(columns={'index': 'Period'})

# Ratios divide aggregated totals. Float division by 0 gives +/-inf, which fillna(0)
# does not replace, so zero denominators are mapped to NaN first and end up as 0.
combined_df['Profit_Margin'] = (
    combined_df['Total_Profit'] / combined_df['Total_Sales'].replace(0, np.nan) * 100
).fillna(0)
combined_df['Average_Sales_Per_Unit'] = (
    combined_df['Total_Sales'] / combined_df['Total_Units_Sold'].replace(0, np.nan)
).fillna(0)
combined_df['Average_shipping_cost_per_unit'] = (
    combined_df['Total_Shipping_Cost'] / combined_df['Total_Units_Sold'].replace(0, np.nan)
).fillna(0)
combined_df['Total_Units_Sold'] = combined_df['Total_Units_Sold'].fillna(0).astype(int)
combined_df = combined_df.round(2)
combined_df["load_timestamp"] = datetime.now()
# Period mixes year numbers with the 'All-Years Total' label; store it uniformly as text.
combined_df['Period'] = combined_df['Period'].astype(str)

# --- Load Aggregation 1 ---
create_and_load_table(client, combined_df, "Agg_Yearly_Summary_Metrics", DATASET_ID)


# 5) AGGREGATION 2: Product Performance Summary
# -----------------------------------------------------------------
# One row per (Category, Sub-Category, Product Name), loaded to Agg_Product_Performance.
# Rows with a missing key value are dropped by the groupby (pandas' default dropna=True).
print("üìä Aggregating Product Performance...")
prod_agg_df = df.groupby(['Category', 'Sub-Category', 'Product Name']).agg(
    Total_Sales=('Sales', 'sum'),
    Total_Profit=('Profit', 'sum'),
    Total_Quantity=('Quantity', 'sum'),
    Average_Discount=('Discount', 'mean')
).reset_index()

# A product whose summed Sales is 0 would get +/-inf from float division, and fillna(0)
# does not catch inf. Map the zero denominator to NaN so the margin becomes 0 instead.
prod_agg_df['Profit_Margin'] = (
    prod_agg_df['Total_Profit'] / prod_agg_df['Total_Sales'].replace(0, np.nan) * 100
).fillna(0)
prod_agg_df = prod_agg_df.round(2)
prod_agg_df["load_timestamp"] = datetime.now()

# --- Load Aggregation 2 ---
create_and_load_table(client, prod_agg_df, "Agg_Product_Performance", DATASET_ID)


# 6) AGGREGATION 3: Regional Segment Sales
# -----------------------------------------------------------------
# One row per (Region, Segment), loaded to Agg_Regional_Segment_Sales.
print("üìä Aggregating Regional Segment Sales...")
region_seg_df = df.groupby(['Region', 'Segment']).agg(
    Total_Sales=('Sales', 'sum'),
    Total_Profit=('Profit', 'sum'),
    Total_Quantity=('Quantity', 'sum')
).reset_index()

# Guard the ratio: a zero Total_Sales would otherwise produce +/-inf, which fillna(0)
# leaves in place. Treat zero denominators as missing so the margin becomes 0.
region_seg_df['Profit_Margin'] = (
    region_seg_df['Total_Profit'] / region_seg_df['Total_Sales'].replace(0, np.nan) * 100
).fillna(0)
region_seg_df = region_seg_df.round(2)
region_seg_df["load_timestamp"] = datetime.now()

# --- Load Aggregation 3 ---
create_and_load_table(client, region_seg_df, "Agg_Regional_Segment_Sales", DATASET_ID)


# 7) AGGREGATION 4: Monthly Financial Trends
# -----------------------------------------------------------------
# One row per order month (first-of-month key), loaded to Agg_Monthly_Financial_Trends.
print("üìä Aggregating Monthly Financial Trends...")
monthly_df = (
    df.groupby('order_month')
    .agg(**{
        'Total_Sales': ('Sales', 'sum'),
        'Total_Profit': ('Profit', 'sum'),
        'Total_Quantity': ('Quantity', 'sum'),
        'Total_Orders': ('Order ID', 'nunique'),  # distinct orders, not line items
    })
    .reset_index()
    .round(2)
)

# The month key is sent to BigQuery as text rather than as a timestamp.
monthly_df['order_month'] = monthly_df['order_month'].astype(str)
monthly_df["load_timestamp"] = datetime.now()

# --- Load Aggregation 4 ---
create_and_load_table(client, monthly_df, "Agg_Monthly_Financial_Trends", DATASET_ID)


print("\nüéâ All aggregation jobs done!")

Reading data from E:\Supply Chain Data Integration System\Refined and Cleansed_Supply_Chain_Data.csv...
✅ Data read and preprocessed.
ℹ️ Dataset 'Gold_Layer' already exists.
📊 Grouping data by year and calculating aggregates...
✅ Table 'Agg_Yearly_Summary_Metrics' created.
🚀 Loading 5 rows into sharedproject2025.Gold_Layer.Agg_Yearly_Summary_Metrics...




✅ Successfully loaded data into sharedproject2025.Gold_Layer.Agg_Yearly_Summary_Metrics.
---
📊 Aggregating Product Performance...
✅ Table 'Agg_Product_Performance' created.
🚀 Loading 3610 rows into sharedproject2025.Gold_Layer.Agg_Product_Performance...




✅ Successfully loaded data into sharedproject2025.Gold_Layer.Agg_Product_Performance.
---
📊 Aggregating Regional Segment Sales...
✅ Table 'Agg_Regional_Segment_Sales' created.
🚀 Loading 39 rows into sharedproject2025.Gold_Layer.Agg_Regional_Segment_Sales...




✅ Successfully loaded data into sharedproject2025.Gold_Layer.Agg_Regional_Segment_Sales.
---
📊 Aggregating Monthly Financial Trends...
✅ Table 'Agg_Monthly_Financial_Trends' created.
🚀 Loading 48 rows into sharedproject2025.Gold_Layer.Agg_Monthly_Financial_Trends...




✅ Successfully loaded data into sharedproject2025.Gold_Layer.Agg_Monthly_Financial_Trends.
---

🎉 All aggregation jobs done!
