8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,14 @@

## 2.2.0 (unreleased)

### Added

- **True streaming API** - New `py:stream_start/3,4` and `py:stream_cancel/1` functions
for event-driven streaming from Python generators. Unlike `py:stream/3,4` which
collects all values at once, `stream_start` sends `{py_stream, Ref, {data, Value}}`
messages as values are yielded. Supports both sync and async generators. Useful for
LLM token streaming, real-time data feeds, and processing large sequences incrementally.

### Fixed

- **Channel notification for create_task** - Fixed async channel receive hanging when using
156 changes: 105 additions & 51 deletions docs/streaming.md
@@ -6,11 +6,83 @@ This guide covers working with Python generators from Erlang.

Python generators allow processing large datasets or infinite sequences
efficiently by yielding values one at a time. erlang_python supports
streaming these values back to Erlang.
two modes of streaming:

## Generator Expressions
1. **Batch streaming** (`py:stream/3,4`, `py:stream_eval/1,2`) - Collects all values into a list
2. **True streaming** (`py:stream_start/3,4`) - Sends events as values are yielded

The simplest way to stream is with generator expressions:
## True Streaming (Event-driven)

For real-time processing where you need values as they arrive (e.g., LLM tokens,
live data feeds), use `py:stream_start/3,4`:

```erlang
%% Start streaming from a Python iterator
{ok, Ref} = py:stream_start(builtins, iter, [[1,2,3,4,5]]),

%% Receive events as values are yielded
receive_loop(Ref).

receive_loop(Ref) ->
    receive
        {py_stream, Ref, {data, Value}} ->
            io:format("Got: ~p~n", [Value]),
            receive_loop(Ref);
        {py_stream, Ref, done} ->
            io:format("Complete~n");
        {py_stream, Ref, {error, Reason}} ->
            io:format("Error: ~p~n", [Reason])
    after 30000 ->
        timeout
    end.
```

### Events

The stream sends these messages to the owner process:

- `{py_stream, Ref, {data, Value}}` - Each yielded value
- `{py_stream, Ref, done}` - Stream completed successfully
- `{py_stream, Ref, {error, Reason}}` - Stream error

### Options

```erlang
%% Send events to a different process
{ok, Ref} = py:stream_start(Module, Func, Args, #{owner => OtherPid}).
```

### Cancellation

Cancel an active stream:

```erlang
{ok, Ref} = py:stream_start(my_module, long_generator, []),
%% ... receive some values ...
ok = py:stream_cancel(Ref).
%% Stream will stop on next iteration
```

### Async Generators

`stream_start` supports both sync and async generators:

```erlang
%% Async generator (e.g., streaming from an async API)
ok = py:exec(<<"
import asyncio

async def async_gen():
    for i in range(5):
        await asyncio.sleep(0.1)
        yield i
">>),
{ok, Ref} = py:stream_start('__main__', async_gen, []).
```

## Batch Streaming (Collecting All Values)

For simpler use cases where you want all values at once:

### Generator Expressions

```erlang
%% Stream squares of numbers 0-9
@@ -26,7 +98,7 @@
%% Evens = [0,2,4,6,8,10,12,14,16,18]
```

## Iterator Objects
### Iterator Objects

Any Python iterator can be streamed:

@@ -40,7 +112,7 @@
%% Items = [{<<"a">>, 1}, {<<"b">>, 2}]
```

## Generator Functions
### Generator Functions

Define generator functions with `yield`:

@@ -69,46 +141,44 @@ For reliable inline generators, use lambda with walrus operator (Python 3.8+):
%% Fib = [0,1,1,2,3,5,8,13,21,34]
```

## When to Use Each Mode

| Use Case | Recommended API |
|----------|-----------------|
| LLM token streaming | `stream_start/3,4` |
| Real-time data feeds | `stream_start/3,4` |
| Live progress updates | `stream_start/3,4` |
| Batch processing | `stream/3,4` or `stream_eval/1,2` |
| Small datasets | `stream/3,4` or `stream_eval/1,2` |
| One-time collection | `stream/3,4` or `stream_eval/1,2` |

## Memory Considerations

- `stream_start`: Low memory - values processed as they arrive
- `stream/stream_eval`: Values collected into a list - memory grows with output size
- Generators are garbage collected after exhaustion
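The constant-memory pattern can be sketched as a fold that consumes each event without accumulating a list. `sum_stream/2` is a hypothetical helper, assuming the `stream_start` events described above:

```erlang
%% Fold over a stream, keeping only a running total in memory.
sum_stream(Ref, Acc) ->
    receive
        {py_stream, Ref, {data, Value}} ->
            sum_stream(Ref, Acc + Value);
        {py_stream, Ref, done} ->
            {ok, Acc};
        {py_stream, Ref, {error, Reason}} ->
            {error, Reason}
    after 30000 ->
        {error, timeout}
    end.

%% Usage sketch:
%% {ok, Ref} = py:stream_start(builtins, iter, [[1,2,3,4,5]]),
%% {ok, 15} = sum_stream(Ref, 0).
```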

## Use Cases

### LLM Token Streaming

```erlang
%% Stream tokens from an LLM
{ok, Ref} = py:stream_start(llm_client, generate_tokens, [Prompt]),
stream_to_client(Ref, WebSocket).

stream_to_client(Ref, WS) ->
    receive
        {py_stream, Ref, {data, Token}} ->
            websocket:send(WS, Token),
            stream_to_client(Ref, WS);
        {py_stream, Ref, done} ->
            websocket:send(WS, <<"[DONE]">>);
        {py_stream, Ref, {error, _}} ->
            websocket:send(WS, <<"[ERROR]">>)
end.
```


### Data Processing Pipelines

```erlang
@@ -119,22 +189,6 @@
Results = [process_line(L) || L <- Lines].
```

### Infinite Sequences

```erlang
%% Define infinite counter
ok = py:exec(<<"
def counter():
    n = 0
    while True:
        yield n
        n += 1
">>).

%% stream/3 would try to collect the entire (infinite) output, so it
%% cannot be used directly here; use stream_start with stream_cancel,
%% or wrap the generator with itertools.islice before streaming
```
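A bounded take can be built on the event-driven API by cancelling once enough values have arrived. This sketch uses the `stream_start`/`stream_cancel` functions described earlier; `take/3` is a hypothetical helper:

```erlang
%% Collect the first N values from an infinite generator, then cancel.
take(Ref, 0, Acc) ->
    ok = py:stream_cancel(Ref),
    lists:reverse(Acc);
take(Ref, N, Acc) ->
    receive
        {py_stream, Ref, {data, Value}} ->
            take(Ref, N - 1, [Value | Acc]);
        {py_stream, Ref, done} ->
            lists:reverse(Acc)
    after 30000 ->
        lists:reverse(Acc)
    end.

%% Usage sketch:
%% {ok, Ref} = py:stream_start('__main__', counter, []),
%% First100 = take(Ref, 100, []).
```

Note that a few in-flight `{py_stream, Ref, ...}` messages may still arrive after cancellation and should be drained or ignored.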

### Batch Processing

```erlang
%% (example truncated in this diff view)
```