In [None]:
import asyncio
import pandas as pd
from pprint import pprint as print
from tqdm.asyncio import tqdm_asyncio

from dfpp.publishing import publish_series
from dfpp.transformation.geo_utils import get_iso3_to_official_name_map
from dfpp.sources.world_bank.retrieve import Connector
from dfpp.sources.world_bank.transform import transform_series

In [None]:
# Used both as the `connections` argument of Connector and as the size of the
# asyncio.Semaphore created in process_all_indicators.
MAX_CONCURRENCY = 5
# Passed to Connector as `timeout`. Units are not visible in this notebook
# (presumably seconds -- TODO confirm against Connector).
MAX_TIMEOUT = 60

In [None]:
# Lookup handed to transform_series for every indicator (presumably ISO-3 code
# -> official country name, judging by the helper's name -- TODO confirm).
ISO_3_MAP = await get_iso3_to_official_name_map()
# Single connector reused by all requests issued from this notebook.
conn = Connector(connections=MAX_CONCURRENCY, timeout=MAX_TIMEOUT)
# Indicator listing; process_all_indicators reads its "id" field.
df_indicators = await conn.get_indicators()

In [None]:
async def process_indicator(
    conn: Connector, indicator_id: str, semaphore: asyncio.Semaphore
) -> pd.DataFrame | tuple[str, str]:
    """Fetch, transform and publish the series of a single indicator.

    The whole fetch/transform/publish sequence runs while holding
    ``semaphore``. The ISO-3 lookup is read from the notebook-level
    ``ISO_3_MAP`` global.

    Args:
        conn: Connector used to download the series.
        indicator_id: Identifier of the indicator to process.
        semaphore: Semaphore bounding how many indicators are processed at once.

    Returns:
        The transformed DataFrame on success, or an
        ``(indicator_id, error message)`` tuple if any step failed. Errors are
        returned rather than raised so one bad indicator does not abort the
        whole batch.
    """
    try:
        async with semaphore:
            df_series = await conn.get_series(indicator_id)
            # Explicit raise instead of `assert`: assertions are stripped when
            # Python runs with -O, which would silently disable this check.
            if df_series.shape[0] == 0:
                raise ValueError(
                    "empty series prior to transform (archived indicator)"
                )
            # Transform a copy so the downloaded frame is never modified.
            df = transform_series(df_series.copy(), ISO_3_MAP)
            if df.shape[0] == 0:
                raise ValueError("empty series after transform")
            await publish_series(indicator_id, df, source_folder="world_bank")
            return df
    except Exception as e:
        # Deliberate best-effort: report the failure to the caller as data.
        return indicator_id, str(e)


async def process_all_indicators(
    df_indicators: pd.DataFrame,
) -> list[tuple[str, str]]:
    """Process every indicator listed in ``df_indicators`` and collect failures.

    Uses the notebook-level ``conn`` connector and ``MAX_CONCURRENCY`` limit.
    Failures are pretty-printed as they arrive (``print`` is aliased to
    ``pprint`` in the import cell) and returned at the end.

    Args:
        df_indicators: Indicator listing; only its ``"id"`` column is read.

    Returns:
        A list of ``(indicator_id, error message)`` tuples, one per indicator
        that could not be processed. Empty when everything succeeded.
    """
    failed_indicators = []
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    # Only the ids are needed, so read that column directly instead of
    # materialising every row as a dict.
    tasks = [
        process_indicator(conn, indicator_id, semaphore)
        for indicator_id in df_indicators["id"].tolist()
    ]
    for future in tqdm_asyncio.as_completed(tasks):
        result = await future
        # process_indicator signals failure with a 2-tuple; success is a DataFrame.
        if isinstance(result, tuple) and len(result) == 2:
            print(result)
            failed_indicators.append(result)
    return failed_indicators

In [None]:
# Run the pipeline over all indicators; the result holds one
# (indicator_id, error message) pair per indicator that failed.
failed_indicators = await process_all_indicators(df_indicators)

In [None]:
# Fail the run if any indicator could not be processed. The message has to be
# a real string: a print(...) call in that position evaluates to None and
# leaves the AssertionError without any detail.
assert not failed_indicators, (
    f"{len(failed_indicators)} indicator(s) failed: {failed_indicators}"
)