In [1]:
from get_users import get_users, get_user_scores
from db_api import (
    put_users,
    upsert_user,
    upsert_users,
    upsert_scores,
    fetch_user_ids,
    find_offset,
)
import time
import httpx
import asyncio


In [None]:
def batch_id_generator(start=0, batch_size=100, num_batches=5):
    """
    Yield consecutive, non-overlapping lists of integer IDs.

    The first batch is ``[start, start + batch_size)``; each following batch
    begins where the previous one ended.

    Args:
        start: First ID of the first batch (defaults to 0).
        batch_size: Number of IDs in every batch; must be positive.
        num_batches: How many batches to yield (0 or negative yields none).

    Yields:
        list[int]: ``batch_size`` consecutive IDs.

    Raises:
        ValueError: if ``batch_size`` is not positive. Because this is a
            generator, the error surfaces on the first iteration. A
            non-positive size would otherwise produce empty batches, which
            downstream code that indexes ``[-1]`` cannot handle.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    stop = start + batch_size * num_batches
    for batch_start in range(start, stop, batch_size):
        yield list(range(batch_start, batch_start + batch_size))


async def get_users_sem(sem, client, user_ids):
    """
    Fetch one batch of user profiles and upsert them, bounded by ``sem``.

    Args:
        sem: asyncio.Semaphore limiting how many batches run at once.
        client: HTTP client passed through to ``get_users``.
        user_ids: Sequence of user IDs to request in this batch.

    An empty ``user_ids`` is a no-op. Any ``Exception`` from fetching or
    upserting is printed together with the batch's ID range and then
    swallowed, so one bad batch does not abort the whole crawl.
    ``asyncio.CancelledError`` is not an ``Exception`` subclass (Python 3.8+),
    so cancellation still propagates.
    """
    if not user_ids:
        # Nothing to fetch; also avoids an IndexError on user_ids[-1].
        return
    first_id, last_id = user_ids[0], user_ids[-1]
    async with sem:
        # Progress marker whenever a batch ends on an ID of the form ...999.
        if last_id % 1000 == 999:
            print(f"Scheduling up to user ID: {last_id}")
        try:
            users = await get_users(user_ids, client)
            # The DB call is blocking; run it off the event loop.
            await asyncio.to_thread(upsert_users, users)
        except Exception as e:
            print(f"Error processing users {first_id}-{last_id}: {e!r}")


sem = asyncio.Semaphore(1)  # Limit to 10 concurrent requests
timeout = httpx.Timeout(10.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as httpx_client:
    for user_ids in batch_id_generator(
        start=505000, batch_size=100, num_batches=300000
    ):
        tasks = [get_users_sem(sem, httpx_client, user_ids)]

        await asyncio.gather(*tasks)

CancelledError: 

In [2]:
async def process_user(sem, client, user_id):
    """
    Download one user's best scores and store them, under a concurrency cap.

    Args:
        sem: asyncio.Semaphore that bounds how many users are in flight.
        client: HTTP client handed to ``get_user_scores``.
        user_id: The user whose scores are fetched.

    Failures from either the fetch or the upsert are reported with the user
    ID and swallowed, so a single bad user does not cancel the batch.
    """
    async with sem:
        try:
            best_scores = await get_user_scores(
                user_id, client, type="best", offset=0, limit=200
            )
            # Blocking DB write runs in a worker thread.
            await asyncio.to_thread(upsert_scores, best_scores)
        except Exception as err:
            print(f"Error processing user ID {user_id}: {err}")


sem = asyncio.Semaphore(15)
timeout = httpx.Timeout(10.0, connect=5.0)
async with httpx.AsyncClient(timeout=timeout) as httpx_client:
    for user_ids in fetch_user_ids(offset=find_offset(54800)):
        print(f"Processing batch: {user_ids[0]} to {user_ids[-1]}")

        # Create a task for each user in this batch
        tasks = [process_user(sem, httpx_client, uid) for uid in user_ids]

        # Run them all concurrently and wait for completion
        await asyncio.gather(*tasks)


Processing batch: 54213 to 54800


CancelledError: 