AsyncGraphs is a tiny ETL framework that leverages asyncio to make the execution concurrent whilst blocked on I/O.
- Typed
- Simple concurrency based on asyncio
- Easy construction of ETL graphs
pip install asyncgraphs
The following example prints random Pokémon and the games they appear in.
It does this every 10 seconds and uses PokéApi.
import aiohttp
from asyncgraphs import Graph, run
import asyncio
from functools import partial
from random import randint
from typing import Dict, Any
async def random_pokemon_id():
while True:
yield randint(1, 151)
await asyncio.sleep(10)
async def get_pokemon_info(session: aiohttp.ClientSession, pokemon_id: int) -> Dict[str, Any]:
pokemon_url = f"{pokemon_id}"
async with session.get(pokemon_url) as response:
yield await response.json()
def format_pokemon(pokemon_info: Dict[str, Any]) -> str:
name = pokemon_info["name"]
versions = (game['version']['name'] for game in pokemon_info['game_indices'])
return f"{name}: {', '.join(versions)}"
async def main():
async with aiohttp.ClientSession() as session:
g = Graph()
g | random_pokemon_id() | partial(get_pokemon_info, session) | format_pokemon | print
await run(g)
This library is not yet stable. I'm still figuring out my preferred way of handling things.
pyenv install 3.12
pyenv local 3.12
pyenv virtualenv asyncgraphs
pyenv activate asyncgraphs
pip install .[dev,docs]