Skip to content

Commit

Permalink
End the Spear.stream! stream early when reading a chunk smaller than …
Browse files Browse the repository at this point in the history
…requested
  • Loading branch information
the-mikedavis committed Jan 27, 2024
1 parent 6b3a040 commit af7f7b6
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions lib/spear/reading/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ defmodule Spear.Reading.Stream do
:resolve_links?,
:timeout,
:buffer,
:credentials
:credentials,
# The number of messages read from a single chunk.
read_count: 0
]

@type t :: %__MODULE__{}
Expand Down Expand Up @@ -90,17 +92,24 @@ defmodule Spear.Reading.Stream do
)
end

# in this case the buffer has run dry and we need to request more events
# (a new buffer) with a new ReadReq
@spec unfold_continuous(t()) :: {emitted_element :: tuple(), t()} | nil
defp unfold_continuous(%__MODULE__{buffer: <<>>, read_count: read_count, max_count: max_count})
when read_count < max_count do
# If we are done reading this chunk and the amount that we read is smaller than
# what we requested, the stream must be finished.
nil
end

defp unfold_continuous(%__MODULE__{buffer: <<>>, from: from} = state) do
# in this case the buffer has run dry and we need to request more events
# (a new buffer) with a new ReadReq
response = request!(%__MODULE__{state | max_count: state.max_count + 1})

# discard the first message since it is `from`
with {^from, <<_head, _::binary>> = rest} <- unfold_chunk(response),
# look ahead in `rest` to ensure it's an event read response
{event(), _} <- unfold_chunk(rest) do
unfold_continuous(%__MODULE__{state | buffer: rest})
unfold_continuous(%__MODULE__{state | buffer: rest, read_count: 0})
else
# discard trailing stream position message
_ ->
Expand All @@ -111,7 +120,13 @@ defmodule Spear.Reading.Stream do
defp unfold_continuous(%__MODULE__{buffer: buffer} = state) do
case unfold_chunk(buffer) do
{event() = message, remaining_buffer} ->
{message, %__MODULE__{state | buffer: remaining_buffer, from: message}}
{message,
%__MODULE__{
state
| buffer: remaining_buffer,
from: message,
read_count: state.read_count + 1
}}

# skip non-event read responses
# coveralls-ignore-start
Expand Down

0 comments on commit af7f7b6

Please sign in to comment.