Skip to content

fix(ai): wrap streaming responses in AsyncStreamWrapper to support async with#622

Open
devteamaegis wants to merge 4 commits into
PostHog:mainfrom
devteamaegis:fix/async-stream-context-manager
Open

fix(ai): wrap streaming responses in AsyncStreamWrapper to support async with#622
devteamaegis wants to merge 4 commits into
PostHog:mainfrom
devteamaegis:fix/async-stream-context-manager

Conversation

@devteamaegis
Copy link
Copy Markdown

Problem

When using the PostHog AI wrappers (posthog.ai.openai.AsyncOpenAI,
posthog.ai.anthropic.AsyncAnthropic) with libraries that expect streaming
responses to be async context managers (e.g. pydantic-ai, any code that
uses async with response:), a TypeError is raised immediately:

TypeError: 'async_generator' object does not support the
asynchronous context manager protocol

Root cause: both WrappedCompletions._create_streaming and
WrappedResponses._create_streaming (OpenAI), and
AsyncMessages._create_streaming (Anthropic), return a bare
async_generator() object. Async generators implement __aiter__ /
__anext__ but not __aenter__ / __aexit__.

The OpenAI and Anthropic SDKs return AsyncStream objects that implement
both protocols. The PostHog wrappers lost that compatibility.

Fixes #393.

Fix

Add posthog/ai/stream.py with a lightweight AsyncStreamWrapper class
that wraps an async generator and adds the missing context-manager protocol:

async with response as stream:   # now works
    async for chunk in stream:
        ...

Key behaviors:

  • __aenter__ returns self so async with response as r: async for c in r: works as expected.
  • __aexit__ calls aclose() on the underlying generator, ensuring the finally block (PostHog event capture) runs even when the caller breaks out early.
  • __getattr__ proxies attribute access to the underlying generator for provider-specific metadata.
  • Backward compatible: code using async for chunk in response: directly continues to work unchanged.

Changes

File Change
posthog/ai/stream.py New — AsyncStreamWrapper
posthog/ai/openai/openai_async.py Both _create_streaming methods return AsyncStreamWrapper(async_generator())
posthog/ai/anthropic/anthropic_async.py _create_streaming returns AsyncStreamWrapper(generator())
posthog/test/test_async_stream_wrapper.py 6 regression tests (no mocks, no external deps)

Tests

pytest posthog/test/test_async_stream_wrapper.py -v
# 6 passed

@devteamaegis devteamaegis requested a review from a team as a code owner May 22, 2026 18:56
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 22, 2026

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
posthog/ai/openai/openai_async.py:3
`AsyncIterator` is imported but never referenced anywhere in the file — it was added in this PR but left unused.

```suggestion
from typing import Any, Dict, List, Optional
```

### Issue 2 of 2
posthog/test/test_async_stream_wrapper.py:88-106
**Prefer parameterised tests for similar cases**

`test_finally_block_runs_on_early_exit` and `test_finally_block_runs_on_full_exhaustion` share the same structure (build a generator with a `finally` side-effect, wrap it, drive it with `async with`, assert the flag). The team's preference is to express these as a single `@pytest.mark.parametrize` test so the assertion logic is written OnceAndOnlyOnce and new cases are cheap to add.

Reviews (1): Last reviewed commit: "test(ai): add regression tests for Async..." | Re-trigger Greptile

import time
import uuid
from typing import Any, Dict, List, Optional
from typing import Any, AsyncIterator, Dict, List, Optional
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 AsyncIterator is imported but never referenced anywhere in the file — it was added in this PR but left unused.

Suggested change
from typing import Any, AsyncIterator, Dict, List, Optional
from typing import Any, Dict, List, Optional
Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/ai/openai/openai_async.py
Line: 3

Comment:
`AsyncIterator` is imported but never referenced anywhere in the file — it was added in this PR but left unused.

```suggestion
from typing import Any, Dict, List, Optional
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +88 to +106
@pytest.mark.asyncio
async def test_finally_block_runs_on_full_exhaustion():
"""The underlying generator's finally block must also run on normal
exhaustion (``aclose()`` on an exhausted generator is a no-op)."""
finally_ran = []

async def gen_with_finally():
try:
yield 1
yield 2
finally:
finally_ran.append(True)

wrapper = AsyncStreamWrapper(gen_with_finally())
async with wrapper as stream:
async for _ in stream:
pass

assert finally_ran == [True]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Prefer parameterised tests for similar cases

test_finally_block_runs_on_early_exit and test_finally_block_runs_on_full_exhaustion share the same structure (build a generator with a finally side-effect, wrap it, drive it with async with, assert the flag). The team's preference is to express these as a single @pytest.mark.parametrize test so the assertion logic is written OnceAndOnlyOnce and new cases are cheap to add.

Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/test/test_async_stream_wrapper.py
Line: 88-106

Comment:
**Prefer parameterised tests for similar cases**

`test_finally_block_runs_on_early_exit` and `test_finally_block_runs_on_full_exhaustion` share the same structure (build a generator with a `finally` side-effect, wrap it, drive it with `async with`, assert the flag). The team's preference is to express these as a single `@pytest.mark.parametrize` test so the assertion logic is written OnceAndOnlyOnce and new cases are cheap to add.

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

@marandaneto marandaneto requested a review from a team May 23, 2026 09:28
@marandaneto
Copy link
Copy Markdown
Member

cc @PostHog/team-ai-observability

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

PostHog with pydantic-ai streaming raises TypeError: 'async_generator'

2 participants