# Async Patterns for AI

## Introduction

This notebook covers advanced async patterns that production AI systems rely on:
- **Rate limiting** - Respect API quotas
- **Semaphores** - Control concurrency
- **Timeouts** - Handle slow responses
- **Retries** - Handle transient failures
- **Circuit breakers** - Fail fast when service is down
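
As a rough illustration of the last item, a circuit breaker can be sketched as follows. The `CircuitBreaker` and `CircuitOpenError` names, the consecutive-failure threshold, and the cool-down period are all assumptions made for this example, not part of any library.

```python
import asyncio
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Hypothetical minimal breaker: opens after N consecutive failures."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after  # Seconds to stay open before a trial call
        self.failures = 0
        self.opened_at = None  # Monotonic timestamp of when the circuit opened

    async def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                # Fail fast without touching the downstream service
                raise CircuitOpenError("circuit open, failing fast")
            # Cool-down elapsed: let a trial call through (half-open state)
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        # A success closes the circuit and clears the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

In a notebook cell this would be used as `await breaker.call(some_async_function, arg)`, with callers catching `CircuitOpenError` to skip work while the service is down.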

## Learning Objectives

1. Implement rate limiting with semaphores
2. Handle timeouts gracefully
3. Build retry logic with exponential backoff
4. Track progress in long-running operations
5. Combine patterns for production-ready code

In [None]:
import asyncio
import time
from typing import List, Optional
import httpx

## 1. Rate Limiting with Semaphores

**Problem:** Launching 1,000 API calls at once can overwhelm the server or trip its rate limits.

**Solution:** Use semaphores to limit concurrency.

In [None]:
async def fetch_with_semaphore(url: str, semaphore: asyncio.Semaphore) -> dict:
    """
    Fetch URL with semaphore to limit concurrency.
    
    Args:
        url: URL to fetch
        semaphore: Semaphore limiting concurrent requests
    """
    async with semaphore:  # Acquire a slot (waits, without blocking the event loop, if the limit is reached)
        print(f"Fetching {url[-20:]}...")
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=10.0)
            await asyncio.sleep(0.1)  # Simulate processing
            return response.json()

async def rate_limited_fetch_demo():
    """Demonstrate rate limiting with different concurrency levels."""
    urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 21)]
    
    # Test with different limits
    for limit in [20, 5, 2]:
        print(f"\n=== Concurrency limit: {limit} ===")
        semaphore = asyncio.Semaphore(limit)
        
        start = time.time()
        results = await asyncio.gather(
            *[fetch_with_semaphore(url, semaphore) for url in urls]
        )
        elapsed = time.time() - start
        
        print(f"Completed in {elapsed:.2f}s with {limit} concurrent requests\n")

await rate_limited_fetch_demo()

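**Explanation:**
- `Semaphore(5)` allows at most 5 operations to hold it at once
- Once the limit is reached, further tasks wait until a slot is released
- This keeps the client from overwhelming the API
- A semaphore caps in-flight requests, not requests per second, so it helps respect rate limits only indirectly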
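
When an API quota is expressed as requests per second, a semaphore can be paired with a time-based limiter. The sketch below is one simple way to do that, assuming evenly spaced calls are acceptable; `IntervalRateLimiter` and `limited_call` are hypothetical names introduced for this example.

```python
import asyncio
import time


class IntervalRateLimiter:
    """Hypothetical helper: spaces call starts at least 1/rate seconds apart."""

    def __init__(self, rate_per_second: float):
        self.min_interval = 1.0 / rate_per_second
        self._lock = asyncio.Lock()
        self._next_allowed = 0.0  # Monotonic time of the next permitted start

    async def wait(self) -> None:
        # The lock serializes waiters so each one gets its own time slot
        async with self._lock:
            now = time.monotonic()
            delay = self._next_allowed - now
            if delay > 0:
                await asyncio.sleep(delay)
                now = time.monotonic()
            self._next_allowed = now + self.min_interval


async def limited_call(limiter, semaphore, work):
    """Run the zero-argument coroutine function `work` under both limits."""
    await limiter.wait()      # Caps how often calls start
    async with semaphore:     # Caps how many calls run at once
        return await work()
```

Create the limiter inside a running event loop (for example, in a notebook cell or inside a coroutine) so the lock is bound to the right loop on older Python versions.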

## 2. Timeouts and Graceful Degradation

In [None]:
async def fetch_with_timeout(url: str, timeout_seconds: float = 5.0) -> dict:
    """
    Fetch with timeout, return error if too slow.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await asyncio.wait_for(
                client.get(url, timeout=timeout_seconds),
                timeout=timeout_seconds
            )
            return {"success": True, "data": response.json()}
    
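    except (asyncio.TimeoutError, httpx.TimeoutException):
        # httpx raises its own timeout exception, which can fire before wait_for does
        return {"success": False, "error": "timeout", "url": url}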
    
    except Exception as e:
        return {"success": False, "error": str(e), "url": url}

# Test with various timeouts
url = "https://jsonplaceholder.typicode.com/posts/1"

for timeout in [10.0, 0.001]:  # 0.001s will almost certainly time out
    print(f"\nTesting with {timeout}s timeout...")
    result = await fetch_with_timeout(url, timeout)
    if result["success"]:
        print(f"✓ Success: {result['data']['title'][:30]}...")
    else:
        print(f"✗ Failed: {result['error']}")
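
Graceful degradation usually means returning something useful when the call times out, not just an error record. A minimal sketch, assuming a cached or default value is available to fall back on; `with_fallback` and `slow_answer` are hypothetical helpers that use only `asyncio`:

```python
import asyncio


async def with_fallback(coro, timeout_seconds: float, fallback):
    """Await `coro`; if it exceeds the timeout, return `fallback` instead."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # wait_for cancels the slow coroutine before raising
        return fallback


async def slow_answer(delay: float) -> str:
    await asyncio.sleep(delay)  # Stand-in for a slow API call
    return "fresh answer"
```

For example, `await with_fallback(slow_answer(1.0), 0.05, "cached answer")` gives up after 50 ms and returns the cached value.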

## 3. Retry Logic with Exponential Backoff

**Pattern:** Retry failed requests with increasing delays.

In [None]:
async def fetch_with_retry(
    url: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> dict:
    """
    Fetch with exponential backoff retry.
    
    Args:
        url: URL to fetch
        max_retries: Maximum retry attempts
        base_delay: Initial delay between retries (doubles each time)
    """
    for attempt in range(max_retries + 1):
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url, timeout=10.0)
                response.raise_for_status()
                return {"success": True, "data": response.json(), "attempts": attempt + 1}
        
        except httpx.HTTPError as e:  # Base class: covers timeouts, transport errors, and raise_for_status() failures
            if attempt < max_retries:
                delay = base_delay * (2 ** attempt)  # Exponential backoff
                print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
                await asyncio.sleep(delay)
            else:
                return {
                    "success": False,
                    # Some timeout errors carry an empty message, so fall back to the type name
                    "error": str(e) or type(e).__name__,
                    "attempts": attempt + 1
                }

# Test against a live URL (retries only trigger if a request fails)
result = await fetch_with_retry("https://jsonplaceholder.typicode.com/posts/1")
print(f"\nResult: {result['success']} after {result['attempts']} attempt(s)")
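
A fixed 1s, 2s, 4s schedule can make many clients retry in lockstep. A common variation, described in the AWS article listed under Resources, is "full jitter": sleep for a random time between zero and the exponential cap. The sketch below only computes the delays; `backoff_delays`, `max_delay`, and the `rng` parameter are assumptions added for illustration.

```python
import random


def backoff_delays(max_retries=3, base_delay=1.0, max_delay=30.0, rng=None):
    """Return one randomized delay per retry using full jitter."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # Exponential cap: base, 2*base, 4*base, ... limited by max_delay
        cap = min(max_delay, base_delay * (2 ** attempt))
        delays.append(rng.uniform(0, cap))
    return delays
```

In `fetch_with_retry`, the computed `delay` could be swapped for `random.uniform(0, delay)` to get the same effect.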

## 4. Progress Tracking for Long Operations

In [None]:
async def process_with_progress(items: List[str], semaphore: asyncio.Semaphore):
    """
    Process items with progress tracking.
    """
    total = len(items)
    completed = 0
    
    async def process_item(item: str):
        nonlocal completed
        async with semaphore:
            # Simulate processing
            await asyncio.sleep(0.5)
            completed += 1
            
            # Print progress every 10%
            progress = (completed / total) * 100
            if completed % max(1, total // 10) == 0 or completed == total:
                print(f"Progress: {completed}/{total} ({progress:.1f}%)")
            
            return f"Processed {item}"
    
    results = await asyncio.gather(*[process_item(item) for item in items])
    return results

# Test with 50 items, max 10 concurrent
items = [f"item-{i}" for i in range(50)]
semaphore = asyncio.Semaphore(10)

print("Processing 50 items...\n")
start = time.time()
results = await process_with_progress(items, semaphore)
elapsed = time.time() - start

print(f"\nCompleted in {elapsed:.2f}s")
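
An alternative to a shared counter is `asyncio.as_completed`, which yields awaitables in the order they finish. A minimal sketch, assuming results may come back in completion order; `run_with_progress` and the `on_progress` callback are hypothetical names:

```python
import asyncio


async def run_with_progress(coros, on_progress):
    """Await a list of coroutines, reporting after each one finishes."""
    total = len(coros)
    results = []
    for done, future in enumerate(asyncio.as_completed(coros), start=1):
        results.append(await future)
        on_progress(done, total)  # e.g. print or update a progress bar
    return results  # Completion order, not submission order
```

Unlike `asyncio.gather`, this does not preserve input order, so include an identifier in each result if order matters.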

## 5. Production-Ready Pattern: Combining Everything

In [None]:
class AsyncAPIClient:
    """
    Production-ready async API client with:
    - Rate limiting
    - Retry logic
    - Timeout handling
    - Progress tracking
    """
    
    def __init__(
        self,
        max_concurrent: int = 10,
        max_retries: int = 3,
        timeout: float = 30.0
    ):
        self.max_concurrent = max_concurrent  # Kept for reporting; Semaphore has no public limit attribute
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.max_retries = max_retries
        self.timeout = timeout
        self.completed = 0
        self.failed = 0
    
    async def fetch(self, url: str) -> dict:
        """Fetch single URL with all protections."""
        async with self.semaphore:
            for attempt in range(self.max_retries + 1):
                try:
                    async with httpx.AsyncClient() as client:
                        response = await asyncio.wait_for(
                            client.get(url, timeout=self.timeout),
                            timeout=self.timeout
                        )
                        response.raise_for_status()
                        data = response.json()  # Parse first so a bad body is not counted as a success
                        self.completed += 1
                        return {"success": True, "data": data}
                
                except Exception as e:
                    if attempt < self.max_retries:
                        delay = 1.0 * (2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...
                        await asyncio.sleep(delay)
                    else:
                        self.failed += 1
                        # Timeout errors often have an empty message, so fall back to the type name
                        return {"success": False, "error": str(e) or type(e).__name__}
    
    async def fetch_all(self, urls: List[str]) -> List[dict]:
        """Fetch multiple URLs with progress tracking."""
        print(f"Fetching {len(urls)} URLs...")
        print(f"Settings: max_concurrent={self.max_concurrent}, "
              f"max_retries={self.max_retries}, timeout={self.timeout}s\n")
        
        start = time.time()
        results = await asyncio.gather(*[self.fetch(url) for url in urls])
        elapsed = time.time() - start
        
        print(f"\nCompleted: {self.completed} successful, {self.failed} failed")
        print(f"Time: {elapsed:.2f}s ({len(urls)/elapsed:.1f} req/s)")
        
        return results

# Test the production client
client = AsyncAPIClient(max_concurrent=5, max_retries=2, timeout=10.0)
urls = [f"https://jsonplaceholder.typicode.com/posts/{i}" for i in range(1, 21)]

results = await client.fetch_all(urls)

# Show sample results
successful = [r for r in results if r["success"]]
print(f"\nFirst successful result: {successful[0]['data']['title'][:40]}...")
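
One design choice worth noting: the client above keeps its semaphore slot while sleeping between retries, so a failing request occupies a slot for the whole backoff. A possible variation, sketched here with a hypothetical `retry_releasing_slot` helper and a generic zero-argument coroutine function `call`, acquires the slot only for the duration of each attempt:

```python
import asyncio


async def retry_releasing_slot(call, semaphore, max_retries=3, base_delay=1.0):
    """Retry `call` with backoff, holding the semaphore only while it runs."""
    for attempt in range(max_retries + 1):
        try:
            async with semaphore:  # Slot is held only during the attempt
                return await call()
        except Exception:
            if attempt == max_retries:
                raise  # Out of retries: surface the last error
        # The slot is already released here, so other tasks can proceed
        await asyncio.sleep(base_delay * (2 ** attempt))
```

The trade-off is that a retried request has to queue for a slot again, which may or may not be desirable depending on how fairness matters for the workload.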

## 6. Real-World OpenAI Example

Apply all patterns to OpenAI API calls.

In [None]:
import os
from openai import AsyncOpenAI

class OpenAIBatchProcessor:
    """
    Process multiple prompts with OpenAI API efficiently.
    """
    
    def __init__(self, api_key: str, max_concurrent: int = 5):
        self.client = AsyncOpenAI(api_key=api_key)
        self.max_concurrent = max_concurrent  # Kept for reporting; Semaphore has no public limit attribute
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.total_tokens = 0
    
    async def generate(self, prompt: str, max_retries: int = 3) -> dict:
        """Generate completion with retry logic."""
        async with self.semaphore:
            for attempt in range(max_retries + 1):
                try:
                    response = await self.client.chat.completions.create(
                        model="gpt-3.5-turbo",
                        messages=[{"role": "user", "content": prompt}],
                        max_tokens=100
                    )
                    
                    self.total_tokens += response.usage.total_tokens
                    
                    return {
                        "success": True,
                        "prompt": prompt,
                        "response": response.choices[0].message.content,
                        "tokens": response.usage.total_tokens
                    }
                
                except Exception as e:
                    if attempt < max_retries:
                        delay = 2.0 * (2 ** attempt)  # Exponential backoff: 2s, 4s, 8s
                        print(f"Error on attempt {attempt + 1}, retrying in {delay}s...")
                        await asyncio.sleep(delay)
                    else:
                        return {
                            "success": False,
                            "prompt": prompt,
                            # Fall back to the exception type when the message is empty
                            "error": str(e) or type(e).__name__
                        }
    
    async def batch_generate(self, prompts: List[str]) -> List[dict]:
        """Process multiple prompts concurrently."""
        print(f"Processing {len(prompts)} prompts with max {self.max_concurrent} concurrent...\n")
        
        start = time.time()
        results = await asyncio.gather(*[self.generate(p) for p in prompts])
        elapsed = time.time() - start
        
        successful = sum(1 for r in results if r["success"])
        
        print(f"\nCompleted: {successful}/{len(prompts)} successful")
        print(f"Time: {elapsed:.2f}s")
        print(f"Total tokens: {self.total_tokens}")
        print(f"Rough cost estimate: ${self.total_tokens * 0.0000015:.4f} (GPT-3.5, assuming $1.50 per 1M tokens)")
        
        return results

# Example usage (requires API key)
if os.getenv("OPENAI_API_KEY"):
    processor = OpenAIBatchProcessor(
        api_key=os.getenv("OPENAI_API_KEY"),
        max_concurrent=3
    )
    
    prompts = [
        "Explain async programming in one sentence.",
        "What is the benefit of concurrent requests?",
        "How does rate limiting work?",
    ]
    
    results = await processor.batch_generate(prompts)
    
    for result in results:
        if result["success"]:
            print(f"\n✓ {result['prompt']}")
            print(f"  → {result['response'][:80]}...")
else:
    print("⚠ Set OPENAI_API_KEY to run this example")

## 7. Best Practices Summary

### ✅ Always:

1. **Use semaphores** to limit concurrency (5-20 for most APIs)
2. **Set timeouts** on all operations (30s typical)
3. **Implement retry logic** with exponential backoff
4. **Track progress** for long operations
5. **Handle errors individually** (don't fail entire batch)
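
For the last point, `asyncio.gather(..., return_exceptions=True)` is one way to keep a single failure from aborting the batch. A minimal sketch; the `gather_individually` name and the result dictionary shape (mirroring the `success`/`error` records used earlier) are assumptions for this example:

```python
import asyncio


async def gather_individually(coros):
    """Run all coroutines and report each outcome separately."""
    # With return_exceptions=True, exceptions are returned as results
    # instead of being raised out of gather
    outcomes = await asyncio.gather(*coros, return_exceptions=True)
    return [
        {"success": False, "error": repr(o)}
        if isinstance(o, BaseException)
        else {"success": True, "data": o}
        for o in outcomes
    ]
```

Results keep the order of the input coroutines, so each record can be matched back to its request.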

### 🎯 For OpenAI API:

- **Tier limits**: Check your account tier
- **Concurrency**: Start with 3-5 concurrent requests, then adjust to your tier's requests-per-minute and tokens-per-minute limits
- **Timeout**: 30-60s for chat completions
- **Retry**: 3 attempts with 2s, 4s, 8s delays
- **Cost tracking**: Count tokens, estimate costs
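
Cost tracking can be sketched by pricing prompt and completion tokens separately, since the two are typically billed at different rates. The `estimate_cost` function is hypothetical, and the prices are parameters the caller must supply from the current pricing page; no real prices are assumed here.

```python
def estimate_cost(
    prompt_tokens: int,
    completion_tokens: int,
    input_price_per_million: float,
    output_price_per_million: float,
) -> float:
    """Return an estimated cost in the same currency as the given prices."""
    total = (
        prompt_tokens * input_price_per_million
        + completion_tokens * output_price_per_million
    )
    return total / 1_000_000
```

The token counts would come from `response.usage.prompt_tokens` and `response.usage.completion_tokens`, accumulated across the batch.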

### ⚡ Performance Tips:

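- Connection pooling: Reuse a single `AsyncClient` instance instead of creating one per request (the examples above open a new client per call for brevity)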
- Batch size: 50-100 items per batch
- Progress: Report every 10%
- Logging: Log failures for debugging
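
The batch-size tip can be sketched as a small chunking loop that awaits one batch before starting the next. `chunked` and `process_in_batches` are hypothetical helpers, and `worker` stands for any coroutine function that processes a single item:

```python
import asyncio


def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def process_in_batches(items, worker, batch_size=50):
    """Run `worker` over `items`, one batch at a time, preserving order."""
    results = []
    for batch in chunked(items, batch_size):
        # Each batch runs concurrently; the next batch starts when it finishes
        results.extend(await asyncio.gather(*[worker(item) for item in batch]))
    return results
```

Batching bounds memory and makes it easy to checkpoint between batches; a semaphore inside `worker` can still cap concurrency within each batch.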

## Summary

### Patterns Covered:

1. **Semaphores** - Control concurrency
2. **Timeouts** - Prevent hanging
3. **Retry + Backoff** - Handle transient failures
4. **Progress tracking** - Monitor long operations
5. **Production client** - Combine all patterns

### Real Impact:

These patterns enable:
- Processing 1000s of prompts reliably
- Respecting API rate limits
- Handling failures gracefully
- Building production-ready systems

### Next Steps:

- **Exercise 1**: Build concurrent OpenAI client
- **Exercise 2**: Implement rate-limited batch processor
- **Module 02-02**: Type safety with Pydantic

### Resources:

- [asyncio semaphores](https://docs.python.org/3/library/asyncio-sync.html)
- [OpenAI rate limits](https://platform.openai.com/docs/guides/rate-limits)
- [Retry strategies](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/)