In [1]:
from aiohttp import web


async def h(r: web.Request) -> web.Response:
    return web.Response(text=f"{r.method} {r.path}")


app = web.Application()
app.router.add_route("*", "/{p:.*}", h)

# Non-blocking async runner
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "localhost", 8888)
await site.start()
print("Server running on http://localhost:8888")
# Server is now running in the background!
# To stop later: await runner.cleanup()

Server running on http://localhost:8888


In [2]:
# Test the callback by making HTTP requests
import httpx

async with httpx.AsyncClient() as client:
    # Make multiple requests to see callbacks in action
    print("Making test requests...")

    r1 = await client.get("http://localhost:8888/")
    print(f"Response 1: {r1.text}")

    r2 = await client.post("http://localhost:8888/test", json={"data": "test"})
    print(f"Response 2: {r2.text}")

    r3 = await client.get("http://localhost:8888/path/to/resource")
    print(f"Response 3: {r3.text}")

Making test requests...
Response 1: GET /
Response 2: POST /test
Response 3: GET /path/to/resource


In [3]:
import httpx._models

original_close = httpx._models.Response.close
original_aclose = httpx._models.Response.aclose


def patched_close(self: httpx._models.Response) -> None:
    original_close(self)
    print(f"Response closed: {self.status_code} {self.reason_phrase}")


async def patched_aclose(self: httpx._models.Response) -> None:
    await original_aclose(self)
    print(f"Response aclosed: {self.status_code} {self.reason_phrase}")


httpx._models.Response.close = patched_close
httpx._models.Response.aclose = patched_aclose


async with httpx.AsyncClient() as client:
    r = await client.get("http://localhost:8888/")
    r.content

Response aclosed: 200 OK


In [4]:
import functools
from typing import Any, Callable, List

import httpx
import httpx._models

# Storage for response callbacks
response_callbacks: List[Callable] = []


def add_response_callback(callback: Callable) -> None:
    """Add a callback to be called when a Response is created."""
    response_callbacks.append(callback)


def clear_response_callbacks() -> None:
    """Clear all response callbacks."""
    response_callbacks.clear()


# Store the original Response.__init__
original_response_init = httpx._models.Response.__init__


@functools.wraps(original_response_init)
def patched_response_init(self, *args, **kwargs):
    """Patched Response.__init__ that calls callbacks."""
    # Call the original init first
    original_response_init(self, *args, **kwargs)

    # Call all registered callbacks
    for callback in response_callbacks:
        try:
            callback(self)
        except Exception as e:
            print(f"Error in response callback: {e}")


# Apply the patch
httpx._models.Response.__init__ = patched_response_init
print("✅ httpx Response class patched with callback support")

✅ httpx Response class patched with callback support


In [5]:
# Example: Create a callback to track all responses
from datetime import datetime

response_log = []


def log_response(response):
    """Callback that logs response details."""
    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "status_code": response.status_code,
        "url": str(response.url)
        if hasattr(response, "_request") and response._request
        else "N/A",
        "headers": dict(response.headers),
        "http_version": response.http_version,
        "reason_phrase": response.reason_phrase,
    }
    response_log.append(log_entry)
    print(f"📊 Response captured: {response.status_code} {response.reason_phrase}")


# Register the callback
add_response_callback(log_response)
print("Callback registered - will log all HTTP responses")

Callback registered - will log all HTTP responses


In [6]:
# Display the captured response log
import json

print(f"\n📋 Total responses captured: {len(response_log)}")
print("-" * 60)

for i, entry in enumerate(response_log, 1):
    print(f"\n📌 Response #{i}:")
    print(f"  Timestamp: {entry['timestamp']}")
    print(f"  Status: {entry['status_code']} {entry['reason_phrase']}")
    print(f"  URL: {entry['url']}")
    print(f"  HTTP Version: {entry['http_version']}")
    print(f"  Headers (sample): {dict(list(entry['headers'].items())[:3])}...")

# You can also get more detailed info
print(f"\n🔍 Full log available in 'response_log' variable")
print(
    f"Example: response_log[0] = {json.dumps(response_log[0] if response_log else {}, indent=2)[:200]}..."
)


📋 Total responses captured: 0
------------------------------------------------------------

🔍 Full log available in 'response_log' variable
Example: response_log[0] = {}...


In [7]:
# Advanced example: Multiple callbacks with different purposes
import time
from collections import defaultdict

# Callback 1: Count responses by status code
status_counter = defaultdict(int)


def count_status_codes(response):
    status_counter[response.status_code] += 1


# Callback 2: Track response times
response_times = []
start_times = {}


def track_response_time(response):
    # Note: This is a simplified example. In real scenarios,
    # you'd track request start time differently
    response_times.append(
        {
            "url": str(response.url)
            if hasattr(response, "_request") and response._request
            else "N/A",
            "status": response.status_code,
            "timestamp": time.time(),
        }
    )


# Callback 3: Alert on errors
def alert_on_errors(response):
    if response.status_code >= 400:
        print(
            f"⚠️ ERROR RESPONSE: {response.status_code} for {response.url if hasattr(response, '_request') and response._request else 'unknown URL'}"
        )


# Register all callbacks
add_response_callback(count_status_codes)
add_response_callback(track_response_time)
add_response_callback(alert_on_errors)

print("✅ Advanced callbacks registered!")
print(f"Total active callbacks: {len(response_callbacks)}")

✅ Advanced callbacks registered!
Total active callbacks: 4


In [8]:
# Test advanced callbacks and demonstrate cleanup
async with httpx.AsyncClient() as client:
    print("Testing with all callbacks active...")

    # Make requests including one that will cause an error (404)
    await client.get("http://localhost:8888/")
    await client.get("http://localhost:8888/api/users")
    await client.post("http://localhost:8888/api/data", json={"test": "data"})

    # This will trigger a 404 (path doesn't exist on most servers)
    # But our test server responds to all paths, so let's simulate an error scenario
    # by making a request to a non-existent server
    try:
        await client.get("http://localhost:9999/", timeout=1.0)
    except Exception as e:
        print(f"Expected error: {e}")

print(f"\n📊 Status code counts: {dict(status_counter)}")
print(f"📈 Total responses tracked: {len(response_times)}")

# Demonstrate cleanup
print(f"\n🧹 Cleaning up...")
print(f"Before cleanup: {len(response_callbacks)} callbacks")
clear_response_callbacks()
print(f"After cleanup: {len(response_callbacks)} callbacks")

# You can also restore the original Response.__init__ if needed
# httpx._models.Response.__init__ = original_response_init
# print("✅ Original Response.__init__ restored")

Testing with all callbacks active...
📊 Response captured: 200 OK
Response aclosed: 200 OK
📊 Response captured: 200 OK
Response aclosed: 200 OK
📊 Response captured: 200 OK
Response aclosed: 200 OK
Expected error: All connection attempts failed

📊 Status code counts: {200: 3}
📈 Total responses tracked: 3

🧹 Cleaning up...
Before cleanup: 4 callbacks
After cleanup: 0 callbacks


In [9]:
# Optional: Restore original behavior completely
def restore_original_response():
    """Restore the original Response.__init__ method."""
    httpx._models.Response.__init__ = original_response_init
    clear_response_callbacks()
    print("✅ Original httpx Response behavior restored")
    print("   - Original __init__ method restored")
    print("   - All callbacks cleared")


# Uncomment to restore:
# restore_original_response()

# Or keep the patched version for observability
print("💡 Patch remains active. Call restore_original_response() to revert.")

💡 Patch remains active. Call restore_original_response() to revert.
