In [17]:
import math
import os

import pandas as pd
import requests
import wbdata
from google.cloud import bigquery


In [18]:
# Path to your service account JSON (from earlier setup)
# NOTE(review): no cell visible in this notebook reads SERVICE_ACCOUNT_PATH, and its
# value differs from the key-file name passed to bigquery.Client in the next cell
# ("...c4809696b27e.json") — confirm which file name is correct before relying on it.
SERVICE_ACCOUNT_PATH ='data-analytics-capstone-476406-c4809696b27e.json-credentials.json'  # Update this path

In [19]:
# Create the BigQuery client, authenticating with the service-account key file
# (path is relative to the notebook's working directory).
client = bigquery.Client.from_service_account_json(
    "data-analytics-capstone-476406-c4809696b27e.json"
)


In [20]:
# Step 1: Define parameters — BigQuery destination, country scope, and year window.
project_id = 'data-analytics-capstone-476406'
dataset_id = 'corruption_analysis'

# Three-letter country codes, one row per alphabetical group. The order is
# significant: later cells slice this list into fixed-size request batches.
african_countries = """
    DZA AGO BEN BWA BFA BDI CPV CMR CAF TCD
    COM COD COG CIV DJI EGY GNQ ERI SWZ ETH
    GAB GMB GHA GIN GNB KEN LSO LBR LBY MDG
    MWI MLI MRT MUS MAR MOZ NAM NER NGA
    RWA STP SEN SYC SLE SOM ZAF SSD SDN
    TZA TGO TUN UGA ZMB ZWE
""".split()

# First and last year requested (used to build the "start:end" date filter).
start_year, end_year = 2015, 2023

In [21]:
# World Bank indicator codes mapped to the short names stored in `indicator_name`.
# Insertion order is the order in which the indicators are fetched.
indicators = dict([
    # GDP growth (annual %)
    ('NY.GDP.MKTP.KD.ZG', 'gdp_growth'),
    # GDP per capita (current US$)
    ('NY.GDP.PCAP.CD', 'gdp_per_capita'),
    # Foreign direct investment, net inflows (BoP, current US$)
    ('BX.KLT.DINV.CD.WD', 'fdi_inflow'),
])


In [22]:
# Step 5: Fetch data from the World Bank API, batching countries so each request
# URL stays short. Every usable observation is appended to `all_data` as one dict
# (long format: one record per country / year / indicator). A failure in one
# batch is reported and skipped so the remaining batches still run.
all_data = []
date_range = f"{start_year}:{end_year}"
batch_size = 20  # Safe chunk size; adjust to 15-25 if needed
request_timeout = 30  # Seconds; without a timeout a stalled connection would hang the cell
num_batches = math.ceil(len(african_countries) / batch_size)  # e.g., 54 / 20 = 3 batches

for indicator_code, indicator_name in indicators.items():
    print(f"\nFetching {indicator_name} ({indicator_code}) across {num_batches} batches...")

    for batch_num in range(num_batches):
        start_idx = batch_num * batch_size
        batch_countries = african_countries[start_idx:start_idx + batch_size]
        countries_str = ';'.join(batch_countries)

        url = f"https://api.worldbank.org/v2/country/{countries_str}/indicator/{indicator_code}?format=json&date={date_range}&per_page=5000"
        print(f"  Batch {batch_num + 1}/{num_batches}: {len(batch_countries)} countries (e.g., {batch_countries[:3]}...)")

        # Connection errors and timeouts raise instead of returning a response;
        # catch them so one bad batch does not abort the whole fetch.
        try:
            response = requests.get(url, timeout=request_timeout)
        except requests.RequestException as e:
            print(f"  Request failed in batch {batch_num + 1}: {e}")
            continue

        if response.status_code != 200:
            print(f"  Error in batch {batch_num + 1}: {response.status_code} - {response.text[:200]}")
            continue

        try:
            api_response = response.json()
        except ValueError as e:
            print(f"  JSON parse error in batch {batch_num + 1}: {e}")
            continue

        # Expected payload shape: [metadata, rows]. Anything else (for example a
        # one-element list carrying an API error message) is reported rather than
        # being silently counted as "0 raw items".
        if not isinstance(api_response, list) or len(api_response) < 2:
            print(f"  Unexpected payload in batch {batch_num + 1}: {str(api_response)[:200]}")
            continue

        metadata = api_response[0] if isinstance(api_response[0], dict) else {}
        # The rows entry may be null/non-list when nothing matched; treat that as no rows.
        data = api_response[1] if isinstance(api_response[1], list) else []
        print(f"  Batch {batch_num + 1} returned {len(data)} raw items")

        # Only the first page is requested; warn if per_page was not enough so
        # truncated results are not mistaken for complete ones.
        total_pages = metadata.get('pages')
        if isinstance(total_pages, int) and total_pages > 1:
            print(f"  Warning: batch {batch_num + 1} spans {total_pages} pages but only page 1 was fetched; reduce batch_size or the year range")

        for item in data:
            try:
                value = item.get('value')
                if value is None:
                    continue  # No observation reported for this country-year
                all_data.append({
                    'country': item['country']['value'],
                    'country_code': item.get('countryiso3code', 'Unknown'),
                    'year': int(item['date']),
                    'indicator_name': indicator_name,
                    'value': float(value),
                })
            except (ValueError, KeyError, TypeError, AttributeError) as e:
                # Malformed row (missing key, null nested object, non-numeric value): skip it.
                print(f"  Skipping invalid row in batch {batch_num + 1}: {e}")
                continue

# Assemble every collected record into a single long-format DataFrame.
df = pd.DataFrame(all_data)

# Report what was fetched. An empty frame has no columns, so the summaries are
# only attempted when rows exist (and the columns they need are present).
if not df.empty:
    has_indicator_col = 'indicator_name' in df.columns
    total_indicators = df['indicator_name'].nunique() if has_indicator_col else 0
    if 'country' in df.columns:
        total_countries = df['country'].nunique()
    else:
        total_countries = len(african_countries)
    print(f"\nFetched {len(df)} rows across {total_indicators} indicators for {total_countries} countries ({start_year}-{end_year}).")

    # Per-indicator count / mean / min / max, rounded to two decimals.
    if has_indicator_col and 'value' in df.columns:
        print("\nSummary by indicator:")
        indicator_stats = df.groupby('indicator_name')['value'].agg(['count', 'mean', 'min', 'max'])
        print(indicator_stats.round(2))

    print("\nSample data:")
    print(df.head(10))
else:
    print("No data fetched. Please check the API responses, parameters, or network connectivity.")


Fetching gdp_growth (NY.GDP.MKTP.KD.ZG) across 3 batches...
  Batch 1/3: 20 countries (e.g., ['DZA', 'AGO', 'BEN']...)
  Batch 1 returned 180 raw items
  Batch 2/3: 20 countries (e.g., ['GAB', 'GMB', 'GHA']...)
  Batch 2 returned 180 raw items
  Batch 3/3: 14 countries (e.g., ['STP', 'SEN', 'SYC']...)
  Batch 3 returned 126 raw items

Fetching gdp_per_capita (NY.GDP.PCAP.CD) across 3 batches...
  Batch 1/3: 20 countries (e.g., ['DZA', 'AGO', 'BEN']...)
  Batch 1 returned 180 raw items
  Batch 2/3: 20 countries (e.g., ['GAB', 'GMB', 'GHA']...)
  Batch 2 returned 180 raw items
  Batch 3/3: 14 countries (e.g., ['STP', 'SEN', 'SYC']...)
  Batch 3 returned 126 raw items

Fetching fdi_inflow (BX.KLT.DINV.CD.WD) across 3 batches...
  Batch 1/3: 20 countries (e.g., ['DZA', 'AGO', 'BEN']...)
  Batch 1 returned 180 raw items
  Batch 2/3: 20 countries (e.g., ['GAB', 'GMB', 'GHA']...)
  Batch 2 returned 180 raw items
  Batch 3/3: 14 countries (e.g., ['STP', 'SEN', 'SYC']...)
  Batch 3 returned 12

In [23]:
# Step 6: Load the fetched indicators into BigQuery, replacing the table contents.
table_id = f"{project_id}.{dataset_id}.economic_indicators"

# WRITE_TRUNCATE overwrites the destination table, so refuse to proceed with an
# empty frame: a failed fetch must not replace previously loaded data with nothing.
if df.empty:
    raise ValueError(
        f"Refusing to load an empty DataFrame: WRITE_TRUNCATE would overwrite {table_id}. "
        "Re-run the fetch cell and check its output first."
    )

job_config = bigquery.LoadJobConfig(
    write_disposition='WRITE_TRUNCATE',  # Overwrite table if it exists (or use 'WRITE_APPEND' to add rows)
    autodetect=True  # Automatically infer schema from df (e.g., value as FLOAT64)
)

print(f"\nLoading {len(df)} rows to {table_id}...")

job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
# Block and wait for the job to complete.
# NOTE(review): per the client docs result() raises when the job fails, so the
# job.errors branch below presumably only surfaces non-fatal errors — confirm.
job.result()

if job.errors:
    print("Load errors encountered:")
    for error in job.errors:
        print(f"  - {error}")
else:
    print(f"✅ Successfully loaded {job.output_rows} rows to {table_id}.")

    # Fetch the table after the load so the reported schema is what BigQuery stored.
    table = client.get_table(table_id)
    print(f"   Inferred schema: {[(field.name, field.field_type) for field in table.schema]}")


Loading 1417 rows to data-analytics-capstone-476406.corruption_analysis.economic_indicators...




✅ Successfully loaded 1417 rows to data-analytics-capstone-476406.corruption_analysis.economic_indicators.
   Inferred schema: [('country', 'STRING'), ('country_code', 'STRING'), ('year', 'INTEGER'), ('indicator_name', 'STRING'), ('value', 'FLOAT')]
