From 42e99af054600ff61d7c2f0f7029d6568d8ddb18 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Mon, 23 Mar 2026 15:11:40 +0100 Subject: [PATCH] Add true streaming API with stream_start/3,4 and stream_cancel/1 New event-driven streaming functions that send {py_stream, Ref, Event} messages as values are yielded from Python generators: - py:stream_start/3,4 - Start streaming, returns immediately with ref - py:stream_cancel/1 - Cancel an active stream Events sent to owner process: - {py_stream, Ref, {data, Value}} - Each yielded value - {py_stream, Ref, done} - Stream completed - {py_stream, Ref, {error, Reason}} - Stream error Unlike py:stream/3,4 which collects all values at once, stream_start sends events incrementally. Useful for LLM token streaming, real-time data feeds, and processing large sequences without memory accumulation. Also fixes inaccurate comment in py_channel.erl about Python API. --- CHANGELOG.md | 8 ++ docs/streaming.md | 156 +++++++++++++++++++++++++------------ src/py.erl | 152 +++++++++++++++++++++++++++++++++++- src/py_channel.erl | 4 +- src/py_state.erl | 73 ++++++++++++++++++ test/py_stream_SUITE.erl | 163 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 500 insertions(+), 56 deletions(-) create mode 100644 test/py_stream_SUITE.erl diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f58709..af05074 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ ## 2.2.0 (unreleased) +### Added + +- **True streaming API** - New `py:stream_start/3,4` and `py:stream_cancel/1` functions + for event-driven streaming from Python generators. Unlike `py:stream/3,4` which + collects all values at once, `stream_start` sends `{py_stream, Ref, {data, Value}}` + messages as values are yielded. Supports both sync and async generators. Useful for + LLM token streaming, real-time data feeds, and processing large sequences incrementally. 
+ ### Fixed - **Channel notification for create_task** - Fixed async channel receive hanging when using diff --git a/docs/streaming.md b/docs/streaming.md index b1c554a..94c2075 100644 --- a/docs/streaming.md +++ b/docs/streaming.md @@ -6,11 +6,83 @@ This guide covers working with Python generators from Erlang. Python generators allow processing large datasets or infinite sequences efficiently by yielding values one at a time. erlang_python supports -streaming these values back to Erlang. +two modes of streaming: -## Generator Expressions +1. **Batch streaming** (`py:stream/3,4`, `py:stream_eval/1,2`) - Collects all values into a list +2. **True streaming** (`py:stream_start/3,4`) - Sends events as values are yielded -The simplest way to stream is with generator expressions: +## True Streaming (Event-driven) + +For real-time processing where you need values as they arrive (e.g., LLM tokens, +live data feeds), use `py:stream_start/3,4`: + +```erlang +%% Start streaming from a Python iterator +{ok, Ref} = py:stream_start(builtins, iter, [[1,2,3,4,5]]), + +%% Receive events as values are yielded +receive_loop(Ref). + +receive_loop(Ref) -> + receive + {py_stream, Ref, {data, Value}} -> + io:format("Got: ~p~n", [Value]), + receive_loop(Ref); + {py_stream, Ref, done} -> + io:format("Complete~n"); + {py_stream, Ref, {error, Reason}} -> + io:format("Error: ~p~n", [Reason]) + after 30000 -> + timeout + end. +``` + +### Events + +The stream sends these messages to the owner process: + +- `{py_stream, Ref, {data, Value}}` - Each yielded value +- `{py_stream, Ref, done}` - Stream completed successfully +- `{py_stream, Ref, {error, Reason}}` - Stream error + +### Options + +```erlang +%% Send events to a different process +{ok, Ref} = py:stream_start(Module, Func, Args, #{owner => OtherPid}). +``` + +### Cancellation + +Cancel an active stream: + +```erlang +{ok, Ref} = py:stream_start(my_module, long_generator, []), +%% ... receive some values ... +ok = py:stream_cancel(Ref). 
+%% Stream will stop on next iteration +``` + +### Async Generators + +`stream_start` supports both sync and async generators: + +```erlang +%% Async generator (e.g., streaming from an async API) +ok = py:exec(<<" +async def async_gen(): + for i in range(5): + await asyncio.sleep(0.1) + yield i +">>), +{ok, Ref} = py:stream_start('__main__', async_gen, []). +``` + +## Batch Streaming (Collecting All Values) + +For simpler use cases where you want all values at once: + +### Generator Expressions ```erlang %% Stream squares of numbers 0-9 @@ -26,7 +98,7 @@ The simplest way to stream is with generator expressions: %% Evens = [0,2,4,6,8,10,12,14,16,18] ``` -## Iterator Objects +### Iterator Objects Any Python iterator can be streamed: @@ -40,7 +112,7 @@ Any Python iterator can be streamed: %% Items = [{<<"a">>, 1}, {<<"b">>, 2}] ``` -## Generator Functions +### Generator Functions Define generator functions with `yield`: @@ -69,46 +141,44 @@ For reliable inline generators, use lambda with walrus operator (Python 3.8+): %% Fib = [0,1,1,2,3,5,8,13,21,34] ``` -## Streaming Protocol +## When to Use Each Mode -Internally, streaming uses these messages: +| Use Case | Recommended API | +|----------|-----------------| +| LLM token streaming | `stream_start/3,4` | +| Real-time data feeds | `stream_start/3,4` | +| Live progress updates | `stream_start/3,4` | +| Batch processing | `stream/3,4` or `stream_eval/1,2` | +| Small datasets | `stream/3,4` or `stream_eval/1,2` | +| One-time collection | `stream/3,4` or `stream_eval/1,2` | -```erlang -{py_chunk, Ref, Value} %% Each yielded value -{py_end, Ref} %% Generator exhausted -{py_error, Ref, Error} %% Exception occurred -``` +## Memory Considerations -You can build custom streaming consumers: +- `stream_start`: Low memory - values processed as they arrive +- `stream/stream_eval`: Values collected into a list - memory grows with output size +- Generators are garbage collected after exhaustion + +## Use Cases + +### LLM Token 
Streaming ```erlang -start_stream(Code) -> - Ref = make_ref(), - py_pool:request({stream_eval, Ref, self(), Code, #{}}), - process_stream(Ref). +%% Stream tokens from an LLM +{ok, Ref} = py:stream_start(llm_client, generate_tokens, [Prompt]), +stream_to_client(Ref, WebSocket). -process_stream(Ref) -> +stream_to_client(Ref, WS) -> receive - {py_chunk, Ref, Value} -> - io:format("Got: ~p~n", [Value]), - process_stream(Ref); - {py_end, Ref} -> - io:format("Done~n"); - {py_error, Ref, Error} -> - io:format("Error: ~p~n", [Error]) - after 30000 -> - io:format("Timeout~n") + {py_stream, Ref, {data, Token}} -> + websocket:send(WS, Token), + stream_to_client(Ref, WS); + {py_stream, Ref, done} -> + websocket:send(WS, <<"[DONE]">>); + {py_stream, Ref, {error, _}} -> + websocket:send(WS, <<"[ERROR]">>) end. ``` -## Memory Considerations - -- Values are collected into a list by `stream_eval/1,2` -- For large datasets, consider processing chunks as they arrive -- Generators are garbage collected after exhaustion - -## Use Cases - ### Data Processing Pipelines ```erlang @@ -119,22 +189,6 @@ process_stream(Ref) -> Results = [process_line(L) || L <- Lines]. ``` -### Infinite Sequences - -```erlang -%% Define infinite counter -ok = py:exec(<<" -def counter(): - n = 0 - while True: - yield n - n += 1 -">>). - -%% Take first 100 (use your own take function) -%% Can't use stream/3 directly for infinite - need custom handling -``` - ### Batch Processing ```erlang diff --git a/src/py.erl b/src/py.erl index 3e9060e..6a38a81 100644 --- a/src/py.erl +++ b/src/py.erl @@ -56,6 +56,9 @@ stream/4, stream_eval/1, stream_eval/2, + stream_start/3, + stream_start/4, + stream_cancel/1, version/0, memory_stats/0, gc/0, @@ -414,13 +417,15 @@ stream(Module, Func, Args) -> %% @doc Stream results from a Python generator with kwargs. -spec stream(py_module(), py_func(), py_args(), py_kwargs()) -> py_result(). 
+stream(Module, Func, Args, Kwargs) when map_size(Kwargs) == 0 -> + %% No kwargs - use stream_start and collect results + {ok, Ref} = stream_start(Module, Func, Args), + collect_stream(Ref, []); stream(Module, Func, Args, Kwargs) -> - %% Route through the new process-per-context system - %% Create the generator and collect all values using list() + %% With kwargs - use eval approach Ctx = py_context_router:get_context(), ModuleBin = ensure_binary(Module), FuncBin = ensure_binary(Func), - %% Build code that calls the function and collects all yielded values KwargsCode = format_kwargs(Kwargs), ArgsCode = format_args(Args), Code = iolist_to_binary([ @@ -429,6 +434,19 @@ stream(Module, Func, Args, Kwargs) -> ]), py_context:eval(Ctx, Code, #{}). +%% @private Collect all stream events into a list +collect_stream(Ref, Acc) -> + receive + {py_stream, Ref, {data, Value}} -> + collect_stream(Ref, [Value | Acc]); + {py_stream, Ref, done} -> + {ok, lists:reverse(Acc)}; + {py_stream, Ref, {error, Reason}} -> + {error, Reason} + after 30000 -> + {error, timeout} + end. + %% @private Format arguments for Python code format_args([]) -> <<>>; format_args(Args) -> @@ -468,6 +486,134 @@ stream_eval(Code, Locals) -> WrappedCode = <<"list(", CodeBin/binary, ")">>, py_context:eval(Ctx, WrappedCode, Locals). +%%% ============================================================================ +%%% True Streaming API (Event-driven) +%%% ============================================================================ + +%% @doc Start a true streaming iteration from a Python generator. +%% +%% Unlike stream/3,4 which collects all values at once, this function +%% returns immediately with a reference and sends values as events +%% to the calling process as they are yielded. 
+%% +%% Events sent to the owner process: +%% - `{py_stream, Ref, {data, Value}}' - Each yielded value +%% - `{py_stream, Ref, done}' - Stream completed +%% - `{py_stream, Ref, {error, Reason}}' - Stream error +%% +%% Supports both sync generators and async generators (coroutines). +%% +%% Example: +%% ``` +%% {ok, Ref} = py:stream_start(builtins, iter, [[1,2,3,4,5]]), +%% receive_loop(Ref). +%% +%% receive_loop(Ref) -> +%% receive +%% {py_stream, Ref, {data, Value}} -> +%% io:format("Got: ~p~n", [Value]), +%% receive_loop(Ref); +%% {py_stream, Ref, done} -> +%% io:format("Complete~n"); +%% {py_stream, Ref, {error, Reason}} -> +%% io:format("Error: ~p~n", [Reason]) +%% after 30000 -> +%% timeout +%% end. +%% ''' +-spec stream_start(py_module(), py_func(), py_args()) -> {ok, reference()}. +stream_start(Module, Func, Args) -> + stream_start(Module, Func, Args, #{}). + +%% @doc Start a true streaming iteration with options. +%% +%% Options: +%% - `owner => pid()' - Process to receive events (default: self()) +%% +%% @param Module Python module name +%% @param Func Python function name +%% @param Args Function arguments +%% @param Opts Options map +%% @returns {ok, Ref} where Ref is used to identify stream events +-spec stream_start(py_module(), py_func(), py_args(), map()) -> {ok, reference()}. +stream_start(Module, Func, Args, Opts) -> + Owner = maps:get(owner, Opts, self()), + Ref = make_ref(), + ModuleBin = ensure_binary(Module), + FuncBin = ensure_binary(Func), + RefHash = erlang:phash2(Ref), + %% Store owner and ref for Python to retrieve + %% Use binary keys because Python strings become binaries + py_state:store({<<"stream_owner">>, RefHash}, Owner), + py_state:store({<<"stream_ref">>, RefHash}, Ref), + py_state:store({<<"stream_args">>, RefHash}, Args), + %% Spawn an Erlang process to run the streaming iteration + spawn(fun() -> + stream_run_python(ModuleBin, FuncBin, RefHash) + end), + {ok, Ref}. 
+ +%% @private Run the streaming via Python code; if exec fails, notify the owner and drop the stored stream state +stream_run_python(ModuleBin, FuncBin, RefHash) -> + RefHashBin = integer_to_binary(RefHash), + %% Build Python code that streams values using callbacks; importlib.import_module resolves dotted names to the submodule itself + Code = iolist_to_binary([ + <<"import erlang\n">>, + <<"_rh = ">>, RefHashBin, <<"\n">>, + <<"_args = erlang.call('state_get', ('stream_args', _rh))\n">>, + <<"if _args is None:\n">>, + <<" _args = []\n">>, + <<"try:\n">>, + <<" _mod = __import__('importlib').import_module('">>, ModuleBin, <<"')\n">>, + <<" _fn = getattr(_mod, '">>, FuncBin, <<"')\n">>, + <<" _gen = _fn(*_args) if _args else _fn()\n">>, + <<" for _val in _gen:\n">>, + <<" if erlang.call('_py_stream_cancelled', _rh):\n">>, + <<" erlang.call('_py_stream_send', _rh, 'error', 'cancelled')\n">>, + <<" break\n">>, + <<" erlang.call('_py_stream_send', _rh, 'data', _val)\n">>, + <<" else:\n">>, + <<" erlang.call('_py_stream_send', _rh, 'done', None)\n">>, + <<"except Exception as _e:\n">>, + <<" erlang.call('_py_stream_send', _rh, 'error', str(_e))\n">>, + <<"finally:\n">>, + <<" erlang.call('_py_stream_cleanup', _rh)\n">> + ]), + %% Execute the streaming code + case exec(Code) of + ok -> ok; + {error, Reason} -> + %% Try to notify owner of error + case py_state:fetch({<<"stream_owner">>, RefHash}) of + {ok, Owner} -> + case py_state:fetch({<<"stream_ref">>, RefHash}) of + {ok, Ref} -> + Owner ! {py_stream, Ref, {error, Reason}}, + py_state:remove({<<"stream_owner">>, RefHash}), + py_state:remove({<<"stream_ref">>, RefHash}), + py_state:remove({<<"stream_args">>, RefHash}); + _ -> ok + end; + _ -> ok + end + end. + +%% @doc Cancel an active stream. +%% +%% Sends a cancellation signal to stop the stream iteration. +%% Any pending values may still be delivered before the stream stops. +%% +%% @param Ref The stream reference from stream_start/3,4 +%% @returns ok +-spec stream_cancel(reference()) -> ok. 
+stream_cancel(Ref) when is_reference(Ref) -> + %% Store cancellation flag that the streaming task checks + %% Use hash because we can't pass Erlang refs to Python callbacks easily + %% Use binary key because Python strings become binaries + RefHash = erlang:phash2(Ref), + py_state:store({<<"stream_cancelled_hash">>, RefHash}, true), + ok. + %%% ============================================================================ %%% Info %%% ============================================================================ diff --git a/src/py_channel.erl b/src/py_channel.erl index e202590..8f6836e 100644 --- a/src/py_channel.erl +++ b/src/py_channel.erl @@ -26,8 +26,8 @@ %%% %% Send messages to Python %%% ok = py_channel:send(Ch, {request, self(), <<"data">>}), %%% -%%% %% Python receives via channel.receive() -%%% %% Python sends back via erlang.channel_reply(pid, term) +%%% %% Python: ch = Channel(ref); msg = ch.receive() +%%% %% Python: reply(pid, term) %%% %%% %% Close when done %%% py_channel:close(Ch). diff --git a/src/py_state.erl b/src/py_state.erl index de74a49..3ebffed 100644 --- a/src/py_state.erl +++ b/src/py_state.erl @@ -100,6 +100,14 @@ register_callbacks() -> py_callback:register(state_clear, fun state_clear_callback/1), py_callback:register(state_incr, fun state_incr_callback/1), py_callback:register(state_decr, fun state_decr_callback/1), + %% Internal callback for stream_start to fetch stored values + py_callback:register(<<"_py_state_fetch">>, fun state_fetch_internal/1), + %% Check if stream is cancelled + py_callback:register(<<"_py_stream_cancelled">>, fun stream_cancelled_callback/1), + %% Send stream event to owner + py_callback:register(<<"_py_stream_send">>, fun stream_send_callback/1), + %% Clean up stream state + py_callback:register(<<"_py_stream_cleanup">>, fun stream_cleanup_callback/1), ok. %% @doc Fetch a value from the shared state. 
@@ -206,3 +214,68 @@ state_decr_callback([Key]) -> decr(Key); state_decr_callback([Key, Amount]) -> decr(Key, Amount). + +%% @private Internal one-shot fetch: returns the stored value (removing the entry) or `none' when the key is absent +state_fetch_internal([{Type, Key}]) -> + case fetch({Type, Key}) of + {ok, Value} -> + %% Clean up after fetching (one-time use) + remove({Type, Key}), + Value; + {error, not_found} -> + none + end; +state_fetch_internal([Type, Key]) -> + state_fetch_internal([{Type, Key}]). + +%% @private Check if a stream has been cancelled; returns true at most once per stored flag +stream_cancelled_callback([RefHash]) -> + %% Use binary key because Python strings become binaries + case fetch({<<"stream_cancelled_hash">>, RefHash}) of + {ok, true} -> + %% Clean up the cancellation flag + remove({<<"stream_cancelled_hash">>, RefHash}), + true; + {error, not_found} -> + false + end. + +%% @private Send a stream event to the owner process +%% Called from Python as erlang.call('_py_stream_send', RefHash, EventType, Value); the arguments arrive here as a list +%% EventType: 'data' | 'done' | 'error' (may come as binary from Python) +stream_send_callback([RefHash, EventType, Value]) -> + %% Use binary keys because Python strings become binaries + case fetch({<<"stream_owner">>, RefHash}) of + {ok, Owner} -> + case fetch({<<"stream_ref">>, RefHash}) of + {ok, Ref} -> + Event = case normalize_event_type(EventType) of + done -> done; + data -> {data, Value}; + error -> {error, Value}; + Other -> {error, {unknown_event, Other, Value}} + end, + Owner ! {py_stream, Ref, Event}, + ok; + {error, not_found} -> + {error, ref_not_found} + end; + {error, not_found} -> + {error, owner_not_found} + end. + +%% @private Normalize event type from Python (may come as binary or atom) +normalize_event_type(done) -> done; +normalize_event_type(data) -> data; +normalize_event_type(error) -> error; +normalize_event_type(<<"done">>) -> done; +normalize_event_type(<<"data">>) -> data; +normalize_event_type(<<"error">>) -> error; +normalize_event_type(Other) -> Other. 
+ +%% @private Clean up stream state entries +stream_cleanup_callback([RefHash]) -> + remove({<<"stream_owner">>, RefHash}), + remove({<<"stream_ref">>, RefHash}), + remove({<<"stream_args">>, RefHash}), + ok. diff --git a/test/py_stream_SUITE.erl b/test/py_stream_SUITE.erl new file mode 100644 index 0000000..282a164 --- /dev/null +++ b/test/py_stream_SUITE.erl @@ -0,0 +1,163 @@ +%%% @doc Common Test suite for py:stream_start/3,4 true streaming API. +-module(py_stream_SUITE). + +-include_lib("common_test/include/ct.hrl"). + +-export([ + all/0, + init_per_suite/1, + end_per_suite/1 +]). + +-export([ + test_stream_start_basic/1, + test_stream_start_range/1, + test_stream_start_generator/1, + test_stream_start_with_owner/1, + test_stream_cancel/1, + test_stream_error/1, + test_stream_empty/1, + test_stream_large/1 +]). + +all() -> + [ + test_stream_start_basic, + test_stream_start_range, + test_stream_start_generator, + test_stream_start_with_owner, + test_stream_cancel, + test_stream_error, + test_stream_empty, + test_stream_large + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(erlang_python), + Config. + +end_per_suite(_Config) -> + ok = application:stop(erlang_python), + ok. + +%% Helper to collect all stream events +collect_stream(Ref) -> + collect_stream(Ref, [], 5000). + +collect_stream(Ref, Acc, Timeout) -> + receive + {py_stream, Ref, {data, Value}} -> + collect_stream(Ref, [Value | Acc], Timeout); + {py_stream, Ref, done} -> + {ok, lists:reverse(Acc)}; + {py_stream, Ref, {error, Reason}} -> + {error, Reason} + after Timeout -> + {error, timeout} + end. + +%% Test basic streaming with iter() +test_stream_start_basic(_Config) -> + {ok, Ref} = py:stream_start(builtins, iter, [[1, 2, 3, 4, 5]]), + {ok, Values} = collect_stream(Ref), + [1, 2, 3, 4, 5] = Values, + ok. 
+ +%% Test streaming with range() +test_stream_start_range(_Config) -> + {ok, Ref} = py:stream_start(builtins, range, [5]), + {ok, Values} = collect_stream(Ref), + [0, 1, 2, 3, 4] = Values, + ok. + +%% Test streaming from a lazy iterator - uses enumerate() which returns an iterator of tuples +test_stream_start_generator(_Config) -> + %% The binary argument is iterated on the Python side, one element per yielded tuple + %% Use enumerate to get (index, value) pairs from a string + {ok, Ref} = py:stream_start(builtins, enumerate, [<<"hello">>]), + {ok, Values} = collect_stream(Ref), + %% enumerate returns tuples of (index, char) + 5 = length(Values), + {0, <<"h">>} = hd(Values), + ok. + +%% Test streaming with custom owner process +test_stream_start_with_owner(_Config) -> + Self = self(), + Receiver = spawn(fun() -> + receive + {collect, Ref} -> + Result = collect_stream(Ref), + Self ! {result, Result} + end + end), + {ok, Ref} = py:stream_start(builtins, iter, [[10, 20, 30]], #{owner => Receiver}), + Receiver ! {collect, Ref}, + receive + {result, {ok, Values}} -> + [10, 20, 30] = Values, + ok; + {result, Error} -> + ct:fail({unexpected_error, Error}) + after 5000 -> + ct:fail(timeout) + end. + +%% Test stream cancellation +test_stream_cancel(_Config) -> + %% Use a large range that we'll cancel partway through + {ok, Ref} = py:stream_start(builtins, range, [1000000]), + %% Wait for the first value (0), then cancel + receive + {py_stream, Ref, {data, 0}} -> ok + after 5000 -> + ct:fail(no_first_value) + end, + %% Cancel the stream + ok = py:stream_cancel(Ref), + %% Drain remaining messages (may receive a few more before cancellation takes effect) + drain_stream(Ref), + ok. + +%% Helper to drain stream messages until none arrives for 1000 ms +drain_stream(Ref) -> + receive + {py_stream, Ref, _} -> drain_stream(Ref) + after 1000 -> + ok + end. 
+ +%% Test error handling in generator +test_stream_error(_Config) -> + %% Call a function that doesn't exist - should get an error about attribute + {ok, Ref} = py:stream_start(builtins, nonexistent_function, []), + %% Should get an error about missing attribute + receive + {py_stream, Ref, {error, Reason}} -> + true = is_binary(Reason) orelse is_list(Reason), + %% Error should mention something about attribute not found + ok; + {py_stream, Ref, done} -> + ct:fail(expected_error_not_done); + {py_stream, Ref, {data, _}} -> + ct:fail(expected_error_not_data) + after 5000 -> + ct:fail(no_error) + end. + +%% Test empty generator +test_stream_empty(_Config) -> + {ok, Ref} = py:stream_start(builtins, iter, [[]]), + {ok, Values} = collect_stream(Ref), + [] = Values, + ok. + +%% Test streaming a larger sequence +test_stream_large(_Config) -> + {ok, Ref} = py:stream_start(builtins, range, [1000]), + {ok, Values} = collect_stream(Ref, [], 30000), + 1000 = length(Values), + %% Verify first and last values + 0 = hd(Values), + 999 = lists:last(Values), + ok.