Connect with ERCOT Public API

Import Required Modules

In [None]:
# Standard library imports
import os
import time
import json
import zipfile
import shutil
import io
from datetime import datetime, timedelta

# Third-party imports
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import pandas as pd
import dask.dataframe as dd
from dotenv import load_dotenv


Token Pull Function

In [None]:
# Load environment variables from the .env file so that get_ercot_token() can
# read ERCOT_USERNAME, ERCOT_PASSWORD and ERCOT_SUBSCRIPTION_KEY via os.getenv.
load_dotenv()

def get_ercot_token(timeout=30):
    """
    Request an authentication token for the ERCOT Public API.

    Reads ERCOT_USERNAME, ERCOT_PASSWORD and ERCOT_SUBSCRIPTION_KEY from the
    environment and posts the credentials as form data to the ERCOT B2C
    token endpoint.

    Args:
        timeout: Seconds to wait for the token endpoint before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        The decoded JSON token response as a dict, with an extra
        'subscription_key' entry added for convenience.

    Raises:
        ValueError: If any required environment variable is unset or empty.
        Exception: If the token endpoint answers with a status other than 200.
    """
    # Collect the credentials and report exactly which ones are missing
    required = ("ERCOT_USERNAME", "ERCOT_PASSWORD", "ERCOT_SUBSCRIPTION_KEY")
    credentials = {name: os.getenv(name) for name in required}
    missing = [name for name, value in credentials.items() if not value]
    if missing:
        raise ValueError(
            f"Missing required environment variables: {', '.join(missing)}. "
            "Please check your .env file."
        )

    # Authentication URL
    auth_url = "https://ercotb2c.b2clogin.com/ercotb2c.onmicrosoft.com/B2C_1_PUBAPI-ROPC-FLOW/oauth2/v2.0/token"

    # The same client id is used both as the client_id and inside the scope
    client_id = "fec253ea-0d06-4272-a5e6-b478baeecd70"

    # Request data as form data
    data = {
        "username": credentials["ERCOT_USERNAME"],
        "password": credentials["ERCOT_PASSWORD"],
        "grant_type": "password",
        "scope": f"openid {client_id} offline_access",
        "client_id": client_id,
        "response_type": "id_token"
    }

    # Make POST request with form data
    response = requests.post(auth_url, data=data, timeout=timeout)

    # Anything other than 200 is treated as an authentication failure
    if response.status_code != 200:
        raise Exception(f"Authentication failed with status code {response.status_code}: {response.text}")

    token_data = response.json()
    # Add subscription key to token data for convenience
    token_data['subscription_key'] = credentials["ERCOT_SUBSCRIPTION_KEY"]
    return token_data



Run Token Pull

In [None]:
# Request a token and show a short preview of it; any failure is printed
# instead of raised so the notebook keeps running.
try:
    token_data = get_ercot_token()
    print("Authentication successful!")
    # Only the first 20 characters of each token are shown
    for label, key in (("Access token", "access_token"), ("ID token", "id_token")):
        print(f"{label}: {token_data[key][:20]}...")
    print(f"Token expires in: {token_data['expires_in']} seconds")
except Exception as e:
    print(f"Error: {e}")

Request LMP Data Function

In [None]:
def get_product_history_bundles(
    token_data,
    page,
    post_datetime_from="2024-01-01",
    post_datetime_to="2024-12-31",
    emil_id="np6-787-cd",
    timeout=30,
):
    """
    Retrieve one page of the archive listing for an ERCOT product.

    Args:
        token_data: The authentication token data; must provide the
            'access_token' and 'subscription_key' entries.
        page: Page number to request (sent as the "page" query parameter).
        post_datetime_from: Lower bound sent as "postDatetimeFrom".
        post_datetime_to: Upper bound sent as "postDatetimeTo".
        emil_id: Product identifier used in the archive URL.
        timeout: Seconds to wait for the API before giving up.

    Returns:
        A list of decoded JSON objects. A response that is not already a
        list is wrapped in a one-element list so callers can always iterate.

    Raises:
        Exception: If the API answers with a status other than 200. The
            message starts with "Request failed with status code <code>".
    """
    # Archive listing endpoint for the requested product
    base_url = f"https://api.ercot.com/api/public-reports/archive/{emil_id}"

    # Date range and page selection
    params = {
        "postDatetimeFrom": post_datetime_from,
        "postDatetimeTo": post_datetime_to,
        "page": page
    }

    # Headers
    headers = {
        "Authorization": f"Bearer {token_data['access_token']}",
        "Ocp-Apim-Subscription-Key": token_data['subscription_key']
    }

    # Make the request with parameters
    response = requests.get(base_url, headers=headers, params=params, timeout=timeout)

    # Fail loudly on any non-200 answer
    if response.status_code != 200:
        raise Exception(f"Request failed with status code {response.status_code}: {response.text}")

    data = response.json()
    return data if isinstance(data, list) else [data]

Call LMP Download Function

In [None]:
# Step 1: Reuse the token obtained in the "Run Token Pull" step.
# This cell pages through the archive listing, saves the raw bundles as JSON
# and flattens them into the DataFrame `df` used by the following cells.
try:
    # token_data = get_ercot_token()  # Uncomment to request a fresh token here
    # Read the token before printing anything so a missing token is reported
    # as an error rather than after a misleading success message.
    access_token_preview = token_data['access_token'][:20]
    print("Reusing token from the earlier authentication step.")
    print(f"Access token: {access_token_preview}...")

    # Step 2: Initialize the collections for bundles and for pages we had to skip
    all_bundles = []
    skipped_pages = []

    # Step 3: Get the first page to determine total pages
    first_page = get_product_history_bundles(token_data, page=1)
    if first_page:
        total_pages = first_page[0]["_meta"]["totalPages"]
        current_page = first_page[0]["_meta"]["currentPage"]

        # Add the first page data to our collection
        all_bundles.extend(first_page)
        print(f"Retrieved page {current_page} of {total_pages}")

        # Step 4: Iterate through remaining pages with rate limiting
        request_count = 1  # Already made one request for the first page
        request_start_time = time.time()

        while current_page < total_pages:
            current_page += 1

            # Rate limiting: conservative approach - max 25 requests per minute
            if request_count >= 25:
                elapsed = time.time() - request_start_time
                if elapsed < 60:
                    sleep_time = 60 - elapsed + 5  # Add 5 second buffer
                    print(f"Rate limit approaching. Pausing for {sleep_time:.2f} seconds...")
                    time.sleep(sleep_time)
                request_count = 0
                request_start_time = time.time()

            # Exponential backoff for 429 errors
            max_retries = 5
            retry_count = 0
            retry_delay = 2  # Start with 2 seconds

            while retry_count < max_retries:
                try:
                    page_data = get_product_history_bundles(token_data, page=current_page)
                    request_count += 1

                    if page_data:
                        all_bundles.extend(page_data)
                        print(f"Retrieved page {current_page} of {total_pages}")
                    break  # Success, exit retry loop
                except Exception as e:
                    # Match the status code at the start of the message so that
                    # digits inside the response body cannot be mistaken for a 429.
                    if not str(e).startswith("Request failed with status code 429"):
                        raise

                    retry_count += 1
                    if retry_count >= max_retries:
                        print(f"Max retries reached. Skipping page {current_page}.")
                        skipped_pages.append(current_page)
                        break

                    print(f"Rate limit exceeded. Retrying in {retry_delay} seconds... (Attempt {retry_count}/{max_retries})")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # Exponential backoff
                    request_count = 0
                    request_start_time = time.time()

            time.sleep(0.5)  # Small delay between requests

        # Make incomplete results visible instead of silently dropping pages
        if skipped_pages:
            print(f"Warning: {len(skipped_pages)} page(s) could not be retrieved: {skipped_pages}")

    # Step 5: Process the results - process all collected data
    if all_bundles:
        print(f"Successfully retrieved data across {len(all_bundles)} total bundles")

        # Optional: Save to a file
        with open("ercot_product_bundles.json", "w", encoding="utf-8") as f:
            json.dump(all_bundles, f, indent=2)
        print("Saved results to ercot_product_bundles.json")

        # Optional: Convert to DataFrame for analysis (one row per archive)
        flattened_data = [
            {
                "product_name": bundle.get("product", {}).get("name", ""),
                "emilId": bundle.get("product", {}).get("emilId", ""),
                "docId": archive.get("docId", ""),
                "friendlyName": archive.get("friendlyName", ""),
                "postDatetime": archive.get("postDatetime", ""),
                "download_url": archive.get("_links", {}).get("endpoint", {}).get("href", "")
            }
            for bundle in all_bundles
            if "archives" in bundle
            for archive in bundle["archives"]
        ]

        # Fall back to the raw bundles when no bundle carried an "archives" list
        df = pd.DataFrame(flattened_data or all_bundles)
        print(f"Created DataFrame with {len(df)} rows")
        print(df.head())

except Exception as e:
    print(f"Error: {e}")


Save LMP docIds as CSV

In [None]:
# Save the DataFrame to a CSV file
# Write the docId DataFrame to CSV, unless it was never created or is empty
if 'df' not in locals() or df.empty:
    print("No DataFrame available to save")
else:
    csv_filename = "ercot_product_bundles.csv"
    df.to_csv(csv_filename, index=False)
    print(f"Saved DataFrame to {csv_filename}")


Check Number of Unique docIds

In [None]:
# Check how many unique docIds are in the DataFrame
# Report how many distinct docIds the DataFrame holds
if 'df' not in locals() or df.empty:
    print("No DataFrame available to analyze unique docIds")
else:
    unique_doc_ids = df['docId'].nunique()
    print(f"Number of unique docIds in the DataFrame: {unique_doc_ids}")

    # Optional: show the first five distinct docIds as examples
    print("\nSample of unique docIds:")
    print(df['docId'].unique()[:5])


Download LMP Data as ZIP in Batches

In [None]:
def bulk_download_archives(token_data, doc_ids_batch, emil_id="np6-787-cd"):
    """
    Fetch a batch of product archives from the ERCOT bulk download endpoint.

    Args:
        token_data: Token data providing 'access_token' and 'subscription_key'.
        doc_ids_batch: The docIds to request, sent as the "docIds" JSON field
            (the endpoint is used here with up to 1000 docIds per call).
        emil_id: Product identifier used in the download URL.

    Returns:
        The raw response body of the download request.

    Raises:
        Exception: If the API answers with a status other than 200.
    """
    endpoint = f"https://api.ercot.com/api/public-reports/archive/{emil_id}/download"
    request_headers = {
        "Authorization": f"Bearer {token_data['access_token']}",
        "Ocp-Apim-Subscription-Key": token_data['subscription_key'],
        "Content-Type": "application/json"
    }

    response = requests.post(
        endpoint,
        headers=request_headers,
        json={"docIds": doc_ids_batch},
        timeout=60,
    )

    # Guard clause: anything but 200 is reported with status and body
    if response.status_code != 200:
        raise Exception(f"Bulk download failed: {response.status_code} - {response.text}")
    return response.content


# Main processing code: download every docId listed in the CSV in batches,
# one ZIP file per batch, with rate limiting, backoff and token refresh.
try:
    # Retrieve your token data before proceeding
    token_data = get_ercot_token()
    print("Authentication successful!")
    print(f"Access token: {token_data['access_token'][:20]}...")

    # Read CSV file containing docIds (assumes a column named "docId")
    csv_file = "ercot_product_bundles.csv"
    if not os.path.exists(csv_file):
        raise Exception(f"CSV file {csv_file} does not exist.")

    df = pd.read_csv(csv_file)
    if "docId" not in df.columns:
        raise Exception("CSV file must contain a 'docId' column.")

    # Extract the list of docIds
    doc_ids = df["docId"].dropna().tolist()
    total_doc_ids = len(doc_ids)
    print(f"Total docIds to process: {total_doc_ids}")

    # Define batch and rate limit parameters
    batch_size = 1000
    max_requests_per_minute = 25  # adjust as needed
    request_count = 0
    request_start_time = time.time()

    # Resume point: batch N starts at index (N - 1) * batch_size, so 91000
    # resumes at batch 92. Set this to 0 to start from the first batch.
    starting_index = 91000

    # Ensure the download directory exists
    download_dir = "bulk_downloads"
    os.makedirs(download_dir, exist_ok=True)

    # Batches that could not be downloaded, reported once at the end
    failed_batches = []

    # Process docIds in batches
    for batch_index in range(starting_index, total_doc_ids, batch_size):
        batch_doc_ids = doc_ids[batch_index: batch_index + batch_size]
        batch_number = (batch_index // batch_size) + 1
        print(f"\nProcessing batch {batch_number} with {len(batch_doc_ids)} docIds...")

        # Rate limiting: Pause if we've reached the max request count
        if request_count >= max_requests_per_minute:
            elapsed = time.time() - request_start_time
            if elapsed < 60:
                sleep_time = 60 - elapsed + 5  # 5-second buffer
                print(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds...")
                time.sleep(sleep_time)
            request_count = 0
            request_start_time = time.time()

        # Exponential backoff for 429 errors and token expiration handling
        max_retries = 5
        max_token_refreshes = 2  # Bound refreshes so a persistent 401 cannot loop forever
        retry_count = 0
        token_refresh_count = 0
        retry_delay = 2
        batch_downloaded = False

        while retry_count < max_retries:
            try:
                # Count every attempt, failed ones included, against the rate limit
                request_count += 1
                file_content = bulk_download_archives(token_data, batch_doc_ids)
                file_path = os.path.join(download_dir, f"bulk_download_batch_{batch_number}.zip")
                with open(file_path, "wb") as f:
                    f.write(file_content)
                print(f"Successfully downloaded batch {batch_number} to {file_path}")
                batch_downloaded = True
                break  # Success: exit retry loop

            except Exception as e:
                error_message = str(e)
                # Match the status code at the start of the message so digits in
                # the response body are not mistaken for a status code.
                if error_message.startswith("Bulk download failed: 429"):
                    retry_count += 1
                    if retry_count < max_retries:
                        print(f"Rate limit exceeded on batch {batch_number}. Retrying in {retry_delay} seconds... (Attempt {retry_count}/{max_retries})")
                        time.sleep(retry_delay)
                        retry_delay *= 2  # Exponential backoff
                    else:
                        print(f"Max retries reached for batch {batch_number}. Skipping this batch.")
                        break

                # Check for token expiry (a 401 status or an 'expired' message)
                elif error_message.startswith("Bulk download failed: 401") or "expired" in error_message.lower():
                    token_refresh_count += 1
                    if token_refresh_count > max_token_refreshes:
                        print(f"Token refresh did not resolve the error for batch {batch_number}. Skipping this batch.")
                        break
                    print(f"Access token appears to be expired during batch {batch_number}. Refreshing token...")
                    token_data = get_ercot_token()
                    time.sleep(2)  # Wait briefly before retrying the same batch
                    # retry_count is not incremented: the refresh has its own budget

                else:
                    print(f"Error downloading batch {batch_number}: {error_message}")
                    break

        if not batch_downloaded:
            failed_batches.append(batch_number)

        time.sleep(0.5)  # Small delay between batches

    # Summarize failures so they can be retried individually
    if failed_batches:
        print(f"\nBatches that were not downloaded: {failed_batches}")

except Exception as e:
    print(f"Error: {e}")


Select Certain Batches for Re-Try

*Use this when a batch fails to download because specific docIds are missing; list those docIds in `excluded_doc_ids` and retry the batch.*

In [None]:
def bulk_download_archives(token_data, doc_ids_batch, emil_id="np6-787-cd"):
    """
    Fetch a batch of product archives from the ERCOT bulk download endpoint.

    Args:
        token_data: Token data providing 'access_token' and 'subscription_key'.
        doc_ids_batch: The docIds to request, sent as the "docIds" JSON field
            (the endpoint is used here with up to 1000 docIds per call).
        emil_id: Product identifier used in the download URL.

    Returns:
        The raw response body of the download request.

    Raises:
        Exception: If the API answers with a status other than 200.
    """
    endpoint = f"https://api.ercot.com/api/public-reports/archive/{emil_id}/download"
    request_headers = {
        "Authorization": f"Bearer {token_data['access_token']}",
        "Ocp-Apim-Subscription-Key": token_data['subscription_key'],
        "Content-Type": "application/json"
    }

    response = requests.post(
        endpoint,
        headers=request_headers,
        json={"docIds": doc_ids_batch},
        timeout=60,
    )

    # Guard clause: anything but 200 is reported with status and body
    if response.status_code != 200:
        raise Exception(f"Bulk download failed: {response.status_code} - {response.text}")
    return response.content


# Main processing code: re-download one specific batch, optionally leaving
# out docIds that are known to make the bulk request fail.
try:
    # Specify the batch number to download (1-based)
    batch_to_download = 92  # Change this to the specific batch number you want to download
    batch_size = 1000

    if batch_to_download < 1:
        # A value below 1 would yield a negative start index and a wrong slice
        raise Exception(f"Batch number must be 1 or greater, got {batch_to_download}")

    # Retrieve your token data before proceeding
    token_data = get_ercot_token()
    print("Authentication successful!")
    print(f"Access token: {token_data['access_token'][:20]}...")

    # Read CSV file containing docIds (assumes a column named "docId")
    csv_file = "ercot_product_bundles.csv"
    if not os.path.exists(csv_file):
        raise Exception(f"CSV file {csv_file} does not exist.")

    df = pd.read_csv(csv_file)
    if "docId" not in df.columns:
        raise Exception("CSV file must contain a 'docId' column.")

    # Extract the list of docIds and exclude specific docIds
    excluded_doc_ids = [] # List Excluded docIds Here
    # Compare as strings, using a set for constant-time membership tests
    excluded_lookup = {str(doc_id) for doc_id in excluded_doc_ids}
    doc_ids = df["docId"].dropna().tolist()
    doc_ids = [doc_id for doc_id in doc_ids if str(doc_id) not in excluded_lookup]
    total_doc_ids = len(doc_ids)
    print(f"Total docIds available (after exclusions): {total_doc_ids}")

    # Calculate the starting index for the specified batch
    starting_index = (batch_to_download - 1) * batch_size

    # Ensure the batch number is valid; ceiling division gives the last real batch
    max_batch = (total_doc_ids + batch_size - 1) // batch_size
    if starting_index >= total_doc_ids:
        raise Exception(f"Batch {batch_to_download} exceeds the available data (max batch: {max_batch})")

    # Get the docIds for the specified batch
    batch_doc_ids = doc_ids[starting_index: starting_index + batch_size]
    print(f"\nProcessing batch {batch_to_download} with {len(batch_doc_ids)} docIds...")

    # Ensure the download directory exists
    download_dir = "bulk_downloads"
    os.makedirs(download_dir, exist_ok=True)

    # Exponential backoff for 429 errors and token expiration handling
    max_retries = 5
    max_token_refreshes = 2  # Bound refreshes so a persistent 401 cannot loop forever
    retry_count = 0
    token_refresh_count = 0
    retry_delay = 2

    while retry_count < max_retries:
        try:
            file_content = bulk_download_archives(token_data, batch_doc_ids)
            file_path = os.path.join(download_dir, f"bulk_download_batch_{batch_to_download}.zip")
            with open(file_path, "wb") as f:
                f.write(file_content)
            print(f"Successfully downloaded batch {batch_to_download} to {file_path}")
            break  # Success: exit retry loop

        except Exception as e:
            error_message = str(e)
            # Match the status code at the start of the message so digits in
            # the response body are not mistaken for a status code.
            if error_message.startswith("Bulk download failed: 429"):
                retry_count += 1
                if retry_count < max_retries:
                    print(f"Rate limit exceeded on batch {batch_to_download}. Retrying in {retry_delay} seconds... (Attempt {retry_count}/{max_retries})")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # Exponential backoff
                else:
                    print(f"Max retries reached for batch {batch_to_download}. Download failed.")
                    break

            # Check for token expiry (a 401 status or an 'expired' message)
            elif error_message.startswith("Bulk download failed: 401") or "expired" in error_message.lower():
                token_refresh_count += 1
                if token_refresh_count > max_token_refreshes:
                    print(f"Token refresh did not resolve the error for batch {batch_to_download}. Download failed.")
                    break
                print(f"Access token appears to be expired during batch {batch_to_download}. Refreshing token...")
                token_data = get_ercot_token()
                time.sleep(2)  # Wait briefly before retrying the same batch
                # retry_count is not incremented: the refresh has its own budget

            else:
                print(f"Error downloading batch {batch_to_download}: {error_message}")
                break

except Exception as e:
    print(f"Error: {e}")


Extract ZIP Files

In [None]:
def _unique_destination(directory, filename):
    """Return a path in `directory` for `filename` that does not exist yet."""
    candidate = os.path.join(directory, filename)
    stem, extension = os.path.splitext(filename)
    suffix = 1
    while os.path.exists(candidate):
        candidate = os.path.join(directory, f"{stem}_{suffix}{extension}")
        suffix += 1
    return candidate


def recursive_unzip(folder):
    """
    Extract every zip archive under `folder`, including archives nested in archives.

    Each archive is extracted into the directory it was found in and then
    moved into `<folder>/Processed`. The tree is rescanned until a pass
    extracts nothing new, so zips that came out of other zips are handled too.

    Files ending in .zip that are not valid archives are reported once and
    left in place; they no longer keep the scan loop running forever.

    Args:
        folder: Root directory to scan.
    """
    # Create a 'Processed' folder inside the given folder if it doesn't exist
    processed_folder = os.path.join(folder, 'Processed')
    os.makedirs(processed_folder, exist_ok=True)
    processed_abs = os.path.abspath(processed_folder)

    # Invalid archives seen so far; they are skipped on later passes
    bad_zips = set()

    while True:
        extracted_any = False
        # Walk through the directory tree
        for root, dirs, files in os.walk(folder):
            # Prune only the 'Processed' directory created above, not every
            # directory that happens to share the name.
            dirs[:] = [
                name for name in dirs
                if os.path.abspath(os.path.join(root, name)) != processed_abs
            ]
            for file in files:
                if not file.lower().endswith('.zip'):
                    continue
                zip_path = os.path.join(root, file)
                if zip_path in bad_zips:
                    continue
                try:
                    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                        zip_ref.extractall(root)
                except zipfile.BadZipFile:
                    bad_zips.add(zip_path)
                    print(f"Warning: '{zip_path}' is not a valid zip file.")
                    continue
                # Move the processed zip file to the 'Processed' folder without
                # overwriting an earlier archive of the same name
                destination = _unique_destination(processed_folder, file)
                shutil.move(zip_path, destination)
                print(f"Extracted and moved: {zip_path} -> {destination}")
                extracted_any = True
        # Stop once a full pass extracted nothing: no new archives can appear
        if not extracted_any:
            break

# Run the extraction on the bulk download folder when this code is executed
# as the main program.
if __name__ == "__main__":
    folder_path = "bulk_downloads"  # Replace with the path to your folder
    recursive_unzip(folder_path)


Combine CSVs into a Single Dataframe and Save the File

In [None]:
# ERCOT ElectricalBus names to keep; add further entries to this list
allowed_buses = ['AMOCOOIL_CC1']

# Read every CSV in bulk_downloads/ as a single Dask DataFrame
df = dd.read_csv("bulk_downloads/*.csv")

# Build the row mask first, then apply it. Dask evaluates lazily, so no data
# is loaded until the result is computed or written.
bus_mask = df["ElectricalBus"].isin(allowed_buses)
df_filtered = df[bus_mask]

# Writing the Parquet dataset triggers the actual read and filter
df_filtered.to_parquet("filtered.parquet")

Work with the Filtered Dataset to Produce Hourly LMPs (8,760 Hours, or 8,784 in a Leap Year) for the Target Buses

In [None]:
# Read back the filtered Parquet dataset
df_dask = dd.read_parquet("filtered.parquet")

# Parse SCEDTimestamp into datetimes
sced_timestamps = dd.to_datetime(df_dask["SCEDTimestamp"])
df_dask["SCEDTimestamp"] = sced_timestamps

# Truncate each timestamp to the start of its hour (floor, not rounding)
df_dask["Hour"] = df_dask["SCEDTimestamp"].dt.floor("h")

# Average LMP per ElectricalBus and Hour
hourly_mean = df_dask.groupby(["ElectricalBus", "Hour"]).agg({"LMP": "mean"})
df_grouped = hourly_mean.reset_index()

# Materialize the lazy Dask result as a pandas DataFrame
df_result = df_grouped.compute()

# Persist the hourly averages
df_result.to_parquet("hourly_LMP.parquet", index=False)

# Preview the first rows
print(df_result.head())
