In [1]:
import os
import time
import asyncio
from logging import Logger
from functools import lru_cache, wraps
from dotenv import load_dotenv, find_dotenv
from typing import Callable, Any, Optional, Type, List

import httpx
import pandas as pd

load_dotenv(find_dotenv())

True

In [2]:
def create_client() -> httpx.AsyncClient:
    """
    Creates a common client for future http requests

    Returns:
        httpx.Client: client with ford proxies
    """
    #    limits = httpx.Limits(max_connections=8)  # test with 8 latter, default 4
    proxy_mounts = {
        "http://": str(os.getenv("FORD_PROXY")),
        "https://": str(os.getenv("FORD_PROXY")),
    }
    print("New client ready to use")
    return httpx.AsyncClient(http2=True, verify=False, proxies=proxy_mounts)  # type: ignore


async def get_token() -> str:
    async with create_client() as client:
        response = await client.post(
            str(os.getenv("TOKEN_ENDPOINT")),
            data={
                "client_id": str(os.getenv("CLIENT_ID")),
                "client_secret": str(os.getenv("CLIENT_SECRET")),
                "scope": str(os.getenv("SCOPE")),
                "grant_type": "client_credentials",
            },
            timeout=160,
        )
    print(
        "token expires in:",
        round(int(response.json()["expires_in"]) / 60, 0),
        "minutes",
    )
    return response.json()["access_token"]

In [3]:
def retry(
    exceptions: List[Type[Exception]],
    tries: int = 4,
    delay: int = 3,
    backoff: int = 2,
    logger: Optional[Logger] = None,
) -> Callable:
    def deco_retry(function: Callable) -> Callable:
        @wraps(function)
        def f_retry(*args: Any, **kwargs: Any) -> Any:
            mtries, mdelay = tries, delay
            while mtries > 1:
                try:
                    return function(*args, **kwargs)
                except exceptions as exc:
                    msg = f"{str(exc)}, Retrying in {mdelay} seconds..."
                    if logger:
                        logger.warning(msg)
                    else:
                        print(msg)
                    time.sleep(mdelay)
                    mtries -= 1
                    mdelay *= backoff

            return function(*args, **kwargs)

        return f_retry  # true decorator

    return deco_retry


def rate_limiter(max_calls: int, sleep_time: int):
    def decorator(func: Callable) -> Callable:
        # This is the counter to keep track of the number of calls
        calls_count = {"count": 0}

        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Increment the call count
            calls_count["count"] += 1

            # Call the actual function
            result = func(*args, **kwargs)

            # If the call count reaches the max_calls, reset the counter and sleep
            if calls_count["count"] >= max_calls:
                time.sleep(sleep_time)
                calls_count["count"] = 0

            return result

        return wrapper

    return decorator

In [4]:
@retry([Exception])
@lru_cache(maxsize=70)
async def call_api_movement(
    complaint: str, client: httpx.AsyncClient, token: str, semaphore: asyncio.Semaphore
) -> str:
    prompt = f"{complaint} resume this complaint"
    content = {
        "model": "gpt-4",
        "context": (
            "You are a helpful text reader and analyzer."
        ),  # sets the overall behavior of the assistant.
        "messages": [{"role": "user", "content": prompt}],
        "parameters": {
            "temperature": 0.05,  # Determines the randomnes of the model's response.
        },
    }
    async with semaphore:
        response = await client.post(
            str(os.getenv("API_ENDPOINT")),
            headers={"Authorization": f"Bearer {token}"},
            json=content,
        )
    response = response.json()["content"]
    return response

In [5]:
async def main(max_requests: int = 70) -> pd.DataFrame:
    request_count = 0
    requests_runs = 1
    responses = []

    token = await get_token()
    df = pd.read_csv("../../../../data/raw/mock_dataset.csv")
    print(f"Dataframe readed, size of {df.shape[0]}")

    # Splitting DataFrame into blocks of 700 rows (What my humble client can do)
    num_blocks = len(df) // 700 + (1 if len(df) % 700 else 0)

    for block in range(num_blocks):
        start_idx = block * 700
        end_idx = min((block + 1) * 700, len(df))

        block_df = df.iloc[start_idx:end_idx]
        print(f"Block: {block + 1} of {num_blocks}")

        async with create_client() as client:
            tasks = []
            semaphore = asyncio.Semaphore(max_requests)
            for complaint in block_df["CDESCR"]:
                tasks.append(
                    asyncio.ensure_future(
                        call_api_movement(complaint, client, token, semaphore)
                    )
                )
                request_count += 1
                if request_count >= max_requests:
                    print("\tRun:", requests_runs, end="... ")
                    block_responses = await asyncio.gather(*tasks)
                    responses.extend(block_responses)
                    print("all responses aquired")

                    await asyncio.sleep(2.5)
                    tasks = []
                    semaphore = asyncio.Semaphore(70)
                    request_count = 0
                    requests_runs += 1

        await asyncio.sleep(5)

    df["response"] = responses
    return df


df = await main()
df.head()

New client ready to use
token expires in: 60.0 minutes
Dataframe readed, size of 1293
Block: 1 of 2
New client ready to use
	Run: 1... 

ProxyError: 503 Service Unavailable