# Load World Bank Indicator Metadata

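This notebook fetches metadata for every World Bank indicator listed by `wbgapi` and appends each record to a Delta table as soon as it is fetched. If a run is interrupted, the next run resumes where it left off by skipping indicators that are already in the table.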

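**Parameters (DAB variables, passed as job parameters / notebook widgets):**
- `catalog`: Unity Catalog catalog name
- `schema`: Schema name within the catalog
- `fresh_start`: `true` to drop the existing table and reload everything; `false` (default) to resume from the existing table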

In [None]:
%pip install wbgapi tqdm requests --quiet

In [None]:
# Configuration from DAB variables (set via job parameters or widgets).
# These are passed from databricks.yml when run as part of the workflow.
import re

dbutils.widgets.text("catalog", "quant_risk_dev", "Catalog Name")
dbutils.widgets.text("schema", "indicators", "Schema Name")
dbutils.widgets.dropdown("fresh_start", "false", ["true", "false"], "Fresh Start")

CATALOG = dbutils.widgets.get("catalog").strip()
SCHEMA = dbutils.widgets.get("schema").strip()

# CATALOG and SCHEMA are interpolated unquoted into SQL statements in later
# cells (DROP / SELECT / CREATE SCHEMA / COMMENT ON). Accept only plain
# identifiers so a bad parameter fails fast here with a clear message,
# instead of producing a confusing SQL error (or an unintended statement)
# partway through the run.
for _param, _value in (("catalog", CATALOG), ("schema", SCHEMA)):
    if not re.fullmatch(r"[A-Za-z0-9_]+", _value):
        raise ValueError(
            f"Invalid {_param} name {_value!r}: use only letters, digits and underscores"
        )

TABLE_NAME = "worldbank_indicators"
FULL_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{TABLE_NAME}"
# The dropdown only offers "true"/"false"; lower() tolerates manual job params.
FRESH_START = dbutils.widgets.get("fresh_start").lower() == "true"

print("Configuration:")
print(f"  Catalog: {CATALOG}")
print(f"  Schema: {SCHEMA}")
print(f"  Table: {FULL_TABLE_NAME}")
print(f"  Fresh Start: {FRESH_START}")

In [None]:
import wbgapi as wb
import requests
from requests.exceptions import RequestException, Timeout
from tqdm import tqdm
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.utils import AnalysisException

In [None]:
# Define schema
schema = StructType([
    StructField("indicator_id", StringType(), False),
    StructField("indicator_name", StringType(), True),
    StructField("long_definition", StringType(), True),
    StructField("source_organization", StringType(), True),
    StructField("source", StringType(), True),
    StructField("topics", StringType(), True),
    StructField("unit", StringType(), True),
    StructField("periodicity", StringType(), True),
    StructField("aggregation_method", StringType(), True),
    StructField("license_type", StringType(), True),
    StructField("embedding_text", StringType(), True),
])

In [None]:
def fetch_indicator_metadata(indicator_id: str, timeout: float = 30) -> dict:
    """Fetch the full metadata record for one indicator from the World Bank API.

    Calls ``https://api.worldbank.org/v2/indicator/{indicator_id}?format=json``
    directly and returns the first record of the response.

    Args:
        indicator_id: World Bank indicator ID (e.g., 'SP.POP.TOTL').
        timeout: Seconds to wait for the HTTP response (default 30).

    Returns:
        The first metadata dict in the response's record list.

    Raises:
        RequestException: If the HTTP request fails, times out, or returns an
            error status.
        ValueError: If the body is not JSON shaped like ``[paging, [record, ...]]``
            with a dict as the first record.
    """
    url = f"https://api.worldbank.org/v2/indicator/{indicator_id}?format=json"

    resp = requests.get(url, timeout=timeout)
    resp.raise_for_status()
    data = resp.json()

    # Validate the whole shape up front. Anything unexpected (a null body, a
    # single-element list, a dict where the record list should be, a non-dict
    # record) becomes a ValueError. The caller handles ValueError per
    # indicator, instead of a TypeError/KeyError/AttributeError aborting the
    # whole load.
    records = data[1] if isinstance(data, list) and len(data) >= 2 else None
    if not isinstance(records, list) or not records or not isinstance(records[0], dict):
        raise ValueError(f"Unexpected API response format for {indicator_id}")

    return records[0]

In [None]:
# Smoke-test the fetch function on a well-known indicator before the bulk load.
test_meta = fetch_indicator_metadata("SP.POP.TOTL")
print(f"Keys: {test_meta.keys()}")
# sourceNote may be present but None; coerce to "" before slicing, as the main
# loader does, so the smoke test itself cannot fail with a TypeError.
print(f"sourceNote: {(test_meta.get('sourceNote') or '')[:200]}...")

In [None]:
# Check whether the target table exists and collect the IDs it already holds,
# so the fetch loop can skip them (resume support).
existing_ids = set()
table_exists = False

if FRESH_START:
    spark.sql(f"DROP TABLE IF EXISTS {FULL_TABLE_NAME}")
    spark.sql(f"DROP VIEW IF EXISTS {FULL_TABLE_NAME}")
    print("Fresh start - dropped existing table")
else:
    try:
        existing_df = spark.sql(f"SELECT indicator_id FROM {FULL_TABLE_NAME}")
        existing_ids = {row.indicator_id for row in existing_df.collect()}
        table_exists = True
        print(f"Resuming - found {len(existing_ids)} existing records")
    except AnalysisException as e:
        # A missing table is expected on the first run. So is a missing
        # schema, because the schema is created below. Older Spark versions
        # word this as "Table or view not found". Any other analysis error is
        # a real problem and is re-raised.
        message = str(e)
        lowered = message.lower()
        if (
            "TABLE_OR_VIEW_NOT_FOUND" in message
            or "SCHEMA_NOT_FOUND" in message
            or "does not exist" in lowered
            or "table or view not found" in lowered
        ):
            print("Table doesn't exist yet, starting fresh")
        else:
            raise

# Create the table only when it is absent (first run or fresh start). An
# existing table, even an empty one, is reused so resumed runs keep appending
# to it rather than replacing it.
if not table_exists:
    # Ensure schema exists
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

    empty_df = spark.createDataFrame([], schema)
    empty_df.write \
        .format("delta") \
        .option("delta.enableChangeDataFeed", "true") \
        .mode("overwrite") \
        .saveAsTable(FULL_TABLE_NAME)

    # Set table description
    spark.sql(f"""
        COMMENT ON TABLE {FULL_TABLE_NAME} IS 
        'World Bank indicator metadata including economic, social, and environmental development indicators with embeddings for vector search.'
    """)
    print(f"Created table {FULL_TABLE_NAME} with CDF enabled")

In [None]:
# Enumerate the series IDs that wbgapi's series listing returns.
# NOTE(review): wb.series.info() is called without a database argument, so
# this presumably covers only wbgapi's default database -- confirm that is
# the intended indicator set.
print("Fetching series list...")
series_list = wb.series.info()
all_series_ids = [entry.get("id") for entry in series_list.items]

# Resume support: keep only IDs not yet present in the Delta table.
series_ids = [candidate for candidate in all_series_ids if candidate not in existing_ids]

for label, value in (
    ("Total indicators", len(all_series_ids)),
    ("Already loaded", len(existing_ids)),
    ("Remaining to fetch", len(series_ids)),
):
    print(f"{label}: {value}")

In [None]:
# Fetch metadata for each remaining indicator and append it to the Delta table
# one record at a time. An interrupted run then loses at most the record in
# flight, and the next run skips every ID already written.
# NOTE(review): one Delta commit per record is slow for thousands of
# indicators; batching appends would be faster at the cost of coarser resume.


def _build_indicator_row(series_id, meta):
    """Map one World Bank API metadata dict onto a row tuple matching ``schema``.

    Args:
        series_id: The ID that was requested; used if the response has no ID.
        meta: Metadata dict returned by ``fetch_indicator_metadata``.

    Returns:
        An 11-element tuple of strings in the column order of ``schema``.
    """
    # ``or`` (rather than a .get default) also covers keys present with null.
    indicator_id = meta.get("id") or series_id
    indicator_name = meta.get("name") or ""
    long_definition = meta.get("sourceNote") or ""

    # Topic entries are dicts carrying a "value" label. Null or empty labels
    # are skipped so they cannot break the join (None is not a str) or leave
    # stray ", " separators.
    topics = ", ".join(
        str(topic["value"])
        for topic in (meta.get("topics") or [])
        if isinstance(topic, dict) and topic.get("value")
    )

    source_obj = meta.get("source")
    source = (source_obj.get("value") or "") if isinstance(source_obj, dict) else ""

    # Text for vector embeddings: "<name>. <definition>". Fall back to
    # whichever part is present, or to the ID when both are empty.
    name_part = indicator_name.strip()
    definition_part = long_definition.strip()
    if name_part and definition_part:
        embedding_text = f"{name_part}. {definition_part}"
    elif name_part:
        embedding_text = f"{name_part}."
    else:
        embedding_text = definition_part or indicator_id

    return (
        indicator_id,
        indicator_name,
        long_definition,
        meta.get("sourceOrganization") or "",
        source,
        topics,
        meta.get("unit") or "",
        "",  # periodicity not in this API
        "",  # aggregation_method not in this API
        "",  # license_type not in this API
        embedding_text,
    )


failed_ids = []

for series_id in tqdm(series_ids, desc="Fetching metadata"):
    try:
        meta = fetch_indicator_metadata(series_id)
        row_df = spark.createDataFrame([_build_indicator_row(series_id, meta)], schema)
        row_df.write.format("delta").mode("append").saveAsTable(FULL_TABLE_NAME)

    except RequestException as e:
        # Network/HTTP errors (Timeout is a RequestException subclass) - log
        # and track for retry
        print(f"Network error fetching {series_id}: {e}")
        failed_ids.append((series_id, str(e)))
    except ValueError as e:
        # Invalid response format - log and skip
        print(f"Invalid response for {series_id}: {e}")
        failed_ids.append((series_id, str(e)))

print(f"\nDone! Failed: {len(failed_ids)} indicators")
if failed_ids:
    print("Failed IDs (first 10):")
    for sid, err in failed_ids[:10]:
        print(f"  - {sid}: {err}")

In [None]:
# Sanity-check the load: report the final row count and preview a few rows.
verify_table = spark.table(FULL_TABLE_NAME)
count = verify_table.count()
print(f"Total records: {count}")
display(verify_table.limit(5))