Request/Response streaming for Finch adapter, SSE middleware #540

Merged · 2 commits · Apr 11, 2024
51 changes: 45 additions & 6 deletions README.md
@@ -12,8 +12,8 @@ Tesla is an HTTP client loosely based on [Faraday](https://github.com/lostisland
It embraces the concept of middleware when processing the request/response cycle.

> Note that this README refers to the `master` branch of Tesla, not the latest
released version on Hex. See [the documentation](https://hexdocs.pm/tesla) for
the documentation of the version you're using.
> released version on Hex. See [the documentation](https://hexdocs.pm/tesla) for
> the documentation of the version you're using.

For the list of changes, checkout the latest [release notes](https://github.com/teamon/tesla/releases).

@@ -83,8 +83,8 @@ config :tesla, adapter: Tesla.Adapter.Hackney
```

> The default adapter is erlang's built-in `httpc`, but it is not recommended
to use it in production environment as it does not validate SSL certificates
[among other issues](https://github.com/teamon/tesla/issues?utf8=%E2%9C%93&q=is%3Aissue+label%3Ahttpc+).
> to use it in production environment as it does not validate SSL certificates
> [among other issues](https://github.com/teamon/tesla/issues?utf8=%E2%9C%93&q=is%3Aissue+label%3Ahttpc+).

## Documentation

@@ -243,7 +243,11 @@ Tesla.get(client, "/", opts: [adapter: [recv_timeout: 30_000]])

## Streaming

If adapter supports it, you can pass a [Stream](https://hexdocs.pm/elixir/main/Stream.html) as body, e.g.:
### Streaming Request Body

If the adapter supports it, you can pass a
[Stream](https://hexdocs.pm/elixir/main/Stream.html) as the request
body, e.g.:

```elixir
defmodule ElasticSearch do
@@ -259,7 +263,41 @@ defmodule ElasticSearch do
end
```

Each piece of stream will be encoded as JSON and sent as a new line (conforming to JSON stream format).
Each piece of the stream will be encoded as JSON and sent as a new line
(conforming to the newline-delimited JSON streaming format).
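
For illustration, a minimal sketch of what this looks like on the wire (the Finch pool name `MyFinch` and the URL are assumptions, not taken from the example above):

```elixir
# Assumes a started Finch pool named MyFinch and a server at localhost:4000.
client = Tesla.client([Tesla.Middleware.JSON], {Tesla.Adapter.Finch, name: MyFinch})

# Each stream element is JSON-encoded and followed by a newline, so the
# request body is sent as:
#
#   {"id":1}
#   {"id":2}
#
body = Stream.map([%{id: 1}, %{id: 2}], & &1)
Tesla.post(client, "http://localhost:4000/docs", body)
```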

### Streaming Response Body

If the adapter supports it, you can pass the `response: :stream` option to
return the response body as a
[Stream](https://hexdocs.pm/elixir/main/Stream.html), e.g.:

```elixir
defmodule OpenAI do
def new(token) do
middleware = [
{Tesla.Middleware.BaseUrl, "https://api.openai.com/v1"},
{Tesla.Middleware.BearerAuth, token: token},
{Tesla.Middleware.JSON, decode_content_types: ["text/event-stream"]},
{Tesla.Middleware.SSE, only: :data}
]
Tesla.client(middleware, {Tesla.Adapter.Finch, name: MyFinch})
end

def completion(client, prompt) do
data = %{
model: "gpt-3.5-turbo",
messages: [%{role: "user", content: prompt}],
stream: true
}
Tesla.post(client, "/chat/completions", data, opts: [adapter: [response: :stream]])
end
end

client = OpenAI.new("<token>")
{:ok, env} = OpenAI.completion(client, "What is the meaning of life?")

env.body
|> Stream.each(fn chunk -> IO.inspect(chunk) end)
|> Stream.run()
```
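
With the middleware stack above, the SSE middleware extracts the `data:` payloads and the JSON middleware decodes them, so each chunk arrives as a map; payloads that are not valid JSON (such as the final `[DONE]` marker) pass through as plain strings. A rough sketch of printing just the generated text (the chunk shape below is the OpenAI API's, not Tesla's):

```elixir
env.body
|> Stream.reject(&(&1 == "[DONE]"))
|> Stream.map(&get_in(&1, ["choices", Access.at(0), "delta", "content"]))
|> Stream.each(&IO.write(&1 || ""))
|> Stream.run()
```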

## Multipart

@@ -476,6 +514,7 @@ use Tesla, except: [:delete, :options]
```elixir
use Tesla, docs: false
```

### Encode only JSON request (do not decode response)

```elixir
# (rest of this example is collapsed in the diff)
```
82 changes: 72 additions & 10 deletions lib/tesla/adapter/finch.ex
Expand Up @@ -52,37 +52,99 @@ if Code.ensure_loaded?(Finch) do
@behaviour Tesla.Adapter
alias Tesla.Multipart

@defaults [
receive_timeout: 15_000
]

@impl Tesla.Adapter
def call(%Tesla.Env{} = env, opts) do
opts = Tesla.Adapter.opts(env, opts)
opts = Tesla.Adapter.opts(@defaults, env, opts)

name = Keyword.fetch!(opts, :name)
url = Tesla.build_url(env.url, env.query)
req_opts = Keyword.take(opts, [:pool_timeout, :receive_timeout])
req = build(env.method, url, env.headers, env.body)

case request(name, env.method, url, env.headers, env.body, req_opts) do
case request(req, name, req_opts, opts) do
{:ok, %Finch.Response{status: status, headers: headers, body: body}} ->
{:ok, %Tesla.Env{env | status: status, headers: headers, body: body}}

{:error, mint_error} ->
{:error, Exception.message(mint_error)}
{:error, %Mint.TransportError{reason: reason}} ->
{:error, reason}

{:error, reason} ->
{:error, reason}
end
end

defp request(name, method, url, headers, %Multipart{} = mp, opts) do
defp build(method, url, headers, %Multipart{} = mp) do
headers = headers ++ Multipart.headers(mp)
body = Multipart.body(mp) |> Enum.to_list()

request(name, method, url, headers, body, opts)
build(method, url, headers, body)
end

defp request(_name, _method, _url, _headers, %Stream{}, _opts) do
raise "Streaming is not supported by this adapter!"
defp build(method, url, headers, %Stream{} = body_stream) do
build(method, url, headers, {:stream, body_stream})
end

defp request(name, method, url, headers, body, opts) do
defp build(method, url, headers, body_stream_fun) when is_function(body_stream_fun) do
build(method, url, headers, {:stream, body_stream_fun})
end

defp build(method, url, headers, body) do
Finch.build(method, url, headers, body)
|> Finch.request(name, opts)
end

defp request(req, name, req_opts, opts) do
case opts[:response] do
:stream -> stream(req, name, req_opts)
nil -> Finch.request(req, name, req_opts)
other -> raise "Unknown response option: #{inspect(other)}"
end
end

defp stream(req, name, opts) do
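# Run Finch.stream/5 in a separate task and forward status, headers and
# data chunks back to the caller as messages; the response body is then
# exposed as a lazy Stream.unfold/2 that receives those messages on demand.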
owner = self()
ref = make_ref()

fun = fn
{:status, status}, _acc -> status
{:headers, headers}, status -> send(owner, {ref, {:status, status, headers}})
{:data, data}, _acc -> send(owner, {ref, {:data, data}})
end

task =
Task.async(fn ->
case Finch.stream(req, name, nil, fun, opts) do
{:ok, _acc} -> send(owner, {ref, :eof})
{:error, error} -> send(owner, {ref, {:error, error}})
end
end)

receive do
{^ref, {:status, status, headers}} ->
body =
Stream.unfold(nil, fn _ ->
receive do
{^ref, {:data, data}} ->
{data, nil}

{^ref, :eof} ->
Task.await(task)
nil
after
opts[:receive_timeout] ->
Task.shutdown(task, :brutal_kill)
nil
end
end)

{:ok, %Finch.Response{status: status, headers: headers, body: body}}
after
opts[:receive_timeout] ->
{:error, :timeout}
end
end
end
end
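
Outside a middleware stack, the new `response: :stream` option can also be passed through the per-request adapter options; a sketch (the Finch pool name `MyFinch` and the URL are assumptions):

```elixir
client = Tesla.client([], {Tesla.Adapter.Finch, name: MyFinch})

{:ok, env} =
  Tesla.get(client, "http://localhost:4000/stream",
    opts: [adapter: [response: :stream]]
  )

# env.body is a lazy stream of binary chunks; consuming it pulls data
# from the task started in stream/3 above.
Enum.each(env.body, &IO.write/1)
```
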
17 changes: 16 additions & 1 deletion lib/tesla/middleware/json.ex
@@ -113,12 +113,18 @@ defmodule Tesla.Middleware.JSON do
end
end

defp decode_body(body, opts) when is_struct(body, Stream) or is_function(body),
do: {:ok, decode_stream(body, opts)}

defp decode_body(body, opts), do: process(body, :decode, opts)

defp decodable?(env, opts), do: decodable_body?(env) && decodable_content_type?(env, opts)

defp decodable_body?(env) do
(is_binary(env.body) && env.body != "") || (is_list(env.body) && env.body != [])
(is_binary(env.body) && env.body != "") ||
(is_list(env.body) && env.body != []) ||
is_function(env.body) ||
is_struct(env.body, Stream)
end

defp decodable_content_type?(env, opts) do
@@ -128,6 +134,15 @@
end
end

defp decode_stream(body, opts) do
Stream.map(body, fn chunk ->
case decode_body(chunk, opts) do
{:ok, item} -> item
_ -> chunk
end
end)
end

defp content_types(opts),
do: @default_content_types ++ Keyword.get(opts, :decode_content_types, [])

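With the changes above, a streamed response body is decoded lazily, chunk by chunk, and chunks that fail to decode pass through unchanged. A sketch using the module's public `decode/2` helper (the env below is made up for illustration):

```elixir
env = %Tesla.Env{
  headers: [{"content-type", "application/json"}],
  body: Stream.map(["{\"a\":1}", "{\"b\":2}", "[DONE]"], & &1)
}

{:ok, env} = Tesla.Middleware.JSON.decode(env, [])

Enum.to_list(env.body)
#=> [%{"a" => 1}, %{"b" => 2}, "[DONE]"]
```
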
102 changes: 102 additions & 0 deletions lib/tesla/middleware/sse.ex
@@ -0,0 +1,102 @@
defmodule Tesla.Middleware.SSE do
@moduledoc """
Decode Server-Sent Events (SSE).

This middleware is mostly useful when streaming the response body.

## Examples

```
plug Tesla.Middleware.SSE, only: :data
```

## Options

- `:only` - keep only the value of the given key (e.g. `:data`) from each event (necessary when combining with the `JSON` middleware)
- `:decode_content_types` - list of additional decodable content-types
"""

@behaviour Tesla.Middleware

@default_content_types ["text/event-stream"]

@impl Tesla.Middleware
def call(env, next, opts) do
opts = opts || []

with {:ok, env} <- Tesla.run(env, next) do
decode(env, opts)
end
end

def decode(env, opts) do
if decodable_content_type?(env, opts) do
{:ok, %{env | body: decode_body(env.body, opts)}}
else
{:ok, env}
end
end

defp decode_body(body, opts) when is_struct(body, Stream) or is_function(body) do
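# A chunk may end mid-event, so partial input is accumulated and events are
# only emitted once a blank line ("\n\n") terminates them; the trailing
# remainder is carried over to the next chunk.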
body
|> Stream.chunk_while(
"",
fn elem, acc ->
{lines, [rest]} = (acc <> elem) |> String.split("\n\n") |> Enum.split(-1)
{:cont, lines, rest}
end,
fn
"" -> {:cont, ""}
acc -> {:cont, acc, ""}
end
)
|> Stream.flat_map(& &1)
|> Stream.map(&decode_message/1)
|> Stream.flat_map(&only(&1, opts[:only]))
end

defp decode_body(binary, opts) when is_binary(binary) do
binary
|> String.split("\n\n")
|> Enum.map(&decode_message/1)
|> Enum.flat_map(&only(&1, opts[:only]))
end

defp decode_message(message) do
message
|> String.split("\n")
|> Enum.map(&decode_body/1)
|> Enum.reduce(%{}, fn
:empty, acc -> acc
{:data, data}, acc -> Map.update(acc, :data, data, &(&1 <> "\n" <> data))
{key, value}, acc -> Map.put_new(acc, key, value)
end)
end

defp decode_body(": " <> comment), do: {:comment, comment}
defp decode_body("data: " <> data), do: {:data, data}
defp decode_body("event: " <> event), do: {:event, event}
defp decode_body("id: " <> id), do: {:id, id}
defp decode_body("retry: " <> retry), do: {:retry, retry}
defp decode_body(""), do: :empty

defp decodable_content_type?(env, opts) do
case Tesla.get_header(env, "content-type") do
nil -> false
content_type -> Enum.any?(content_types(opts), &String.starts_with?(content_type, &1))
end
end

defp content_types(opts),
do: @default_content_types ++ Keyword.get(opts, :decode_content_types, [])

defp only(message, nil), do: [message]

defp only(message, key) do
case Map.get(message, key) do
nil -> []
val -> [val]
end
end
end
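
A quick sketch of what the middleware produces for a small SSE payload, using the public `decode/2` defined above (the env below is made up for illustration):

```elixir
env = %Tesla.Env{
  headers: [{"content-type", "text/event-stream"}],
  body: "event: greeting\ndata: hello\n\ndata: {\"answer\":42}\n\n"
}

# With `only: :data`, only the data payload of each event is kept:
{:ok, decoded} = Tesla.Middleware.SSE.decode(env, only: :data)

decoded.body
#=> ["hello", "{\"answer\":42}"]
```
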
1 change: 1 addition & 0 deletions mix.exs
@@ -135,6 +135,7 @@ defmodule Tesla.Mixfile do
Tesla.Middleware.PathParams,
Tesla.Middleware.Query,
Tesla.Middleware.Retry,
Tesla.Middleware.SSE,
Tesla.Middleware.Telemetry,
Tesla.Middleware.Timeout
]
2 changes: 1 addition & 1 deletion test/support/adapter_case/stream_request_body.ex
@@ -3,7 +3,7 @@ defmodule Tesla.AdapterCase.StreamRequestBody do
quote do
alias Tesla.Env

describe "Stream" do
describe "Stream Request" do
test "stream request body: Stream.map" do
request = %Env{
method: :post,
23 changes: 23 additions & 0 deletions test/support/adapter_case/stream_response_body.ex
@@ -0,0 +1,23 @@
defmodule Tesla.AdapterCase.StreamResponseBody do
defmacro __using__(_) do
quote do
alias Tesla.Env

describe "Stream Response" do
test "stream response body" do
request = %Env{
method: :get,
url: "#{@http}/stream/20"
}

assert {:ok, %Env{} = response} = call(request, response: :stream)
assert response.status == 200
assert is_function(response.body) || response.body.__struct__ == Stream

body = Enum.to_list(response.body)
assert Enum.count(body) == 20
end
end
end
end
end