# UK Census Data Extraction Pipeline
**Extracting demographic data from ONS API to BigQuery**

In [1]:
import os
import json
from typing import List
import requests
import pandas as pd
from google.cloud import bigquery

# ============================================
# CONFIG
# ============================================
# Base URL of the ONS API; the population type and endpoint are appended per request.
API_ROOT = "https://api.beta.ons.gov.uk/v1/population-types"

# TODO: Replace with YOUR actual project ID from GCP Console
PROJECT_ID = "uk-census-portfolio"  # ← Change this to your actual project ID
DATASET_ID = "raw_data"
TABLE_ID = "ons_census_raw"

def fetch_observations(population_type: str, area_type: str, dimension: str) -> pd.DataFrame:
    """Fetch one dimension for one area type and flatten it into a DataFrame.

    Sends a GET request to ``{API_ROOT}/{population_type}/census-observations``
    and turns each returned observation into one row. The function is
    best-effort: every failure is reported on stdout and an empty DataFrame is
    returned, so one bad dimension does not abort the caller's loop.

    Args:
        population_type: Population-type segment of the URL (e.g. "UR").
        area_type: Value sent as the ``area-type`` query parameter.
        dimension: Value sent as the ``dimensions`` query parameter.

    Returns:
        One row per observation with the columns ``area_type_name``,
        ``area_type_id``, ``area_name``, ``area_id``, ``variable_type_name``,
        ``variable_type_id``, ``variable_name``, ``variable_id`` and
        ``observation``. An empty DataFrame is returned when the request
        fails, the body is not a JSON object, or it holds no observations.
    """
    url = f"{API_ROOT}/{population_type}/census-observations"
    try:
        r = requests.get(
            url,
            params={"area-type": area_type, "dimensions": dimension},
            timeout=30,
        )
        r.raise_for_status()
        payload = r.json()
    except requests.exceptions.HTTPError as e:
        # HTTPError may carry no response object; never dereference it blindly.
        status_code = getattr(e.response, "status_code", None)
        if status_code == 403:
            print(f"⚠️  Skipping {area_type}-{dimension}: Dataset too large for API")
        else:
            print(f"❌ Request failed for {area_type}-{dimension}: {e}")
        return pd.DataFrame()
    except Exception as e:
        # Deliberate catch-all (network errors, invalid JSON, ...): keep the
        # pipeline going and let the caller skip this dimension.
        print(f"❌ Unexpected error for {area_type}-{dimension}: {e}")
        return pd.DataFrame()

    # A JSON body that is not an object (list, null, ...) has no observations;
    # treat it like an empty result instead of crashing on ``.get``.
    obs = payload.get("observations") if isinstance(payload, dict) else None
    if not obs:
        print(f"ℹ️  No observations for {area_type}-{dimension}")
        return pd.DataFrame()

    records = []
    for o in obs:
        dims = o.get("dimensions", [])
        if len(dims) < 2:
            # Both an area entry and a variable entry are needed to build a row.
            continue
        # The first dimension entry is read as the area, the second as the variable.
        area_dim, var_dim = dims[0], dims[1]
        records.append({
            "area_type_name": area_dim.get("dimension"),
            "area_type_id": area_dim.get("dimension_id"),
            "area_name": area_dim.get("option"),
            "area_id": area_dim.get("option_id"),
            "variable_type_name": var_dim.get("dimension"),
            "variable_type_id": var_dim.get("dimension_id"),
            "variable_name": var_dim.get("option"),
            "variable_id": var_dim.get("option_id"),
            "observation": o.get("observation"),
        })
    return pd.DataFrame.from_records(records)

def collect_all(area_types: List[str], population_type: str, dimensions: List[str]) -> pd.DataFrame:
    """Fetch every (area type, dimension) pair and stack the non-empty results.

    Args:
        area_types: Area types to request; iterated in the outer loop.
        population_type: Population type passed through to each fetch.
        dimensions: Dimensions to request for each area type.

    Returns:
        All non-empty fetch results concatenated with a fresh index, or an
        empty DataFrame when nothing was fetched.
    """
    # Build the work list up front so the progress counter knows the total.
    jobs = [(area, dim) for area in area_types for dim in dimensions]
    total = len(jobs)
    collected = []

    for step, (area, dim) in enumerate(jobs, start=1):
        print(f"[{step}/{total}] {population_type} | {area} | {dim}")
        result = fetch_observations(population_type, area, dim)
        if result.empty:
            continue
        collected.append(result)

    if not collected:
        return pd.DataFrame()
    return pd.concat(collected, ignore_index=True)

def ensure_bigquery_dataset(project_id: str, dataset_id: str, location: str = "EU") -> None:
    """Create the BigQuery dataset if it does not exist yet.

    Args:
        project_id: GCP project that owns the dataset.
        dataset_id: Dataset name inside the project.
        location: Location assigned to a newly created dataset.

    Raises:
        Exception: Any client error other than "dataset not found" (for
            example authentication or permission failures) is propagated to
            the caller instead of being mistaken for a missing dataset.
    """
    # Imported locally so the block does not depend on a new file-level import.
    from google.cloud.exceptions import NotFound

    client = bigquery.Client(project=project_id)
    dataset_ref = f"{project_id}.{dataset_id}"

    try:
        client.get_dataset(dataset_ref)
    except NotFound:
        # Only a genuine "not found" should trigger creation.
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = location
        dataset.description = "Raw census data from ONS API (Portfolio Project)"
        client.create_dataset(dataset, timeout=30)
        print(f"✅ Created dataset {dataset_ref}")
    else:
        print(f"✅ Dataset {dataset_ref} already exists")

def load_to_bigquery(df: pd.DataFrame, project_id: str, dataset_id: str, table_id: str) -> None:
    """Load a DataFrame into a BigQuery table using an explicit schema.

    The target table is overwritten on every call (``WRITE_TRUNCATE``). After
    the load job finishes, the table is read back and its row count, size and
    a console link are printed.

    Args:
        df: Rows to load; its columns are expected to match the schema below.
        project_id: GCP project that owns the table.
        dataset_id: Dataset containing the table.
        table_id: Name of the destination table.
    """
    client = bigquery.Client(project=project_id)
    table_ref = f"{project_id}.{dataset_id}.{table_id}"

    # Define schema explicitly (best practice)
    schema = [
        bigquery.SchemaField("area_type_name", "STRING"),
        bigquery.SchemaField("area_type_id", "STRING"),
        bigquery.SchemaField("area_name", "STRING"),
        bigquery.SchemaField("area_id", "STRING"),
        bigquery.SchemaField("variable_type_name", "STRING"),
        bigquery.SchemaField("variable_type_id", "STRING"),
        bigquery.SchemaField("variable_name", "STRING"),
        bigquery.SchemaField("variable_id", "STRING"),
        bigquery.SchemaField("observation", "INTEGER"),
    ]

    job_config = bigquery.LoadJobConfig(
        schema=schema,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # Overwrite on each run
    )

    print(f"📤 Loading {len(df):,} rows to {table_ref}...")
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()  # Wait for completion

    # Verify and show stats
    table = client.get_table(table_ref)
    print(f"✅ Loaded {table.num_rows:,} rows to BigQuery")
    print(f"   📊 Table size: {table.num_bytes / (1024**2):.2f} MB")
    print(f"   🔗 View at: https://console.cloud.google.com/bigquery?project={project_id}&p={project_id}&d={dataset_id}&t={table_id}&page=table")

if __name__ == "__main__":
    # Script entry point: extract from the ONS API, keep a CSV backup, make
    # sure the BigQuery dataset exists, then load the rows into the table.
    # Exits use ``raise SystemExit`` because the ``exit()`` helper is injected
    # by the ``site`` module and is not guaranteed to exist.
    print("=" * 70)
    print("🏴 UK CENSUS DATA PIPELINE")
    print("=" * 70)
    print(f"📍 Project: {PROJECT_ID}")
    print(f"📂 Target: {DATASET_ID}.{TABLE_ID}")
    print("=" * 70)

    # Load metadata from your ons_words.json
    try:
        # Explicit encoding so the read does not depend on the platform default.
        with open("ons_words.json", "r", encoding="utf-8") as f:
            meta = json.load(f)
    except FileNotFoundError:
        print("❌ Error: ons_words.json not found in current directory")
        print("   Make sure you're running this script from the same folder as ons_words.json")
        raise SystemExit(1)
    except json.JSONDecodeError as e:
        print(f"❌ Error: ons_words.json is not valid JSON: {e}")
        raise SystemExit(1)

    # Configure extraction
    population_type = "UR"  # Usual Residents
    area_types = ["wpc"]    # Westminster Parliamentary Constituencies (start with this)
    try:
        dimensions = meta["DIMENSIONS_BY_POPULATION_TYPE"][population_type]
    except (KeyError, TypeError):
        # Report a malformed metadata file instead of dying with a traceback.
        print(f"❌ Error: ons_words.json has no DIMENSIONS_BY_POPULATION_TYPE entry for '{population_type}'")
        raise SystemExit(1)

    print(f"📋 Will extract {len(dimensions)} dimensions for {len(area_types)} area type(s)")
    print(f"   Total API calls: {len(dimensions) * len(area_types)}")

    # Ask for confirmation before proceeding
    user_input = input("\n⏳ This will take ~1-2 minutes. Continue? (y/n): ")
    # Strip so that stray whitespace around the answer is not read as "no".
    if user_input.strip().lower() != 'y':
        print("❌ Cancelled by user")
        raise SystemExit(0)

    # Step 1: Extract from API
    print("\n" + "=" * 70)
    print("📥 STEP 1: EXTRACTING FROM ONS API")
    print("=" * 70)
    combined_df = collect_all(area_types, population_type, dimensions)

    if combined_df.empty:
        print("❌ No data extracted. Exiting.")
        raise SystemExit(1)

    print(f"\n✅ Extraction complete: {len(combined_df):,} rows")

    # Step 2: Save locally (backup)
    print("\n" + "=" * 70)
    print("💾 STEP 2: SAVING LOCAL BACKUP")
    print("=" * 70)
    os.makedirs("data", exist_ok=True)
    backup_path = "data/wpc_results.csv"
    combined_df.to_csv(backup_path, index=False)
    file_size_mb = os.path.getsize(backup_path) / (1024**2)
    print(f"✅ Saved to: {backup_path}")
    print(f"   📊 File size: {file_size_mb:.2f} MB")

    # Step 3: Create BigQuery dataset if needed
    print("\n" + "=" * 70)
    print("🗄️  STEP 3: SETTING UP BIGQUERY")
    print("=" * 70)
    try:
        ensure_bigquery_dataset(PROJECT_ID, DATASET_ID, location="EU")
    except Exception as e:
        # Top-level boundary: show the error plus setup hints, then stop.
        print(f"❌ Failed to create dataset: {e}")
        print("\n💡 Common fixes:")
        print("   1. Make sure PROJECT_ID is correct in the script")
        print("   2. Run: gcloud auth application-default login")
        print("   3. Enable BigQuery API: gcloud services enable bigquery.googleapis.com")
        raise SystemExit(1)

    # Step 4: Load to BigQuery
    print("\n" + "=" * 70)
    print("☁️  STEP 4: LOADING TO BIGQUERY")
    print("=" * 70)
    try:
        load_to_bigquery(combined_df, PROJECT_ID, DATASET_ID, TABLE_ID)
    except Exception as e:
        print(f"❌ Failed to load data: {e}")
        raise SystemExit(1)

    # Success summary
    print("\n" + "=" * 70)
    print("✅ PIPELINE COMPLETE!")
    print("=" * 70)
    print("\n📊 Data Summary:")
    print(f"   • Rows loaded: {len(combined_df):,}")
    print(f"   • Unique areas: {combined_df['area_name'].nunique():,}")
    print(f"   • Variables: {combined_df['variable_type_id'].nunique()}")

    print("\n🔍 Next Steps:")
    print("   1. View data in BigQuery console:")
    print(f"      https://console.cloud.google.com/bigquery?project={PROJECT_ID}")
    print("\n   2. Test query:")
    print("      SELECT area_name, variable_type_id, COUNT(*) as row_count")
    print(f"      FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`")
    print("      GROUP BY 1, 2")
    print("      ORDER BY 3 DESC")
    print("      LIMIT 10;")
    print("\n   3. Set up dbt project:")
    print("      dbt init uk_census_analytics")
    print("\n   4. Create staging model:")
    print("      models/staging/stg_ons_census.sql")

🏴 UK CENSUS DATA PIPELINE
📍 Project: uk-census-portfolio
📂 Target: raw_data.ons_census_raw
📋 Will extract 44 dimensions for 1 area type(s)
   Total API calls: 44



⏳ This will take ~1-2 minutes. Continue? (y/n):  y



📥 STEP 1: EXTRACTING FROM ONS API
[1/44] UR | wpc | activity_last_week
[2/44] UR | wpc | age_arrival_uk_23a
[3/44] UR | wpc | alternative_address_indicator
[4/44] UR | wpc | country_of_birth_190a
⚠️  Skipping wpc-country_of_birth_190a: Dataset too large for API
[5/44] UR | wpc | country_of_birth_60a
[6/44] UR | wpc | disability
[7/44] UR | wpc | economic_activity_status_12a
[8/44] UR | wpc | english_proficiency
[9/44] UR | wpc | ethnic_group_tb_20b
[10/44] UR | wpc | has_ever_worked
[11/44] UR | wpc | health_in_general
[12/44] UR | wpc | highest_qualification
[13/44] UR | wpc | hours_per_week_worked
[14/44] UR | wpc | industry_current_88a
[15/44] UR | wpc | industry_former_17a
[16/44] UR | wpc | is_carer
[17/44] UR | wpc | legal_partnership_status
[18/44] UR | wpc | living_arrangements_11a
[19/44] UR | wpc | main_language_23a
[20/44] UR | wpc | main_language_detailed_26a
[21/44] UR | wpc | migrant_ind
[22/44] UR | wpc | multi_passports
[23/44] UR | wpc | national_identity_all
[