From d488bb272939cdafc922fb7f2f8c9d2a22682e38 Mon Sep 17 00:00:00 2001 From: Tymon Tobolski Date: Thu, 11 Apr 2024 15:01:24 +0200 Subject: [PATCH] Request/Response streaming for Finch adapter, SSE middleware (#540) * Request/Response streaming for Finch adapter * [Finch] Change error handling, fix for response stream (#573) --------- Co-authored-by: Adam Hodowany --- README.md | 51 ++++++- lib/tesla/adapter/finch.ex | 82 ++++++++++-- lib/tesla/middleware/json.ex | 17 ++- lib/tesla/middleware/sse.ex | 102 ++++++++++++++ mix.exs | 1 + .../adapter_case/stream_request_body.ex | 2 +- .../adapter_case/stream_response_body.ex | 23 ++++ test/tesla/adapter/finch_test.exs | 21 ++- test/tesla/middleware/json_test.exs | 26 ++++ test/tesla/middleware/sse_test.exs | 124 ++++++++++++++++++ 10 files changed, 430 insertions(+), 19 deletions(-) create mode 100644 lib/tesla/middleware/sse.ex create mode 100644 test/support/adapter_case/stream_response_body.ex create mode 100644 test/tesla/middleware/sse_test.exs diff --git a/README.md b/README.md index e1ad859a..f14aeedb 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,8 @@ Tesla is an HTTP client loosely based on [Faraday](https://github.com/lostisland It embraces the concept of middleware when processing the request/response cycle. > Note that this README refers to the `master` branch of Tesla, not the latest - released version on Hex. See [the documentation](https://hexdocs.pm/tesla) for - the documentation of the version you're using. +> released version on Hex. See [the documentation](https://hexdocs.pm/tesla) for +> the documentation of the version you're using. For the list of changes, checkout the latest [release notes](https://github.com/teamon/tesla/releases). @@ -83,8 +83,8 @@ config :tesla, adapter: Tesla.Adapter.Hackney ``` > The default adapter is erlang's built-in `httpc`, but it is not recommended -to use it in production environment as it does not validate SSL certificates -[among other issues](https://github.com/teamon/tesla/issues?utf8=%E2%9C%93&q=is%3Aissue+label%3Ahttpc+). +> to use it in production environment as it does not validate SSL certificates +> [among other issues](https://github.com/teamon/tesla/issues?utf8=%E2%9C%93&q=is%3Aissue+label%3Ahttpc+). ## Documentation @@ -243,7 +243,11 @@ Tesla.get(client, "/", opts: [adapter: [recv_timeout: 30_000]]) ## Streaming -If adapter supports it, you can pass a [Stream](https://hexdocs.pm/elixir/main/Stream.html) as body, e.g.: +### Streaming Request Body + +If adapter supports it, you can pass a +[Stream](https://hexdocs.pm/elixir/main/Stream.html) as request +body, e.g.: ```elixir defmodule ElasticSearch do @@ -259,7 +263,41 @@ defmodule ElasticSearch do end ``` -Each piece of stream will be encoded as JSON and sent as a new line (conforming to JSON stream format). +Each piece of stream will be encoded as JSON and sent as a new line (conforming +to JSON stream format). 
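+
+For example, a stream of maps is encoded element by element into
+newline-delimited JSON before being handed to the adapter:
+
+```elixir
+# Each element is encoded separately and terminated with a newline.
+body = Stream.map(1..3, fn i -> %{id: i} end)
+
+# The adapter receives the chunks:
+#   {"id":1}\n
+#   {"id":2}\n
+#   {"id":3}\n
+```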
+ +### Streaming Response Body + +If adapter supports it, you can pass a `response: :stream` option to return +response body as a +[Stream](https://elixir-lang.org/docs/stable/elixir/Stream.html) + +```elixir +defmodule OpenAI do + def new(token) do + middleware = [ + {Tesla.Middleware.BaseUrl, "https://api.openai.com/v1"}, + {Tesla.Middleware.BearerAuth, token: token}, + {Tesla.Middleware.JSON, decode_content_types: ["text/event-stream"]}, + {Tesla.Middleware.SSE, only: :data} + ] + Tesla.client(middleware, {Tesla.Adapter.Finch, name: MyFinch}) + end + + def completion(client, prompt) do + data = %{ + model: "gpt-3.5-turbo", + messages: [%{role: "user", content: prompt}], + stream: true + } + Tesla.post(client, "/chat/completions", data, opts: [adapter: [response: :stream]]) + end +end +client = OpenAI.new("") +{:ok, env} = OpenAI.completion(client, "What is the meaning of life?") +env.body +|> Stream.each(fn chunk -> IO.inspect(chunk) end) +``` ## Multipart @@ -476,6 +514,7 @@ use Tesla, except: [:delete, :options] ```elixir use Tesla, docs: false ``` + ### Encode only JSON request (do not decode response) ```elixir diff --git a/lib/tesla/adapter/finch.ex b/lib/tesla/adapter/finch.ex index bed1afa1..0bf65b6b 100644 --- a/lib/tesla/adapter/finch.ex +++ b/lib/tesla/adapter/finch.ex @@ -52,37 +52,99 @@ if Code.ensure_loaded?(Finch) do @behaviour Tesla.Adapter alias Tesla.Multipart + @defaults [ + receive_timeout: 15_000 + ] + @impl Tesla.Adapter def call(%Tesla.Env{} = env, opts) do - opts = Tesla.Adapter.opts(env, opts) + opts = Tesla.Adapter.opts(@defaults, env, opts) name = Keyword.fetch!(opts, :name) url = Tesla.build_url(env.url, env.query) req_opts = Keyword.take(opts, [:pool_timeout, :receive_timeout]) + req = build(env.method, url, env.headers, env.body) - case request(name, env.method, url, env.headers, env.body, req_opts) do + case request(req, name, req_opts, opts) do {:ok, %Finch.Response{status: status, headers: headers, body: body}} -> {:ok, %Tesla.Env{env | status: status, headers: headers, body: body}} - {:error, mint_error} -> - {:error, Exception.message(mint_error)} + {:error, %Mint.TransportError{reason: reason}} -> + {:error, reason} + + {:error, reason} -> + {:error, reason} end end - defp request(name, method, url, headers, %Multipart{} = mp, opts) do + defp build(method, url, headers, %Multipart{} = mp) do headers = headers ++ Multipart.headers(mp) body = Multipart.body(mp) |> Enum.to_list() - request(name, method, url, headers, body, opts) + build(method, url, headers, body) end - defp request(_name, _method, _url, _headers, %Stream{}, _opts) do - raise "Streaming is not supported by this adapter!" 
+ defp build(method, url, headers, %Stream{} = body_stream) do + build(method, url, headers, {:stream, body_stream}) end - defp request(name, method, url, headers, body, opts) do + defp build(method, url, headers, body_stream_fun) when is_function(body_stream_fun) do + build(method, url, headers, {:stream, body_stream_fun}) + end + + defp build(method, url, headers, body) do Finch.build(method, url, headers, body) - |> Finch.request(name, opts) + end + + defp request(req, name, req_opts, opts) do + case opts[:response] do + :stream -> stream(req, name, req_opts) + nil -> Finch.request(req, name, req_opts) + other -> raise "Unknown response option: #{inspect(other)}" + end + end + + defp stream(req, name, opts) do + owner = self() + ref = make_ref() + + fun = fn + {:status, status}, _acc -> status + {:headers, headers}, status -> send(owner, {ref, {:status, status, headers}}) + {:data, data}, _acc -> send(owner, {ref, {:data, data}}) + end + + task = + Task.async(fn -> + case Finch.stream(req, name, nil, fun, opts) do + {:ok, _acc} -> send(owner, {ref, :eof}) + {:error, error} -> send(owner, {ref, {:error, error}}) + end + end) + + receive do + {^ref, {:status, status, headers}} -> + body = + Stream.unfold(nil, fn _ -> + receive do + {^ref, {:data, data}} -> + {data, nil} + + {^ref, :eof} -> + Task.await(task) + nil + after + opts[:receive_timeout] -> + Task.shutdown(task, :brutal_kill) + nil + end + end) + + {:ok, %Finch.Response{status: status, headers: headers, body: body}} + after + opts[:receive_timeout] -> + {:error, :timeout} + end end end end diff --git a/lib/tesla/middleware/json.ex b/lib/tesla/middleware/json.ex index b0e481f8..c720b2d8 100644 --- a/lib/tesla/middleware/json.ex +++ b/lib/tesla/middleware/json.ex @@ -113,12 +113,18 @@ defmodule Tesla.Middleware.JSON do end end + defp decode_body(body, opts) when is_struct(body, Stream) or is_function(body), + do: {:ok, decode_stream(body, opts)} + defp decode_body(body, opts), do: process(body, :decode, opts) defp decodable?(env, opts), do: decodable_body?(env) && decodable_content_type?(env, opts) defp decodable_body?(env) do - (is_binary(env.body) && env.body != "") || (is_list(env.body) && env.body != []) + (is_binary(env.body) && env.body != "") || + (is_list(env.body) && env.body != []) || + is_function(env.body) || + is_struct(env.body, Stream) end defp decodable_content_type?(env, opts) do @@ -128,6 +134,15 @@ defmodule Tesla.Middleware.JSON do end end + defp decode_stream(body, opts) do + Stream.map(body, fn chunk -> + case decode_body(chunk, opts) do + {:ok, item} -> item + _ -> chunk + end + end) + end + defp content_types(opts), do: @default_content_types ++ Keyword.get(opts, :decode_content_types, []) diff --git a/lib/tesla/middleware/sse.ex b/lib/tesla/middleware/sse.ex new file mode 100644 index 00000000..ada9bfac --- /dev/null +++ b/lib/tesla/middleware/sse.ex @@ -0,0 +1,102 @@ +defmodule Tesla.Middleware.SSE do + @moduledoc """ + Decode Server Sent Events. + + This middleware is mostly useful when streaming response body. 
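+
+  Each event is decoded into a map keyed by the fields present in the
+  stream (`:data`, `:event`, `:id`, `:retry`, `:comment`), and multi-line
+  `data:` fields are joined with newlines, e.g.:
+
+  ```
+  ": ping\n\ndata: hello\ndata: world\n"
+  # decodes to
+  [%{comment: "ping"}, %{data: "hello\nworld"}]
+  ```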
+ + ## Examples + + ``` + plug Tesla.Middleware.SSE, only: :data + + ``` + + ## Options + + - `:only` - keep only specified keys in event (necessary for using with `JSON` middleware) + - `:decode_content_types` - list of additional decodable content-types + """ + + @behaviour Tesla.Middleware + + @default_content_types ["text/event-stream"] + + @impl Tesla.Middleware + def call(env, next, opts) do + opts = opts || [] + + with {:ok, env} <- Tesla.run(env, next) do + decode(env, opts) + end + end + + def decode(env, opts) do + if decodable_content_type?(env, opts) do + {:ok, %{env | body: decode_body(env.body, opts)}} + else + {:ok, env} + end + end + + defp decode_body(body, opts) when is_struct(body, Stream) or is_function(body) do + body + |> Stream.chunk_while( + "", + fn elem, acc -> + {lines, [rest]} = (acc <> elem) |> String.split("\n\n") |> Enum.split(-1) + {:cont, lines, rest} + end, + fn + "" -> {:cont, ""} + acc -> {:cont, acc, ""} + end + ) + |> Stream.flat_map(& &1) + |> Stream.map(&decode_message/1) + |> Stream.flat_map(&only(&1, opts[:only])) + end + + defp decode_body(binary, opts) when is_binary(binary) do + binary + |> String.split("\n\n") + |> Enum.map(&decode_message/1) + |> Enum.flat_map(&only(&1, opts[:only])) + end + + defp decode_message(message) do + message + |> String.split("\n") + |> Enum.map(&decode_body/1) + |> Enum.reduce(%{}, fn + :empty, acc -> acc + {:data, data}, acc -> Map.update(acc, :data, data, &(&1 <> "\n" <> data)) + {key, value}, acc -> Map.put_new(acc, key, value) + end) + end + + defp decode_body(": " <> comment), do: {:comment, comment} + defp decode_body("data: " <> data), do: {:data, data} + defp decode_body("event: " <> event), do: {:event, event} + defp decode_body("id: " <> id), do: {:id, id} + defp decode_body("retry: " <> retry), do: {:retry, retry} + defp decode_body(""), do: :empty + + defp decodable_content_type?(env, opts) do + case Tesla.get_header(env, "content-type") do + nil -> false + content_type -> Enum.any?(content_types(opts), &String.starts_with?(content_type, &1)) + end + end + + defp content_types(opts), + do: @default_content_types ++ Keyword.get(opts, :decode_content_types, []) + + defp only(message, nil), do: [message] + + defp only(message, key) do + case Map.get(message, key) do + nil -> [] + val -> [val] + end + end +end diff --git a/mix.exs b/mix.exs index 9b56defb..7c0ff709 100644 --- a/mix.exs +++ b/mix.exs @@ -135,6 +135,7 @@ defmodule Tesla.Mixfile do Tesla.Middleware.PathParams, Tesla.Middleware.Query, Tesla.Middleware.Retry, + Tesla.Middleware.SSE, Tesla.Middleware.Telemetry, Tesla.Middleware.Timeout ] diff --git a/test/support/adapter_case/stream_request_body.ex b/test/support/adapter_case/stream_request_body.ex index 69cb1c28..bb1be7ff 100644 --- a/test/support/adapter_case/stream_request_body.ex +++ b/test/support/adapter_case/stream_request_body.ex @@ -3,7 +3,7 @@ defmodule Tesla.AdapterCase.StreamRequestBody do quote do alias Tesla.Env - describe "Stream" do + describe "Stream Request" do test "stream request body: Stream.map" do request = %Env{ method: :post, diff --git a/test/support/adapter_case/stream_response_body.ex b/test/support/adapter_case/stream_response_body.ex new file mode 100644 index 00000000..c8cd113f --- /dev/null +++ b/test/support/adapter_case/stream_response_body.ex @@ -0,0 +1,23 @@ +defmodule Tesla.AdapterCase.StreamResponseBody do + defmacro __using__(_) do + quote do + alias Tesla.Env + + describe "Stream Response" do + test "stream response body" do + request = %Env{ + method: :get, 
+ url: "#{@http}/stream/20" + } + + assert {:ok, %Env{} = response} = call(request, response: :stream) + assert response.status == 200 + assert is_function(response.body) || response.body.__struct__ == Stream + + body = Enum.to_list(response.body) + assert Enum.count(body) == 20 + end + end + end + end +end diff --git a/test/tesla/adapter/finch_test.exs b/test/tesla/adapter/finch_test.exs index afe5a076..0eeb30f3 100644 --- a/test/tesla/adapter/finch_test.exs +++ b/test/tesla/adapter/finch_test.exs @@ -6,7 +6,8 @@ defmodule Tesla.Adapter.FinchTest do use Tesla.AdapterCase, adapter: {Tesla.Adapter.Finch, [name: @finch_name]} use Tesla.AdapterCase.Basic use Tesla.AdapterCase.Multipart - # use Tesla.AdapterCase.StreamRequestBody + use Tesla.AdapterCase.StreamRequestBody + use Tesla.AdapterCase.StreamResponseBody use Tesla.AdapterCase.SSL setup do @@ -24,4 +25,22 @@ defmodule Tesla.Adapter.FinchTest do start_supervised!({Finch, opts}) :ok end + + test "Delay request" do + request = %Env{ + method: :head, + url: "#{@http}/delay/1" + } + + assert {:error, :timeout} = call(request, receive_timeout: 100) + end + + test "Delay request with stream" do + request = %Env{ + method: :head, + url: "#{@http}/delay/1" + } + + assert {:error, :timeout} = call(request, receive_timeout: 100, response: :stream) + end end diff --git a/test/tesla/middleware/json_test.exs b/test/tesla/middleware/json_test.exs index c7cec6ec..e23b3dd1 100644 --- a/test/tesla/middleware/json_test.exs +++ b/test/tesla/middleware/json_test.exs @@ -167,6 +167,32 @@ defmodule Tesla.Middleware.JsonTest do end end + describe "Streams" do + test "encode stream" do + adapter = fn env -> + assert IO.iodata_to_binary(Enum.to_list(env.body)) == ~s|{"id":1}\n{"id":2}\n{"id":3}\n| + end + + stream = Stream.map(1..3, fn i -> %{id: i} end) + Tesla.Middleware.JSON.call(%Tesla.Env{body: stream}, [{:fn, adapter}], []) + end + + test "decode stream" do + adapter = fn _env -> + stream = Stream.map(1..3, fn i -> ~s|{"id": #{i}}\n| end) + + {:ok, + %Tesla.Env{ + headers: [{"content-type", "application/json"}], + body: stream + }} + end + + assert {:ok, env} = Tesla.Middleware.JSON.call(%Tesla.Env{}, [{:fn, adapter}], []) + assert Enum.to_list(env.body) == [%{"id" => 1}, %{"id" => 2}, %{"id" => 3}] + end + end + describe "Multipart" do defmodule MultipartClient do use Tesla diff --git a/test/tesla/middleware/sse_test.exs b/test/tesla/middleware/sse_test.exs new file mode 100644 index 00000000..9f0de25a --- /dev/null +++ b/test/tesla/middleware/sse_test.exs @@ -0,0 +1,124 @@ +defmodule Tesla.Middleware.SSETest do + use ExUnit.Case + + @env %Tesla.Env{ + status: 200, + headers: [{"content-type", "text/event-stream"}] + } + + describe "Basics" do + test "ignore not matching content-type" do + adapter = fn _env -> + {:ok, %Tesla.Env{headers: [{"content-type", "text/plain"}], body: "test"}} + end + + assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], []) + assert env.body == "test" + end + + test "decode comment" do + adapter = fn _env -> + {:ok, %{@env | body: ": comment"}} + end + + assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], []) + assert env.body == [%{comment: "comment"}] + end + + test "decode multiple messages" do + body = """ + : this is a test stream + + data: some text + + data: another message + data: with two lines + """ + + adapter = fn _env -> + {:ok, %{@env | body: body}} + end + + assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], []) + + assert env.body == 
[ + %{comment: "this is a test stream"}, + %{data: "some text"}, + %{data: "another message\nwith two lines"} + ] + end + + test "decode named events" do + body = """ + event: userconnect + data: {"username": "bobby", "time": "02:33:48"} + + data: Here's a system message of some kind that will get used + data: to accomplish some task. + + event: usermessage + data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."} + """ + + adapter = fn _env -> + {:ok, %{@env | body: body}} + end + + assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], []) + + assert env.body == [ + %{event: "userconnect", data: ~s|{"username": "bobby", "time": "02:33:48"}|}, + %{ + data: + "Here's a system message of some kind that will get used\nto accomplish some task." + }, + %{ + event: "usermessage", + data: ~s|{"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}| + } + ] + end + + test "output only data" do + body = """ + : comment1 + + event: userconnect + data: data1 + + data: data2 + data: data3 + + event: usermessage + data: data4 + """ + + adapter = fn _env -> + {:ok, %{@env | body: body}} + end + + assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], only: :data) + + assert env.body == ["data1", "data2\ndata3", "data4"] + end + + test "handle stream data" do + adapter = fn _env -> + chunks = [ + ~s|dat|, + ~s|a: dat|, + ~s|a1\n\ndata: data2\n\ndata: d|, + ~s|ata3\n\n| + ] + + stream = Stream.map(chunks, & &1) + + {:ok, %{@env | body: stream}} + end + + assert {:ok, env} = Tesla.Middleware.SSE.call(%Tesla.Env{}, [{:fn, adapter}], []) + + assert Enum.to_list(env.body) == [%{data: "data1"}, %{data: "data2"}, %{data: "data3"}] + end + end +end
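
A sketch (not part of the test suite above) of how the two middlewares compose
the way the README's OpenAI example wires them: the SSE middleware sits closer
to the adapter, so on the response path it splits the stream into `data:`
payloads first, and the JSON middleware then decodes each payload lazily. The
module and test names below are illustrative, and the fake function adapter
stands in for a real streaming adapter such as Finch with `response: :stream`.

```elixir
defmodule Tesla.Middleware.SSEJSONTest do
  use ExUnit.Case

  test "decode streamed SSE data fields as JSON" do
    # Fake adapter: emits the response body as a lazy stream of SSE chunks.
    adapter = fn env ->
      chunks = [~s|data: {"id": 1}\n\n|, ~s|data: {"id": 2}\n\n|]

      {:ok,
       %{
         env
         | status: 200,
           headers: [{"content-type", "text/event-stream"}],
           body: Stream.map(chunks, & &1)
       }}
    end

    middleware = [
      {Tesla.Middleware.JSON, decode_content_types: ["text/event-stream"]},
      {Tesla.Middleware.SSE, only: :data}
    ]

    client = Tesla.client(middleware, adapter)

    assert {:ok, env} = Tesla.get(client, "/stream")

    # SSE extracts the data payloads, JSON decodes each one.
    assert Enum.to_list(env.body) == [%{"id" => 1}, %{"id" => 2}]
  end
end
```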