## Extract Indicators from API
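This notebook calls the ESIOS API (`https://api.esios.ree.es/indicators`) to extract the list of indicators and stores the JSON response in a Unity Catalog volume. It then requests the metrics of each indicator for the configured date range and stores them in a second volume, one JSON file per indicator.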

In [0]:
%run ../.config

In [0]:
import requests
import json
from datetime import datetime
import os
from datetime import datetime, timedelta
from urllib.parse import quote
import time

In [0]:
# Notebook parameters as (widget name, default value, UI label).
# Only the secret scope has a non-empty default; every other widget starts empty.
_widget_specs = (
    ("secret_scope", "project-77-dev", "Secret Scope Name"),
    ("date_from", "", "Start Date"),
    ("date_to", "", "End Date"),
    ("catalog", "", "Target Catalog"),
    ("schema", "", "Target Schema"),
)
for _widget_name, _widget_default, _widget_label in _widget_specs:
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

# Read the current widget values into the variables used by the later cells
# (note that the date widgets are exposed as start_date / end_date).
secret_scope, start_date, end_date, catalog, schema = map(
    dbutils.widgets.get,
    ("secret_scope", "date_from", "date_to", "catalog", "schema"),
)

# Names of the Unity Catalog volumes that receive the extracted files
indicators_volume = "indicators_volume"
indicator_metrics_volume = "indicator_metrics"

In [0]:
# Validate the required parameters and resolve the extraction date window.
#
# Raises ValueError when:
#   - the catalog or schema is empty,
#   - only one of date_from / date_to is given,
#   - a given date is not a valid YYYY-MM-DD date,
#   - date_from is later than date_to.
if not catalog:
    raise ValueError("Catalog must be provided.")

if not schema:
    raise ValueError("Schema must be provided.")

# Widget values are free text: drop surrounding whitespace so that a value made
# of blanks only is treated as "not provided".
start_date = start_date.strip()
end_date = end_date.strip()

if bool(start_date) != bool(end_date):
    raise ValueError("Both 'date_from' and 'date_to' must be provided together, or neither.")

# Size of the default window (in days) used when no dates are supplied
DEFAULT_LOOKBACK_DAYS = 2

if not start_date:
    # No explicit window: use today (UTC) as the end and look back from there.
    # NOTE(review): datetime.utcnow() is deprecated as of Python 3.12;
    # datetime.now(timezone.utc) is the replacement once `timezone` is imported.
    today = datetime.utcnow().date()
    end_date = today.strftime("%Y-%m-%d")
    start_date = (today - timedelta(days=DEFAULT_LOOKBACK_DAYS)).strftime("%Y-%m-%d")
else:
    # Explicit window: reject malformed or inverted ranges here instead of
    # sending them to the API.
    try:
        parsed_start = datetime.strptime(start_date, "%Y-%m-%d")
        parsed_end = datetime.strptime(end_date, "%Y-%m-%d")
    except ValueError as exc:
        raise ValueError("'date_from' and 'date_to' must use the YYYY-MM-DD format.") from exc

    if parsed_start > parsed_end:
        raise ValueError("'date_from' must not be later than 'date_to'.")

    # strptime also accepts non-padded input such as "2024-1-5"; store the
    # zero-padded form so downstream code always sees YYYY-MM-DD.
    start_date = parsed_start.date().isoformat()
    end_date = parsed_end.date().isoformat()

def to_iso8601(date_str):
    """Convert a ``YYYY-MM-DD`` date into an ISO 8601 timestamp at midnight with a ``Z`` suffix.

    Args:
        date_str: Calendar date in ``YYYY-MM-DD`` form. Surrounding whitespace
            is ignored; any object whose ``str()`` has that form (for example
            a ``datetime.date``) is accepted as well.

    Returns:
        The string ``"<YYYY-MM-DD>T00:00:00Z"`` with a zero-padded date.

    Raises:
        ValueError: If the value is not a valid ``YYYY-MM-DD`` date, e.g. it is
            empty, names an impossible date, or is already a full timestamp.
    """
    try:
        # Parsing validates the input; without it any text would simply get the
        # time suffix appended and be sent to the API as-is.
        parsed = datetime.strptime(str(date_str).strip(), "%Y-%m-%d")
    except ValueError as exc:
        raise ValueError(f"Invalid date {date_str!r}: expected format YYYY-MM-DD.") from exc
    return f"{parsed.date().isoformat()}T00:00:00Z"

# Expand both window boundaries to full ISO 8601 timestamps via to_iso8601
start_date, end_date = (to_iso8601(boundary) for boundary in (start_date, end_date))

In [0]:
# Echo the resolved configuration so the run log shows exactly what is used
print("\n".join([
    f"Catalog: {catalog}",
    f"Schema: {schema}",
    f"Indicators Volume: {indicators_volume}",
    f"Indicator Metrics Volume: {indicator_metrics_volume}",
    f"Start Date: {start_date}",
    f"End Date: {end_date}",
]))

In [0]:
# Indicators endpoint and the base file name for the stored response
api_url = "https://api.esios.ree.es/indicators"
output_filename = "indicators_data.json"

# The API token is read from the secret scope selected through the
# `secret_scope` widget, under the key "esios-token"; it is never printed.
api_token = dbutils.secrets.get(key="esios-token", scope=secret_scope)

for _status_line in (
    f"API URL: {api_url}",
    f"Output filename: {output_filename}",
    "✓ API token retrieved from secrets",
):
    print(_status_line)

In [0]:

# Request headers: the token is sent as "x-api-key" and JSON is requested back.
# `headers` is reused by the per-indicator requests in a later cell.
headers = {
    "x-api-key": api_token,
    "Accept": "application/json",
    "Content-Type": "application/json"
}

# requests applies no timeout unless one is given, so a stalled connection
# would block the notebook forever. Bound the call (seconds).
request_timeout_seconds = 60

# Fetch the indicators list
print(f"Calling API: {api_url}")
print(f"Timestamp: {datetime.now().isoformat()}")

try:
    response = requests.get(api_url, headers=headers, timeout=request_timeout_seconds)
    response.raise_for_status()  # Turn 4xx/5xx responses into an exception

    # Parse JSON response
    json_data = response.json()
    print("✓ API call successful")
    print(f"Response status code: {response.status_code}")

    # Display response structure
    if isinstance(json_data, dict):
        print(f"Response keys: {list(json_data.keys())}")
        if 'indicators' in json_data:
            print(f"Number of indicators: {len(json_data['indicators'])}")
    print(f"Response preview: {str(json_data)[:200]}...")

# The JSON handler must come first: in recent requests releases the error raised
# by response.json() derives from both RequestException and json.JSONDecodeError,
# so with the opposite order this branch could never be reached.
except json.JSONDecodeError as e:
    raise Exception(f"Failed to parse JSON response: {str(e)}") from e
except requests.exceptions.RequestException as e:
    raise Exception(f"API call failed: {str(e)}") from e

In [0]:
# Path of the indicators file inside the Unity Catalog volume.
# NOTE: the write cell further down reassigns `volume_path` to a date-prefixed
# file name before anything is written, so the path printed here is not the
# file that ends up on the volume.
_indicators_volume_root = f"/Volumes/{catalog}/{schema}/{indicators_volume}"
volume_path = f"{_indicators_volume_root}/{output_filename}"
print(f"Target path: {volume_path}")

In [0]:
# Build the date-prefixed file name (date taken from datetime.now()) and its
# full path inside the indicators volume
date_str = datetime.now().strftime("%Y-%m-%d")
dated_output_filename = f"{date_str}_indicators.json"
volume_path = f"/Volumes/{catalog}/{schema}/{indicators_volume}/{dated_output_filename}"

# Dump the API response to the volume, then confirm the file is really there.
# Any failure, including the existence check, is re-raised with a common prefix.
try:
    with open(volume_path, 'w') as out_file:
        json.dump(json_data, out_file, indent=2)

    print(f"✓ Successfully wrote JSON to {volume_path}")

    # Guard clause: stop here if the file cannot be found after the write
    if not os.path.exists(volume_path):
        raise Exception("File was not created successfully")

    file_size = os.path.getsize(volume_path)
    print(f"File size: {file_size:,} bytes")
    print(f"File location: {volume_path}")

except Exception as write_error:
    raise Exception(f"Failed to write to volume: {str(write_error)}")

In [0]:
# Load the stored indicators file back from the volume and preview its content.
# Any failure is re-raised with a common prefix.
try:
    with open(volume_path, 'r') as in_file:
        indicators_data = json.load(in_file)

    # The indicator entries are taken from the 'indicators' key (empty list if absent)
    indicators = indicators_data.get('indicators', [])

    print(f"✓ Successfully read JSON from {volume_path}")
    print(f"Number of indicators found: {len(indicators)}")

    # Show at most the first three entries, then how many were left out
    if indicators:
        preview = indicators[:3]
        print("\nSample indicators:")
        for sample in preview:
            print(f"  - ID: {sample.get('id')}, Name: {sample.get('short_name')}")
        remaining = len(indicators) - 3
        if remaining > 0:
            print(f"  ... and {remaining} more")

except Exception as read_error:
    raise Exception(f"Failed to read indicators JSON: {str(read_error)}")

In [0]:
# Percent-encoded copies of the window boundaries.
# NOTE: in the visible cells the metrics requests pass the raw `start_date` /
# `end_date` through `params`; these encoded values are not referenced there.
start_date_encoded, end_date_encoded = quote(start_date), quote(end_date)

# Fixed query options sent with every metrics request
geo_agg, time_trunc = "sum", "hour"

print("\n".join([
    "Date range configured:",
    f"  Start: {start_date}",
    f"  End: {end_date}",
    f"  Geo aggregation: {geo_agg}",
    f"  Time truncation: {time_trunc}",
]))

In [0]:
# Download the metrics of every indicator and store one JSON file per indicator.
# The loop is best-effort: a failing indicator is recorded in
# `failed_extractions` and processing continues with the next one.
successful_extractions = []
failed_extractions = []

# requests applies no timeout unless one is given; bound every call (seconds)
# so that a single stalled connection cannot block the whole run.
request_timeout_seconds = 60

# Pause (seconds) after every API call to avoid overwhelming the API
request_pause_seconds = 0.1

# Resolve the file-name date once so that all files of this run share the same
# prefix, even when the loop runs past midnight.
date_str = datetime.now().strftime("%Y-%m-%d")

# The query parameters are the same for every indicator
params = {
    'start_date': start_date,
    'end_date': end_date,
    'geo_agg': geo_agg,
    'time_trunc': time_trunc
}

print(f"Starting extraction for {len(indicators)} indicators...\n")

for idx, indicator in enumerate(indicators, 1):
    indicator_id = indicator.get('id')
    indicator_name = indicator.get('short_name', 'Unknown')

    print(f"[{idx}/{len(indicators)}] Processing indicator ID {indicator_id}: {indicator_name}")

    if indicator_id is None:
        # Without an id the request would go to ".../indicators/None"; record
        # the problem and skip the call entirely.
        missing_id_error = "Indicator entry has no 'id'"
        print(f"  ✗ Error processing indicator: {missing_id_error}")
        failed_extractions.append({'id': indicator_id, 'name': indicator_name, 'error': missing_id_error})
        continue

    try:
        # Request the metrics of this indicator for the configured window
        metrics_api_url = f"https://api.esios.ree.es/indicators/{indicator_id}"
        response = requests.get(
            metrics_api_url,
            headers=headers,
            params=params,
            timeout=request_timeout_seconds,
        )
        response.raise_for_status()

        # Parse JSON response
        metrics_data = response.json()

        # Output file: <date>_indicator_metrics_<id>.json in the metrics volume
        metrics_filename = f"{date_str}_indicator_metrics_{indicator_id}.json"
        metrics_path = f"/Volumes/{catalog}/{schema}/{indicator_metrics_volume}/{metrics_filename}"

        # Write JSON data to volume
        with open(metrics_path, 'w') as f:
            json.dump(metrics_data, f, indent=2)

        print(f"  ✓ Successfully extracted and saved to {metrics_filename}")
        successful_extractions.append({'id': indicator_id, 'name': indicator_name, 'file': metrics_filename})

    except requests.exceptions.RequestException as e:
        print(f"  ✗ API call failed: {str(e)}")
        failed_extractions.append({'id': indicator_id, 'name': indicator_name, 'error': str(e)})
    except Exception as e:
        print(f"  ✗ Error processing indicator: {str(e)}")
        failed_extractions.append({'id': indicator_id, 'name': indicator_name, 'error': str(e)})
    finally:
        # Throttle after failures too: a failing API is exactly the case in
        # which the next request should not be fired immediately.
        time.sleep(request_pause_seconds)

print(f"\n{'='*60}")
print("Extraction complete!")
print(f"Successful: {len(successful_extractions)}")
print(f"Failed: {len(failed_extractions)}")
print(f"{'='*60}")

In [0]:
# Detailed summary: list at most 10 successes and 5 failures (error text cut
# to 100 characters), then print the folder that holds the metrics files.
if successful_extractions:
    print(f"\n✓ Successfully extracted {len(successful_extractions)} indicators:")
    for extracted in successful_extractions[:10]:
        print(f"  - ID {extracted['id']}: {extracted['name']} → {extracted['file']}")
    hidden_successes = len(successful_extractions) - 10
    if hidden_successes > 0:
        print(f"  ... and {hidden_successes} more")

if failed_extractions:
    print(f"\n✗ Failed to extract {len(failed_extractions)} indicators:")
    for failure in failed_extractions[:5]:
        print(f"  - ID {failure['id']}: {failure['name']} - {failure['error'][:100]}")
    hidden_failures = len(failed_extractions) - 5
    if hidden_failures > 0:
        print(f"  ... and {hidden_failures} more failures")

print(f"\nAll metrics stored in: /Volumes/{catalog}/{schema}/{indicator_metrics_volume}/")