This script ingests odds for all upcoming NBA player props from The Odds API into the bronze-level table.


In [None]:
# notebooks/01_bronze/ingest_odds.ipynb

import json
import time
import uuid
from datetime import datetime

import requests
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# --- 1. Configuration ---
# The API key is read from the Databricks secret scope rather than hard-coded.
API_KEY = dbutils.secrets.get(scope="voitto_secrets", key="ODDS_API_KEY")  # type: ignore # noqa: F821

# Query settings sent to the odds endpoint.
SPORT = "basketball_nba"
REGIONS = "eu"
# The API takes the requested markets as one comma-separated string.
MARKETS = ",".join(
    (
        "player_points",
        "player_rebounds",
        "player_assists",
    )
)
BOOKERS = "pinnacle"

# Run metadata: one identifier and one timestamp shared by every record
# written during this ingestion run.
INGEST_ID = str(uuid.uuid4())
INGEST_TS = datetime.now()

print(f"Starting Ingestion Run: {INGEST_ID} at {INGEST_TS}")

# --- 2. Helper Functions ---


def get_request(url: str, params: dict) -> dict | list | None:
    """Send a GET request and return the decoded JSON body.

    Prints a warning when the ``x-requests-remaining`` response header
    reports fewer than 50 requests left.

    Args:
        url: Endpoint to call.
        params: Query-string parameters passed through to ``requests.get``.

    Returns:
        The parsed JSON payload. Callers in this file iterate the events
        endpoint's result as a list, so the value is not always a dict.

    Raises:
        requests.exceptions.HTTPError: If the status code is not 200. The
            message contains the status code and response text, and the
            response object is attached to the exception.
    """
    response = requests.get(url, params=params, timeout=10)

    remaining = response.headers.get("x-requests-remaining")
    if remaining and float(remaining) < 50:
        print(f"WARNING: API Quota low! Remaining: {remaining}")

    if response.status_code != 200:
        # Adjacent f-strings without commas concatenate into ONE string;
        # with commas this would be a tuple and the error would be unreadable.
        msg = (
            f"API request failed with status {response.status_code}: "
            f"{response.text}"
        )
        raise requests.exceptions.HTTPError(msg, response=response)

    return response.json()


def save_to_bronze(data_list: list[dict]) -> None:
    """Append raw API records to the ``voitto_bronze.odds_api_raw`` Delta table.

    Each dictionary supplies the ``ingest_id``, ``ingest_timestamp``,
    ``api_endpoint`` and ``raw_json`` columns. When ``data_list`` is empty a
    message is printed and nothing is written.
    """
    if not data_list:
        print("No data to save.")
        return

    # Explicit schema instead of inference; every column is nullable.
    # NOTE(review): intended to match the existing table definition.
    column_types = (
        ("ingest_id", StringType()),
        ("ingest_timestamp", TimestampType()),
        ("api_endpoint", StringType()),
        ("raw_json", StringType()),
    )
    schema = StructType(
        [StructField(name, dtype, True) for name, dtype in column_types]
    )

    frame = spark.createDataFrame(data_list, schema)  # type: ignore  # noqa: F821

    # Append mode: existing rows in the table are left in place.
    writer = frame.write.format("delta").mode("append")
    writer.saveAsTable("voitto_bronze.odds_api_raw")

    print(f"Saved {len(data_list)} records to voitto_bronze.odds_api_raw")


# --- 3. Main Execution ---


def _bronze_record(endpoint_label: str, payload) -> dict:
    """Wrap one API payload in the row layout written to the Bronze table."""
    return {
        "ingest_id": INGEST_ID,
        "ingest_timestamp": INGEST_TS,
        "api_endpoint": endpoint_label,
        "raw_json": json.dumps(payload),
    }


records_buffer = []

# A. Fetch the schedule of events for the configured sport
print(f"1. Fetching Schedule for {SPORT}...")
schedule_endpoint = f"https://api.the-odds-api.com/v4/sports/{SPORT}/events"
schedule_data = get_request(schedule_endpoint, {"api_key": API_KEY})

if schedule_data:
    # The schedule response itself is stored as the first raw record.
    records_buffer.append(_bronze_record("schedule", schedule_data))

    # B. Fetch odds for every game listed in the schedule
    game_ids = [game["id"] for game in schedule_data]
    print(f"Found {len(game_ids)} games. Fetching odds...")

    # The query parameters are the same for every game, so build them once.
    odds_params = {
        "api_key": API_KEY,
        "regions": REGIONS,
        "markets": MARKETS,
        "bookmakers": BOOKERS,
        "oddsFormat": "decimal",
    }

    for game_id in game_ids:
        # Pause before each call as a simple form of rate limiting.
        time.sleep(0.5)

        odds_endpoint = f"https://api.the-odds-api.com/v4/sports/{SPORT}/events/{game_id}/odds"
        odds_data = get_request(odds_endpoint, odds_params)

        # An empty payload produces no record and triggers no flush.
        if not odds_data:
            continue

        records_buffer.append(_bronze_record(f"odds/{game_id}", odds_data))

        # Flush in batches of 10 and start a fresh buffer.
        if len(records_buffer) >= 10:
            save_to_bronze(records_buffer)
            records_buffer = []

    # Write whatever is left over from the last partial batch.
    if records_buffer:
        save_to_bronze(records_buffer)

print("Ingestion Complete.")

: 