In [0]:
# DESCRIPTION: Extracts data from Ergast F1 API and lands it in Bronze Layer (JSON)
# ---------------------------------------------------------

import requests
import json
from datetime import datetime
from pyspark.sql.functions import lit

# 1. Configuration
# ---------------------------------------------------------
# Root URL of the Ergast F1 REST API; endpoint names are appended to it.
base_url = "http://ergast.com/api/f1"

# Storage destination. The account name looks like a placeholder —
# TODO confirm it is replaced with the real account before running.
container_name = "bronze"
storage_account_name = "STORAGE_ACCOUNT"

# abfss:// URI of the container root; each entity is written beneath it.
target_path = "abfss://{container}@{account}.dfs.core.windows.net".format(
    container=container_name,
    account=storage_account_name,
)

# 2. Helper Function: Fetch All Pages
# ---------------------------------------------------------
def fetch_ergast_data(endpoint, *, limit=100, timeout=30):
    """
    Fetches all records for a specific Ergast API endpoint by handling pagination.

    Pages are requested with ``limit``/``offset`` query parameters until the
    offset reaches the ``total`` reported by the API, a page comes back empty,
    or the response contains none of the recognised tables.

    Args:
        endpoint: Path segment appended to ``base_url`` (e.g. "drivers").
        limit: Page size requested per call. Must be a positive integer.
        timeout: Seconds to wait on each HTTP request before giving up.

    Returns:
        A list with the records collected across all pages. It may be empty
        (or partial) when the response holds none of the known tables.

    Raises:
        ValueError: If ``limit`` is not a positive number.
        Exception: If the API answers with a non-200 status code.
        requests.exceptions.RequestException: On connection errors or when a
            request exceeds ``timeout``.
    """
    if limit <= 0:
        # A non-positive page size would never advance the offset.
        raise ValueError(f"limit must be positive, got {limit}")

    # (wrapper key in MRData, key of the record list inside it), checked in
    # this order so the first matching table wins.
    table_keys = (
        ("DriverTable", "Drivers"),
        ("ConstructorTable", "Constructors"),
        ("CircuitTable", "Circuits"),
        ("RaceTable", "Races"),
    )

    all_results = []
    offset = 0

    print(f"🚀 Starting ingestion for: {endpoint}")

    while True:
        url = f"{base_url}/{endpoint}.json?limit={limit}&offset={offset}"
        # Without a timeout a stalled connection would block the pipeline forever.
        response = requests.get(url, timeout=timeout)

        if response.status_code != 200:
            raise Exception(f"API Error: {response.status_code}")

        mr_data = response.json()['MRData']

        # Identify the list key (e.g., 'DriverTable' -> 'Drivers')
        for wrapper_key, list_key in table_keys:
            if wrapper_key in mr_data:
                table = mr_data[wrapper_key][list_key]
                break
        else:
            # Best effort: warn and return whatever was collected so far.
            print(f"⚠️ specific table key not found for {endpoint}")
            break

        if not table:
            break

        all_results.extend(table)

        # Check if we need to fetch more. The offset advances by the requested
        # page size (not by len(table)) because the API paginates on its own
        # row count, which may differ from the number of items in the list.
        total = int(mr_data['total'])
        offset += limit
        print(f"   Fetched {len(all_results)} / {total} records...")

        if offset >= total:
            break

    print(f"✅ Finished. Total records: {len(all_results)}")
    return all_results

# 3. Execution: Ingest Key Tables
# ---------------------------------------------------------
# Entities to ingest; each name is both the API endpoint and the Bronze folder.
endpoints = ["drivers", "constructors", "circuits", "races"]

for entity in endpoints:
    # A. Extract every page for this entity from the API
    raw_data = fetch_ergast_data(entity)

    # Guard: with no records there is nothing to infer a schema from, and the
    # "overwrite" below would replace any previously landed data with an
    # empty dataset. Keep the existing Bronze copy and move on.
    if not raw_data:
        print(f"⚠️ No records returned for {entity}; skipping write.\n")
        continue

    # B. Convert to Spark DataFrame
    # Each record is serialised to a JSON string so Spark infers the schema.
    # NOTE(review): `spark` is not defined in this cell — presumably the
    # session injected by the notebook runtime; confirm.
    rdd = spark.sparkContext.parallelize([json.dumps(r) for r in raw_data])
    df = spark.read.json(rdd)

    # C. Add Audit Columns
    df_with_audit = df.withColumn("ingestion_date", lit(datetime.now())) \
                      .withColumn("source_system", lit("Ergast API"))

    # D. Write to Bronze (Landing Zone)
    # Overwrite mode: each run fully replaces the entity's folder.
    save_path = f"{target_path}/{entity}"
    df_with_audit.write.mode("overwrite").format("json").save(save_path)
    print(f"💾 Saved {entity} to {save_path}\n")

print("🎉 API Ingestion Pipeline Completed!")