In [0]:
# Databricks notebook source
import requests
import datetime as dt
from typing import Optional, Dict, List
from pyspark.sql import functions as F
import time
import pytz 
import json



# 1. Define widgets to accept parameters from the job run.
# Widget values are always read back as strings; they are parsed and validated in step 2.

dbutils.widgets.dropdown("environment", "dev", ["dev", "test", "prod"], "Environment")
dbutils.widgets.text("dataset_id", "", "Fingrid dataset id")
dbutils.widgets.text("dataset_name", "", "Fingrid dataset name")
dbutils.widgets.text("page_size", "", "load page size")
dbutils.widgets.text("start_timestamp", "", "load start_timestamp")
dbutils.widgets.text("incremental_load_days", "", "incremental_load_days")

# Timezone used to interpret a naive start_timestamp and to take "now".
helsinki_tz = pytz.timezone('Europe/Helsinki')

# 2. Read the values from the widgets.
environment = dbutils.widgets.get("environment")
CATALOG = f"w_{environment}"
LANDING_ROOT = f"s3a://wartsila-datalake-{environment}-landing/fingrid/"
# int(float(...)) accepts numeric strings with a decimal part such as "245.0".
dataset_id = int(float(dbutils.widgets.get("dataset_id")))
page_size = int(float(dbutils.widgets.get("page_size")))
start_timestamp = dbutils.widgets.get("start_timestamp")
incremental_load_days = int(float(dbutils.widgets.get("incremental_load_days")))
dataset_name = dbutils.widgets.get("dataset_name")

# A non-positive window never advances the batch loop below, so fail fast
# instead of looping forever.
if incremental_load_days <= 0:
    raise ValueError(
        f"incremental_load_days must be a positive integer, got {incremental_load_days}"
    )

final_end_time = dt.datetime.now(helsinki_tz)

# start_timestamp may be naive (interpreted as Helsinki wall-clock time) or may
# already carry a UTC offset, such as the string `.isoformat()` yields for an
# aware datetime. pytz's localize() raises ValueError on an aware value, so only
# localize naive input and convert aware input to Helsinki time instead.
parsed_start_time = dt.datetime.fromisoformat(start_timestamp)
if parsed_start_time.tzinfo is None:
    current_start_time = helsinki_tz.localize(parsed_start_time)
else:
    current_start_time = parsed_start_time.astimezone(helsinki_tz)
# Kept for backward compatibility: the start as naive Helsinki wall-clock time.
naive_start_time = current_start_time.replace(tzinfo=None)

print(f"Starting historical load for dataset {dataset_id}_{dataset_name} from {current_start_time.isoformat()} until {final_end_time.isoformat()} with page size {page_size}.")
print(f"Batch window size: {incremental_load_days} days.")

# ---- API KEY ----
# Read from the "fingrid" secret scope so the key never appears in the notebook.
FINGRID_API_KEY = dbutils.secrets.get("fingrid", "api-key")


# Query parameters are appended to this endpoint by the HTTP client.
BASE_URL = "https://data.fingrid.fi/api/data?"
# URL Format 
# GET https://data.fingrid.fi/api/data?datasets=245&startTime=2025-06-28T12:15:00Z&endTime=2025-08-28T12:15:00Z&page=1&pageSize=100

# 4. Main Loop - Replicates the ADF "Until" activity with rate limit handling

# Timestamp layouts. Both end in a literal "Z", so values must be converted to
# UTC before they are formatted with them.
API_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
FILE_TIMESTAMP_FORMAT = '%Y%m%dT%H%M%SZ'

MAX_RETRIES = 5                  # attempts per page request
INITIAL_RETRY_DELAY_SECONDS = 2  # doubled after every failed attempt
REQUEST_TIMEOUT_SECONDS = 60     # without a timeout a stalled connection blocks the job forever
PAGE_DELAY_SECONDS = 0.5         # pause between successful page requests


def _fetch_page(params):
    """Fetch one page from the Fingrid API and return the decoded JSON body.

    Retries with exponential backoff on HTTP 429, on 5xx responses and on
    network-level errors. Any other 4xx response is raised immediately, since
    repeating the same request would not change the outcome.

    Args:
        params: Query parameters for the request; must contain a 'page' key
            (used in log and error messages).

    Returns:
        The parsed JSON payload of the successful response.

    Raises:
        requests.exceptions.HTTPError: On a non-429 4xx response, or on a 5xx
            response in the final attempt.
        requests.exceptions.RequestException: If the final attempt fails at
            network level (including timeouts).
        RuntimeError: If the final attempt is still rate limited (429).
    """
    headers = {'x-api-key': FINGRID_API_KEY}
    page = params['page']
    retry_delay = INITIAL_RETRY_DELAY_SECONDS

    for attempt in range(MAX_RETRIES):
        is_last_attempt = attempt == MAX_RETRIES - 1

        # Only the network call is guarded here; HTTP status handling lives
        # outside the try so that client errors are not swallowed and retried.
        try:
            response = requests.get(
                BASE_URL,
                headers=headers,
                params=params,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
        except requests.exceptions.RequestException as e:
            print(f"A network-related error occurred on attempt {attempt + 1}: {e}")
            if is_last_attempt:
                raise  # Re-raise the exception if all retries fail
            time.sleep(retry_delay)
            retry_delay *= 2
            continue

        status = response.status_code
        if status != 429 and status < 500:
            # Raises for the remaining 4xx codes; a no-op on success.
            response.raise_for_status()
            return response.json()

        if is_last_attempt:
            if status != 429:
                response.raise_for_status()  # surface the persistent 5xx error
            break

        label = "Rate limited" if status == 429 else f"Server error {status}"
        print(f"{label} on page {page} (Attempt {attempt + 1}/{MAX_RETRIES}). Retrying in {retry_delay}s...")
        time.sleep(retry_delay)
        retry_delay *= 2  # Exponentially increase the delay

    raise RuntimeError(f"Failed to fetch page {page} after {MAX_RETRIES} retries due to persistent rate limiting.")


while current_start_time < final_end_time:
    # End of the current batch window, capped so the last batch never goes into the future.
    current_end_time = min(
        current_start_time + dt.timedelta(days=incremental_load_days),
        final_end_time,
    )

    # The window bounds are timezone-aware local times. Convert them to UTC
    # before formatting, because the formats append a literal "Z"; formatting
    # the local wall-clock time directly would shift the window by the UTC offset.
    window_start_utc = current_start_time.astimezone(dt.timezone.utc)
    window_end_utc = current_end_time.astimezone(dt.timezone.utc)
    start_str = window_start_utc.strftime(API_TIMESTAMP_FORMAT)
    end_str = window_end_utc.strftime(API_TIMESTAMP_FORMAT)

    print(f"\n--- Processing batch from {start_str} to {end_str} ---")

    # Walk the pages of this window until the API reports no further page.
    page = 1
    all_data_for_batch = []
    while True:
        response_json = _fetch_page({
            'datasets': dataset_id,
            'startTime': start_str,
            'endTime': end_str,
            'pageSize': page_size,
            'page': page,
        })
        data_segment = response_json.get('data', [])

        if not data_segment:
            print(f"Page {page}: No more data in this batch. Exiting pagination loop.")
            break

        print(f"Page {page}: Fetched {len(data_segment)} records.")
        all_data_for_batch.extend(data_segment)

        pagination_info = response_json.get('pagination', {})
        next_page = pagination_info.get('nextPage')
        print(f"Page {page}: Next page is {next_page}.")

        if next_page is None:
            print("Last page reached for this time window.")
            break

        page = next_page
        # Add a small, consistent delay between successful page requests to be a good API citizen
        time.sleep(PAGE_DELAY_SECONDS)

    # After collecting all pages for the current time window, write the data
    if all_data_for_batch:
        json_data_string = json.dumps(all_data_for_batch, indent=4)

        # File names carry the same UTC window bounds that were sent to the API.
        start_str_for_file = window_start_utc.strftime(FILE_TIMESTAMP_FORMAT)
        end_str_for_file = window_end_utc.strftime(FILE_TIMESTAMP_FORMAT)

        file_name = f"batch_{start_str_for_file}_to_{end_str_for_file}.json"
        output_path = f"{LANDING_ROOT}{dataset_id}_{dataset_name}/{file_name}"

        # overwrite=True makes a re-run of the same window replace its file.
        dbutils.fs.put(output_path, json_data_string, overwrite=True)
        print(f"Successfully wrote {len(all_data_for_batch)} records to: {output_path}")

    # **CRUCIAL STEP**: Update the start time for the next iteration of the main time-window loop
    current_start_time = current_end_time

print("\nHistorical load complete.")

# Optional: persist the new high-watermark for this dataset in the control table.
# The stored value is the upper bound of this run (final_end_time) as an ISO-8601 string.
watermark_update_sql = f"UPDATE {CATALOG}.landing_admin.meta_control_table SET last_timestamp = '{final_end_time.isoformat()}' WHERE source_dataset_id = {dataset_id}"
spark.sql(watermark_update_sql)



