### Indicators and Country Dictionary

In [25]:
import pandas as pd

# ISO3 codes of the African countries analysed in this notebook
countries = ['ZAF', 'NGA', 'EGY', 'KEN', 'ETH', 'GHA', 'TZA', 'UGA', 'CIV', 'CMR']


# Mapping of World Bank indicator code -> human-readable display name
indicator_names = {
    'NY.GDP.MKTP.CD': 'GDP (current US$)',
    'NY.GDP.PCAP.CD': 'GDP per capita (current US$)',
    'NY.GDP.MKTP.KD.ZG': 'GDP growth (annual %)',
    'FP.CPI.TOTL.ZG': 'Inflation, consumer prices (annual %)',
    'DT.DOD.DECT.CD': 'External debt stocks, total (DOD, current US$)',
    'SI.POV.NAHC': 'Poverty headcount ratio at national poverty lines (% of population)',
    'SI.POV.DDAY': 'Poverty headcount ratio at $2.15 a day (2017 PPP) (% of population)',
    'SH.DYN.MORT': 'Mortality rate, under-5 (per 1,000 live births)',
    'SH.STA.MMRT': 'Maternal mortality ratio (modeled estimate, per 100,000 live births)',
    'SH.HIV.INCD.ZS': 'Incidence of HIV (% of uninfected population ages 15-49)',
    'SH.IMM.MEAS': 'Immunization, measles (% of children ages 12-23 months)',
    'SE.PRM.ENRR': 'School enrollment, primary (% gross)',
    'SE.SEC.ENRR': 'School enrollment, secondary (% gross)',
    'SE.ADT.LITR.ZS': 'Literacy rate, adult total (% of people ages 15 and above)',
    'SG.GEN.PARL.ZS': 'Proportion of seats held by women in national parliaments (%)',
    'SL.TLF.CACT.FE.ZS': 'Labor force participation rate, female (% of female population ages 15+)',
    'SH.H2O.SMDW.ZS': 'People using safely managed drinking water services (% of population)',
    'SH.STA.SMSS.ZS': 'People using safely managed sanitation services (% of population)',
    'EG.ELC.ACCS.ZS': 'Access to electricity (% of population)',
    'EG.USE.ELEC.KH.PC': 'Electric power consumption (kWh per capita)',
    'SL.EMP.VULN.ZS': 'Vulnerable employment, total (% of total employment)',
    'SL.UEM.TOTL.ZS': 'Unemployment, total (% of total labor force)',
    'IT.NET.USER.ZS': 'Individuals using the Internet (% of population)',
    'IT.CEL.SETS.P2': 'Mobile cellular subscriptions (per 100 people)',
    'EN.ATM.CO2E.PC': 'CO2 emissions (metric tons per capita)',
    'AG.LND.FRST.ZS': 'Forest area (% of land area)',
    'AG.YLD.CREL.KG': 'Cereal yield (kg per hectare)',
    'SN.ITK.DEFC.ZS': 'Prevalence of undernourishment (% of population)',
    'FX.OWN.TOTL.ZS': 'Account ownership at a financial institution or with a mobile-money-service provider (% of population ages 15+)'
}

# Thematic category -> list of indicator codes to fetch for that category
indicators = {
    'Economic Growth': ['NY.GDP.MKTP.CD', 'NY.GDP.PCAP.CD', 'NY.GDP.MKTP.KD.ZG'],
    # NOTE(review): inflation and external debt are macro-stability measures rather than
    # liquidity; consider renaming this key (check other cells before changing it).
    'Liquidity': ['FP.CPI.TOTL.ZG', 'DT.DOD.DECT.CD'],
    'Poverty and Inequality': ['SI.POV.NAHC', 'SI.POV.DDAY'],
    'Health': ['SH.DYN.MORT', 'SH.STA.MMRT', 'SH.HIV.INCD.ZS', 'SH.IMM.MEAS'],
    'Education': ['SE.PRM.ENRR', 'SE.SEC.ENRR', 'SE.ADT.LITR.ZS'],
    'Gender Equality': ['SG.GEN.PARL.ZS', 'SL.TLF.CACT.FE.ZS'],
    'Water and Sanitation': ['SH.H2O.SMDW.ZS', 'SH.STA.SMSS.ZS'],
    'Energy': ['EG.ELC.ACCS.ZS', 'EG.USE.ELEC.KH.PC'],
    'Employment and Decent Work': ['SL.EMP.VULN.ZS', 'SL.UEM.TOTL.ZS'],
    'Infrastructure and Innovation': ['IT.NET.USER.ZS', 'IT.CEL.SETS.P2'],
    'Climate Action': ['EN.ATM.CO2E.PC', 'AG.LND.FRST.ZS'],
    'Agriculture and Food Security': ['AG.YLD.CREL.KG', 'SN.ITK.DEFC.ZS'],
    'Financial Inclusion': ['FX.OWN.TOTL.ZS']
}

# Fail fast: later cells translate each code into its display name via indicator_names,
# so a code missing here would only surface as a KeyError after all requests complete.
assert {code for codes in indicators.values() for code in codes} <= set(indicator_names), \
    "every indicator code in `indicators` needs an entry in `indicator_names`"



### Define Simple Synchronous Requests

In [None]:
import requests
import pandas as pd
from typing import List, Union
import logging

import asyncio
import aiohttp

# Configure logging. basicConfig does nothing if the root logger already has handlers,
# so repeating this call in later cells is harmless.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Constants
# HTTPS so requests to the World Bank API are not sent in clear text.
WORLD_BANK_URL = 'https://api.worldbank.org/v2'

def fetch_world_bank_data(indicator: str, countries: Union[str, List[str]], start_year: int = 1960,
                          end_year: Union[int, None] = None, timeout: float = 30.0) -> pd.DataFrame:
    """Fetch one indicator for the given countries and years from the World Bank API.

    Pages through ``{WORLD_BANK_URL}/country/{codes}/indicator/{indicator}`` and passes
    the collected records to ``process_world_bank_data``.

    :param indicator: World Bank indicator code, e.g. ``'NY.GDP.PCAP.CD'``.
    :param countries: A single country code string, or an iterable of codes
        (list, tuple, ...) that is joined with ``';'`` for the request URL.
    :param start_year: First year of the requested ``date`` range.
    :param end_year: Last year of the requested ``date`` range; defaults to the current year.
    :param timeout: Seconds to wait for each HTTP request before giving up.
    :return: The processed DataFrame (empty if nothing was retrieved). Request errors
        and API error responses are logged, and any pages fetched before them are kept.
    """
    if end_year is None:
        end_year = pd.Timestamp.now().year

    # Treat any non-string iterable as a collection of codes, not only lists.
    countries_str = countries if isinstance(countries, str) else ';'.join(countries)

    url = f"{WORLD_BANK_URL}/country/{countries_str}/indicator/{indicator}"
    params = {
        'format': 'json',
        'per_page': 10000,
        'date': f"{start_year}:{end_year}"
    }

    all_data = []
    page = 1

    while True:
        params['page'] = page
        try:
            # Without a timeout, requests can block indefinitely on a stalled connection.
            response = requests.get(url, params=params, timeout=timeout)
            response.raise_for_status()
            data = response.json()
        except requests.RequestException as e:
            logger.error(f"Error fetching data: {str(e)}")
            break

        # For invalid requests (e.g. an unknown indicator) the API replies with a
        # one-element list holding a 'message' entry instead of [metadata, records].
        if isinstance(data, list) and data and isinstance(data[0], dict) and 'message' in data[0]:
            logger.error(f"World Bank API error for {indicator}: {data[0]['message']}")
            break

        if not data or len(data) < 2 or not data[1]:
            break

        all_data.extend(data[1])

        # A short page means this was the last one.
        if len(data[1]) < params['per_page']:
            break

        page += 1

    return process_world_bank_data(all_data, indicator)

def process_world_bank_data(data: List[dict], indicator: str) -> pd.DataFrame:
    """Convert raw World Bank API records into a tidy DataFrame.

    :param data: Records from the API (the second element of each JSON page).
    :param indicator: Indicator code; it becomes the name of the value column.
    :return: DataFrame with a single column named ``indicator``, indexed by
        (country_name, country_code, year) and sorted. ``year`` is a Timestamp on
        Jan 1. Non-numeric values become NaN. Returns an empty DataFrame (and logs a
        warning) when ``data`` is empty.
    """
    if not data:
        logger.warning(f"No data retrieved for indicator: {indicator}")
        return pd.DataFrame()

    df = pd.DataFrame(data)

    # 'country' arrives as a nested dict; keep only its 'value' (the display name).
    df['country_name'] = df['country'].apply(lambda x: x['value'] if isinstance(x, dict) else x)
    df['value'] = pd.to_numeric(df['value'], errors='coerce')
    df['date'] = pd.to_datetime(df['date'], format='%Y')

    # errors='ignore': a response lacking one of these metadata columns (e.g. 'unit')
    # should not raise KeyError, because none of them is needed downstream.
    df = df.drop(columns=['indicator', 'obs_status', 'decimal', 'country', 'unit'], errors='ignore')
    df = df.rename(columns={'countryiso3code': 'country_code', 'date': 'year', 'value': indicator})

    return df.set_index(['country_name', 'country_code', 'year']).sort_index()

def get_world_bank_data(indicator: str, countries: Union[str, List[str]], start_year: int = 1960,
                        end_year: Union[int, None] = None) -> pd.DataFrame:
    """Fetch and process one World Bank indicator, never raising.

    :param indicator: World Bank indicator code.
    :param countries: Country code string or list of codes.
    :param start_year: First year requested.
    :param end_year: Last year requested; defaults to the current year.
    :return: The processed DataFrame, or an empty DataFrame on any error
        (the error is logged with its traceback).
    """
    try:
        df = fetch_world_bank_data(indicator, countries, start_year, end_year)
    except Exception as e:
        # Deliberate best-effort: log the full traceback and hand back an empty frame.
        logger.exception(f"Error retrieving data for {indicator}: {str(e)}")
        return pd.DataFrame()

    # An empty frame means nothing usable came back (process_world_bank_data already
    # warned), so only report success when there is data.
    if not df.empty:
        logger.info(f"Successfully retrieved data for {indicator}")
    return df

In [None]:
### Test the Synchronous Function

In [None]:
# Example: GDP per capita for a few countries.
# Example-specific names on purpose: assigning to `countries` here would overwrite the
# notebook-level African country list that later cells fetch with.
if __name__ == "__main__":
    example_indicator = "NY.GDP.PCAP.CD"
    example_countries = ["USA", "UKR", "JPN"]
    df = get_world_bank_data(example_indicator, example_countries, start_year=1960, end_year=2020)
    print(df.head())

### Define Asynchronous Requests

In [21]:
import asyncio
import aiohttp
import nest_asyncio
import pandas as pd
from typing import List, Union
import logging

# Configure logging. basicConfig does nothing if the root logger already has handlers,
# so repeating it from an earlier cell is harmless.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Constants
# HTTPS so requests to the World Bank API are not sent in clear text.
WORLD_BANK_URL = 'https://api.worldbank.org/v2'

# Apply nest_asyncio so asyncio.run() can be called inside Jupyter's already-running event loop
nest_asyncio.apply()

async def fetch_world_bank_data_async(session: aiohttp.ClientSession, indicator: str,
                                      countries: Union[str, List[str]], start_year: int = 1960,
                                      end_year: Union[int, None] = None) -> pd.DataFrame:
    """Asynchronously fetch one indicator for the given countries and years.

    Pages through ``{WORLD_BANK_URL}/country/{codes}/indicator/{indicator}`` on the
    supplied session and passes the records to ``process_world_bank_data``.

    :param session: Open aiohttp session used for every page request.
    :param indicator: World Bank indicator code.
    :param countries: A single country code string, or an iterable of codes that is
        joined with ``';'``.
    :param start_year: First year of the requested ``date`` range.
    :param end_year: Last year of the requested ``date`` range; defaults to the current year.
    :return: The processed DataFrame (empty if nothing was retrieved). aiohttp client
        errors and API error responses are logged, and pages fetched before them are kept.
    """
    if end_year is None:
        end_year = pd.Timestamp.now().year

    # Treat any non-string iterable as a collection of codes, not only lists.
    countries_str = countries if isinstance(countries, str) else ';'.join(countries)

    url = f"{WORLD_BANK_URL}/country/{countries_str}/indicator/{indicator}"
    params = {
        'format': 'json',
        'per_page': 10000,
        'date': f"{start_year}:{end_year}"
    }

    all_data = []
    page = 1

    while True:
        params['page'] = page
        try:
            async with session.get(url, params=params) as response:
                response.raise_for_status()
                data = await response.json()
        except aiohttp.ClientError as e:
            logger.error(f"Error fetching data for {indicator}: {str(e)}")
            break

        # For invalid requests the API replies with a one-element list holding a
        # 'message' entry instead of [metadata, records]; report it instead of
        # treating it as "no data".
        if isinstance(data, list) and data and isinstance(data[0], dict) and 'message' in data[0]:
            logger.error(f"World Bank API error for {indicator}: {data[0]['message']}")
            break

        if not data or len(data) < 2 or not data[1]:
            break

        all_data.extend(data[1])

        # A short page means this was the last one.
        if len(data[1]) < params['per_page']:
            break

        page += 1

    return process_world_bank_data(all_data, indicator)

def process_world_bank_data(data: List[dict], indicator: str) -> pd.DataFrame:
    """Convert raw World Bank API records into a tidy DataFrame.

    NOTE(review): a function with this name is also defined in an earlier cell; whichever
    cell ran last wins. Consider defining it once.

    :param data: Records from the API (the second element of each JSON page).
    :param indicator: Indicator code; it becomes the name of the value column.
    :return: DataFrame with a single column named ``indicator``, indexed by
        (country_name, country_code, year) and sorted. ``year`` is a Timestamp on Jan 1.
        Non-numeric values become NaN. Returns an empty DataFrame (and logs a warning)
        when ``data`` is empty.
    """
    if not data:
        logger.warning(f"No data retrieved for indicator: {indicator}")
        return pd.DataFrame()

    df = pd.DataFrame(data)

    # 'country' arrives as a nested dict; keep only its 'value' (the display name).
    df['country_name'] = df['country'].apply(lambda x: x['value'] if isinstance(x, dict) else x)
    df['value'] = pd.to_numeric(df['value'], errors='coerce')
    df['date'] = pd.to_datetime(df['date'], format='%Y')

    # errors='ignore': a response lacking one of these metadata columns (e.g. 'unit')
    # should not raise KeyError, because none of them is needed downstream.
    df = df.drop(columns=['indicator', 'obs_status', 'decimal', 'country', 'unit'], errors='ignore')
    df = df.rename(columns={'countryiso3code': 'country_code', 'date': 'year', 'value': indicator})

    return df.set_index(['country_name', 'country_code', 'year']).sort_index()

async def get_world_bank_data_async(session: aiohttp.ClientSession, indicator: str,
                                    countries: Union[str, List[str]], start_year: int = 1960,
                                    end_year: Union[int, None] = None) -> pd.DataFrame:
    """Asynchronously fetch and process one indicator, never raising ``Exception``.

    :param session: Open aiohttp session shared across indicators.
    :param indicator: World Bank indicator code.
    :param countries: Country code string or list of codes.
    :param start_year: First year requested.
    :param end_year: Last year requested; defaults to the current year.
    :return: The processed DataFrame, or an empty DataFrame on error (logged with traceback).
    """
    try:
        df = await fetch_world_bank_data_async(session, indicator, countries, start_year, end_year)
    except Exception as e:
        # Deliberate best-effort: one failing indicator should not abort the
        # asyncio.gather() that runs all of them.
        logger.exception(f"Error retrieving data for {indicator}: {str(e)}")
        return pd.DataFrame()

    # An empty frame means nothing usable came back (process_world_bank_data already
    # warned), so only report success when there is data.
    if not df.empty:
        logger.info(f"Successfully retrieved data for {indicator}")
    return df

async def fetch_all_indicators(indicators: dict, countries: List[str], names: Union[dict, None] = None):
    """Fetch every indicator in ``indicators`` concurrently over one shared HTTP session.

    :param indicators: Mapping of category -> list of indicator codes.
    :param countries: Country codes used for every request.
    :param names: Optional mapping of indicator code -> display name. Defaults to the
        notebook-level ``indicator_names``. Codes missing from it keep their code as the name.
    :return: Dict of display name -> DataFrame whose value column is renamed to that
        display name. Indicators that returned nothing map to an empty DataFrame.
    """
    if names is None:
        names = indicator_names

    # Flatten once, in order, so each task's result lines up with its code by position
    # (also avoids the quadratic sum(lists, [])).
    codes = [code for code_list in indicators.values() for code in code_list]

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(get_world_bank_data_async(session, code, countries) for code in codes)
        )

    data = {}
    for code, df in zip(codes, results):
        standard_name = names.get(code, code)
        # rename() is a no-op on an empty DataFrame, whereas assigning
        # df.columns = [name] raises a length-mismatch ValueError there.
        data[standard_name] = df.rename(columns={code: standard_name})

    return data


In [7]:
### Test the Asynchronous Functions

In [22]:
# Fetch all indicators concurrently; asyncio.run works inside Jupyter here because
# nest_asyncio was applied when the async helpers were defined.
data = asyncio.run(fetch_all_indicators(indicators, countries))

# Same category -> indicators layout as `indicators`, but listing display names
# instead of World Bank codes.
indicators_standard = {
    category: [indicator_names[code] for code in codes]
    for category, codes in indicators.items()
}

2024-08-09 17:12:03,538 - INFO - Successfully retrieved data for SE.PRM.ENRR
2024-08-09 17:12:03,660 - INFO - Successfully retrieved data for SL.EMP.VULN.ZS
2024-08-09 17:12:03,669 - INFO - Successfully retrieved data for AG.LND.FRST.ZS
2024-08-09 17:12:03,681 - INFO - Successfully retrieved data for AG.YLD.CREL.KG
2024-08-09 17:12:03,820 - INFO - Successfully retrieved data for SH.DYN.MORT
2024-08-09 17:12:03,845 - INFO - Successfully retrieved data for SE.SEC.ENRR
2024-08-09 17:12:03,852 - INFO - Successfully retrieved data for SI.POV.NAHC
2024-08-09 17:12:03,858 - INFO - Successfully retrieved data for FP.CPI.TOTL.ZG
2024-08-09 17:12:04,033 - INFO - Successfully retrieved data for SH.STA.SMSS.ZS
2024-08-09 17:12:04,040 - INFO - Successfully retrieved data for SG.GEN.PARL.ZS
2024-08-09 17:12:04,045 - INFO - Successfully retrieved data for IT.CEL.SETS.P2
2024-08-09 17:12:04,360 - INFO - Successfully retrieved data for NY.GDP.PCAP.CD
2024-08-09 17:12:04,410 - INFO - Successfully retrie

In [None]:
### Refactored Pipeline: Class-Based Asynchronous Client

In [47]:
import asyncio
import aiohttp
import nest_asyncio
import pandas as pd
from typing import List, Dict
import logging
from dataclasses import dataclass

# Configure logging. basicConfig does nothing if the root logger already has handlers,
# so repeating it from an earlier cell is harmless.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Apply nest_asyncio to allow asyncio.run() inside Jupyter's already-running event loop
nest_asyncio.apply()

# HTTPS so requests to the World Bank API are not sent in clear text.
WORLD_BANK_URL = 'https://api.worldbank.org/v2'

@dataclass
class WorldBankIndicator:
    """An indicator to request: its API code plus the label used for its output."""
    code: str  # World Bank indicator code, e.g. 'NY.GDP.MKTP.CD'; placed in the request URL
    name: str  # label used as the value column name and as the key of the results dict

class WorldBankAPIError(Exception):
    """Raised by WorldBankAPI.fetch_data when a request to the World Bank API fails."""

class DataProcessingError(Exception):
    """Error type for failures while turning fetched World Bank data into a DataFrame.

    WorldBankDataPipeline.fetch_and_process_indicator catches it, logs it, and returns
    an empty DataFrame.
    """

class WorldBankAPI:
    """Minimal async client for the World Bank v2 country/indicator endpoint."""

    def __init__(self, base_url: str):
        # API root, e.g. WORLD_BANK_URL; request paths are appended to it.
        self.base_url = base_url

    async def fetch_data(self, session: aiohttp.ClientSession, indicator: WorldBankIndicator,
                         countries: List[str], start_year: int, end_year: int) -> List[dict]:
        """Return every raw record for ``indicator`` across all result pages.

        :param session: Open aiohttp session used for the requests.
        :param indicator: Indicator whose ``code`` is requested.
        :param countries: Iterable of country codes, or a single code string.
        :param start_year: First year of the ``date`` range.
        :param end_year: Last year of the ``date`` range.
        :return: The concatenated record lists from each page (possibly empty).
        :raises WorldBankAPIError: on aiohttp client errors, or when the API replies
            with an error message instead of data.
        """
        # A bare string would otherwise be joined character by character ('USA' -> 'U;S;A').
        countries_str = countries if isinstance(countries, str) else ';'.join(countries)
        url = f"{self.base_url}/country/{countries_str}/indicator/{indicator.code}"
        params = {
            'format': 'json',
            'per_page': 10000,
            'date': f"{start_year}:{end_year}"
        }

        all_data = []
        page = 1

        while True:
            params['page'] = page
            try:
                async with session.get(url, params=params) as response:
                    response.raise_for_status()
                    data = await response.json()
            except aiohttp.ClientError as e:
                # Chain the cause so the logged traceback keeps the original error.
                raise WorldBankAPIError(f"Error fetching data for {indicator.code}: {str(e)}") from e

            # For invalid requests the API replies with a one-element list holding a
            # 'message' entry instead of [metadata, records].
            if isinstance(data, list) and data and isinstance(data[0], dict) and 'message' in data[0]:
                raise WorldBankAPIError(f"World Bank API error for {indicator.code}: {data[0]['message']}")

            if not data or len(data) < 2 or not data[1]:
                break

            all_data.extend(data[1])

            # A short page means this was the last one.
            if len(data[1]) < params['per_page']:
                break

            page += 1

        return all_data

class DataProcessor:
    """Turns raw World Bank API records into a tidy, indexed DataFrame."""

    @staticmethod
    def process_world_bank_data(data: List[dict], indicator: WorldBankIndicator) -> pd.DataFrame:
        """Convert raw records into a DataFrame indexed by (country_name, country_code, year).

        :param data: Records returned by WorldBankAPI.fetch_data.
        :param indicator: Its ``name`` becomes the value column name; ``code`` is used in messages.
        :return: Sorted DataFrame with one value column (non-numeric values become NaN);
            an empty DataFrame, with a warning, when ``data`` is empty.
        :raises DataProcessingError: when the records lack required fields or have
            unparseable dates.
        """
        if not data:
            logger.warning(f"No data retrieved for indicator: {indicator.code}")
            return pd.DataFrame()

        try:
            df = pd.DataFrame(data)

            # 'country' arrives as a nested dict; keep only its 'value' (the display name).
            df['country_name'] = df['country'].apply(lambda x: x['value'] if isinstance(x, dict) else x)
            df['value'] = pd.to_numeric(df['value'], errors='coerce')
            df['date'] = pd.to_datetime(df['date'], format='%Y')

            # errors='ignore': missing metadata columns are not needed downstream.
            df = df.drop(columns=['indicator', 'obs_status', 'decimal', 'country', 'unit'], errors='ignore')
            df = df.rename(columns={'countryiso3code': 'country_code', 'date': 'year', 'value': indicator.name})

            return df.set_index(['country_name', 'country_code', 'year']).sort_index()
        except (KeyError, ValueError, TypeError) as e:
            # Surface as DataProcessingError, which WorldBankDataPipeline catches, so a
            # malformed response skips this indicator instead of aborting the whole gather().
            raise DataProcessingError(f"Could not process data for {indicator.code}: {e}") from e

class WorldBankDataPipeline:
    """Combines a WorldBankAPI (download) and a DataProcessor (reshape) for many indicators."""

    def __init__(self, api: WorldBankAPI, processor: DataProcessor):
        self.api = api
        self.processor = processor

    async def fetch_and_process_indicator(self, session: aiohttp.ClientSession,
                                          indicator: WorldBankIndicator,
                                          countries: List[str],
                                          start_year: int, end_year: int) -> pd.DataFrame:
        """Download and process one indicator.

        Handled failures are logged with a traceback and turned into an empty DataFrame,
        so one bad indicator does not abort the others gathered in ``fetch_all_indicators``.
        """
        try:
            raw_data = await self.api.fetch_data(session, indicator, countries, start_year, end_year)
            df = self.processor.process_world_bank_data(raw_data, indicator)
        except (WorldBankAPIError, DataProcessingError, asyncio.TimeoutError) as e:
            # aiohttp can signal a session timeout with asyncio.TimeoutError, which is not
            # an aiohttp.ClientError and therefore is not wrapped as WorldBankAPIError.
            logger.exception(f"Error processing data for {indicator.code}: {str(e)}")
            return pd.DataFrame()

        # An empty frame means no records came back (the processor already warned);
        # only report success when there is data.
        if not df.empty:
            logger.info(f"Successfully retrieved and processed data for {indicator.code}")
        return df

    async def fetch_all_indicators(self, indicators: List[WorldBankIndicator],
                                   countries: List[str], start_year: int, end_year: int) -> Dict[str, pd.DataFrame]:
        """Fetch all indicators concurrently over one shared session.

        :return: Dict keyed by ``indicator.name``; if two indicators share a name,
            the later one in ``indicators`` wins.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_and_process_indicator(session, indicator, countries, start_year, end_year)
                     for indicator in indicators]
            results = await asyncio.gather(*tasks)

        return {indicator.name: df for indicator, df in zip(indicators, results)}

async def get_world_bank_data(indicator_codes: List[str], countries: List[str],
                              start_year: int = 1960, end_year: int = None,
                              names: Dict[str, str] = None) -> Dict[str, pd.DataFrame]:
    """
    Fetch and process World Bank data for given indicator codes and countries.

    NOTE(review): an earlier cell defines a synchronous ``get_world_bank_data`` with a
    different signature; once this cell runs, that version is shadowed. Consider renaming one.

    :param indicator_codes: List of World Bank indicator codes
    :param countries: List of country codes (e.g. 'USA', 'CHN')
    :param start_year: Start year for data retrieval (default: 1960)
    :param end_year: End year for data retrieval (default: current year)
    :param names: Optional mapping of indicator code -> display name (e.g. the notebook's
        ``indicator_names``). Codes not in the mapping, or all codes when it is omitted,
        use the code itself as the name.
    :return: Dictionary of DataFrames, keyed by indicator name
    """
    if end_year is None:
        end_year = pd.Timestamp.now().year

    api = WorldBankAPI(WORLD_BANK_URL)
    processor = DataProcessor()
    pipeline = WorldBankDataPipeline(api, processor)

    names = names or {}
    # The name labels the value column and keys the result dict; fall back to the code.
    indicators = [WorldBankIndicator(code, names.get(code, code)) for code in indicator_codes]

    results = await pipeline.fetch_all_indicators(indicators, countries, start_year, end_year)
    return results


In [48]:
# Example usage: GDP and total population for three large economies
async def main():
    """Fetch GDP and population for the US, China and Japan and print each frame's first rows."""
    gdp_and_population = ['NY.GDP.MKTP.CD', 'SP.POP.TOTL']
    economies = ['USA', 'CHN', 'JPN']  # United States, China, Japan
    results = await get_world_bank_data(gdp_and_population, economies)

    for indicator, frame in results.items():
        print(f"\nData for {indicator}:")
        print(frame.head())


if __name__ == "__main__":
    asyncio.run(main())

2024-08-13 14:18:12,639 - INFO - Successfully retrieved and processed data for SP.POP.TOTL
2024-08-13 14:18:12,650 - INFO - Successfully retrieved and processed data for NY.GDP.MKTP.CD



Data for NY.GDP.MKTP.CD:
                                      NY.GDP.MKTP.CD
country_name country_code year                      
China        CHN          1960-01-01    5.971625e+10
                          1961-01-01    5.005669e+10
                          1962-01-01    4.720919e+10
                          1963-01-01    5.070661e+10
                          1964-01-01    5.970813e+10

Data for SP.POP.TOTL:
                                      SP.POP.TOTL
country_name country_code year                   
China        CHN          1960-01-01    667070000
                          1961-01-01    660330000
                          1962-01-01    665770000
                          1963-01-01    682335000
                          1964-01-01    698355000
