# 01 ‚Äì Ingest 5-Minute NEM Demand Data (NEMOSIS ‚Üí PostgreSQL)

## Purpose

This notebook builds the **raw operational data layer** for the project.

We extract 5-minute regional demand data from the AEMO public archive using the **NEMOSIS** Python library, then store a clean version of this data in **PostgreSQL** for downstream analytics and dashboards.

## How it fits into the overall methodology

This is **Layer 1 ‚Äì Data Ingestion** in the project pipeline:

1. **Raw 5-min data** (this notebook)  
2. **Daily & monthly usage analytics** (later notebooks)  
3. **Operational insights** ‚Äì peak events, low-demand events, weekday profiles  
4. **Region-level KPIs & Power BI dashboards**

All later analysis notebooks assume this base table exists in Postgres:

- `dispatch_region_5min`  
  - `settlement_ts` ‚Äì 5-min timestamp  
  - `region_id` ‚Äì NEM region (NSW1, QLD1, etc.)  
  - `total_demand` ‚Äì operational demand in MW

---


## Step 1 ‚Äì Load Environment Variables and Connect to PostgreSQL

### Why we are doing this

Before we fetch any data, we need a **reliable connection** to our database so that:

- All ingested data is stored in a central, queryable location.
- Analytics notebooks and BI tools (e.g. Power BI) can all point to the same source.
- Credentials are managed securely via a `.env` file instead of hard-coding them.

This step ensures the notebook can talk to PostgreSQL using the correct host, port, user, and database name.

---


In [1]:
 import os
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
from pathlib import Path

# Load .env
env_path = Path().resolve() / ".env"
load_dotenv(env_path, override=True)

DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")

# Create engine
engine = create_engine(
    f"postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# Test connection
with engine.connect() as conn:
    print("Connected OK:", conn.execute(text("SELECT 1;")).fetchall())


Connected OK: [(1,)]


## Step 2 ‚Äì Define NEMOSIS Table and Date Range

### Why we are doing this

The AEMO operational data is split into many tables and time slices.
For this project we care about **5-minute regional demand**, which lives in the NEMOSIS table:

- `DISPATCHREGIONSUM`

We also want a **fixed analysis window** (e.g. Jan 2025 to Oct 2025), so that:

- All downstream analytics use a consistent timeframe.
- The project is reproducible: same code ‚Üí same time window ‚Üí same results.

This step simply sets up the configuration for which **table**, **regions**, and **date range** we are ingesting.

---

## Step 3 ‚Äì Download 5-Minute Regional Demand with NEMOSIS (Month-by-Month)




The NEM archive is large, and downloading everything in one shot is:

- Slow
- Memory-heavy
- Hard to debug

Instead, we follow a **monthly ingestion pattern**:

1. Loop month-by-month between the start and end dates.
2. For each month, use NEMOSIS `dynamic_data_compiler()` to fetch `DISPATCHREGIONSUM`.
3. Keep only the fields we need for this project:
   - `SETTLEMENTDATE` (5-minute timestamp)
   - `REGIONID` (NEM region)
   - `TOTALDEMAND` (MW)
4. Append each month‚Äôs data into a single DataFrame.

This pattern is closer to what a **production ETL pipeline** would do: chunked, repeatable ingestion instead of a single massive pull.

---

In [2]:
import os
from datetime import datetime, timedelta

import pandas as pd
from nemosis import dynamic_data_compiler

# ---------- SETTINGS (EDIT DATES IF YOU WANT) ----------
table_name = "DISPATCHREGIONSUM"
raw_data_cache_dir = "data_raw"

# test for 1‚Äì2 months only to keep it light
start_date = datetime(2025, 1, 1, 0, 0, 0)
end_date   = datetime(2025, 3, 1, 0, 0, 0)   # not included (so Jan + Feb)

os.makedirs(raw_data_cache_dir, exist_ok=True)

current = start_date

print("\n--- NEMOSIS DATA CHECK (ALL REGIONS, NO DB) ---")

while current < end_date:
    # month boundaries
    next_month = (current.replace(day=28) + timedelta(days=4)).replace(day=1)
    end_of_month = next_month - timedelta(seconds=1)

    start_str = current.strftime("%Y/%m/%d %H:%M:%S")
    end_str   = end_of_month.strftime("%Y/%m/%d %H:%M:%S")

    print(f"\nüìÖ Fetching {start_str} ‚Üí {end_str} for {table_name} ...")

    try:
        df = dynamic_data_compiler(
            start_str,
            end_str,
            table_name,
            raw_data_cache_dir,   # raw_data_cache
        )

        print("   ‚úÖ Rows fetched:", len(df))

        if len(df) > 0:
            # show region IDs available
            if "REGIONID" in df.columns:
                print("   üåè Regions in this chunk:", df["REGIONID"].unique())
            else:
                print("   ‚ö†Ô∏è 'REGIONID' column not found. Columns are:")
                print("      ", df.columns.tolist())

            # show a tiny sample
            print("   üìä Sample rows:")
            print(df.head(3))

        del df

    except Exception as e:
        print(f"   ‚ùå Error for {current.strftime('%B %Y')}: {e}")

    current = next_month

print("\n--- NEMOSIS CHECK COMPLETE ---")



--- NEMOSIS DATA CHECK (ALL REGIONS, NO DB) ---

üìÖ Fetching 2025/01/01 00:00:00 ‚Üí 2025/01/31 23:59:59 for DISPATCHREGIONSUM ...
INFO: Compiling data for table DISPATCHREGIONSUM
INFO: Returning DISPATCHREGIONSUM.
   ‚úÖ Rows fetched: 44635
   üåè Regions in this chunk: ['NSW1' 'QLD1' 'SA1' 'TAS1' 'VIC1']
   üìä Sample rows:
       SETTLEMENTDATE REGIONID  DISPATCHINTERVAL  INTERVENTION  TOTALDEMAND  \
0 2025-01-01 00:05:00     NSW1       20241231241             0      7251.07   
1 2025-01-01 00:05:00     QLD1       20241231241             0      6444.99   
2 2025-01-01 00:05:00      SA1       20241231241             0      1379.47   

   AVAILABLEGENERATION  AVAILABLELOAD  DEMANDFORECAST  DISPATCHABLEGENERATION  \
0          12034.75116            220             -28                 7612.94   
1           9145.64397            420             -42                 5965.77   
2           2473.89912            412              12                 1000.90   

   DISPATCHABLELOAD  ... 

## Step 4 ‚Äì Build `dispatch_region_5min` Table in PostgreSQL

### Why we are doing this

At this point we have a list of monthly DataFrames in memory.  
To make the data useful for analytics and dashboards, we:

1. **Concatenate** all monthly chunks into a single DataFrame.
2. **Optionally drop an old table** so we don‚Äôt duplicate rows during testing.
3. **Create a clean base table** in PostgreSQL:
   - One row per 5-minute interval per region.
   - Columns: timestamp, region, total demand.
4. Validate with a quick preview and row count.

This `dispatch_region_5min` table is the **foundation for all later notebooks** (daily, monthly, peaks, low demand, region comparison).

---


In [3]:
import os
from datetime import datetime, timedelta

import pandas as pd
from sqlalchemy import create_engine
from nemosis import dynamic_data_compiler

# ---------- 1. POSTGRES SETTINGS ----------
DB_USER = "vivekarya"          # change if your username is different
DB_PASSWORD = "Ap28bf9456" #
DB_HOST = "localhost"
DB_PORT = 5432
DB_NAME = "postgres"  

def get_engine():
    url = f"postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    return create_engine(url)

# ---------- 2. NEMOSIS SETTINGS ----------
table_name = "DISPATCHREGIONSUM"
raw_data_cache_dir = "data_raw"
os.makedirs(raw_data_cache_dir, exist_ok=True)

# Use a small test range first (you can change later)
start_date = datetime(2025, 1, 1, 0, 0, 0)
end_date = datetime(2025, 11, 10, 0, 0, 0)  # Jan + Feb 2025

def main():
    engine = get_engine()

    # Test DB connection
    with engine.connect() as conn:
        result_1=conn.execute(text("SELECT 1;")).fetchall()
        print("‚úÖ DB connection OK:", result_1)

    # (Optional) Clear old table so you don't duplicate rows during testing
    with engine.connect() as conn:
        conn.execute(text("DROP TABLE IF EXISTS dispatch_region_5min;"))
     ##execute(text("SELECT 1;")).fetchall()
        print("üßπ Dropped old table dispatch_region_5min (if existed).")

    current = start_date

    print("\n--- Starting NEMOSIS ‚Üí Postgres Ingestion (ALL REGIONS) ---")

    total_inserted = 0

    while current < end_date:
        # Month boundaries
        next_month = (current.replace(day=28) + timedelta(days=4)).replace(day=1)
        end_of_month = next_month - timedelta(seconds=1)

        start_str = current.strftime("%Y/%m/%d %H:%M:%S")
        end_str   = end_of_month.strftime("%Y/%m/%d %H:%M:%S")

        print(f"\nüìÖ Fetching {start_str} ‚Üí {end_str} for {table_name} (ALL REGIONS)...")

        try:
            # 1Ô∏è‚É£ Fetch from NEMOSIS
            df = dynamic_data_compiler(
                start_str,
                end_str,
                table_name,
                raw_data_cache_dir,   # raw_data_cache
            )

            print(f"   ‚úÖ Rows fetched this month: {len(df)}")

            if len(df) == 0:
                print("   ‚ÑπÔ∏è No rows for this month, skipping.")
                current = next_month
                continue

            # 2Ô∏è‚É£ Make sure REGIONID exists
            if "REGIONID" not in df.columns:
                print("   ‚ö†Ô∏è 'REGIONID' column missing, available columns:")
                print("      ", df.columns.tolist())
                current = next_month
                continue

            # 3Ô∏è‚É£ Keep only columns we need
            df["SETTLEMENTDATE"] = pd.to_datetime(df["SETTLEMENTDATE"])
            df_clean = df[["SETTLEMENTDATE", "REGIONID", "TOTALDEMAND"]].copy()

            # 4Ô∏è‚É£ Rename columns for DB
            df_clean = df_clean.rename(columns={
                "SETTLEMENTDATE": "settlement_ts",
                "REGIONID": "region_id",
                "TOTALDEMAND": "total_demand",
                
            })

            # 5Ô∏è‚É£ Insert into Postgres
            df_clean.to_sql(
                "dispatch_region_5min",
                engine,
                if_exists="append",
                index=False,
            )

            print(f"   ‚úÖ Inserted {len(df_clean)} rows for month {current.strftime('%B %Y')}.")
            total_inserted += len(df_clean)

            # free memory
            del df
            del df_clean

        except Exception as e:
            print(f"   ‚ùå Error while processing {current.strftime('%B %Y')}: {e}")

        current = next_month

    print("\n--- Ingestion complete! ---")
    print(f"üì¶ Total rows inserted into dispatch_region_5min: {total_inserted}")

if __name__ == "__main__":
    main()


‚úÖ DB connection OK: [(1,)]
üßπ Dropped old table dispatch_region_5min (if existed).

--- Starting NEMOSIS ‚Üí Postgres Ingestion (ALL REGIONS) ---

üìÖ Fetching 2025/01/01 00:00:00 ‚Üí 2025/01/31 23:59:59 for DISPATCHREGIONSUM (ALL REGIONS)...
INFO: Compiling data for table DISPATCHREGIONSUM
INFO: Returning DISPATCHREGIONSUM.
   ‚úÖ Rows fetched this month: 44635
   ‚úÖ Inserted 44635 rows for month January 2025.

üìÖ Fetching 2025/02/01 00:00:00 ‚Üí 2025/02/28 23:59:59 for DISPATCHREGIONSUM (ALL REGIONS)...
INFO: Compiling data for table DISPATCHREGIONSUM
INFO: Returning DISPATCHREGIONSUM.
   ‚úÖ Rows fetched this month: 40315
   ‚úÖ Inserted 40315 rows for month February 2025.

üìÖ Fetching 2025/03/01 00:00:00 ‚Üí 2025/03/31 23:59:59 for DISPATCHREGIONSUM (ALL REGIONS)...
INFO: Compiling data for table DISPATCHREGIONSUM
INFO: Returning DISPATCHREGIONSUM.
   ‚úÖ Rows fetched this month: 44635
   ‚úÖ Inserted 44635 rows for month March 2025.

üìÖ Fetching 2025/04/01 00:00:00 ‚Üí

In [4]:
import pandas as pd

# Pull everything from Postgres into one dataframe
df_export = pd.read_sql("""
    SELECT *
    FROM dispatch_region_5min
    ORDER BY settlement_ts, region_id;
""", engine)

# Export to CSV
df_export.to_csv("dispatch_region_5min.csv", index=False)

print("‚úÖ CSV exported: dispatch_region_5min.csv")
print("Rows exported:", len(df_export))

‚úÖ CSV exported: dispatch_region_5min.csv
Rows exported: 1313130
