Skip to content

Commit

Permalink
Fixed a performance issue on ElasticsearchEx.Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
GRoguelon committed Apr 4, 2024
1 parent aa5177a commit dd49ba3
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

* **Bug fixes:**
* Fixed a bug on `ElasticsearchEx.Api.Search.create_pid/2` with empty body
* Fixed a bug on `ElasticsearchEx.Stream.stream/3` where an extra HTTP call was made at the end of the `Stream`

## v1.3.0 (2024-04-04)

Expand Down
46 changes: 30 additions & 16 deletions lib/elasticsearch_ex/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule ElasticsearchEx.Stream do

@typep search_after :: []

@typep acc :: {pit(), nil | search_after()}
@typep acc :: {pit(), nil | :end_of_stream | search_after()}

## Module attributes

Expand Down Expand Up @@ -61,32 +61,46 @@ defmodule ElasticsearchEx.Stream do
end
end

@spec next_fun(query(), keyword()) :: (acc() -> {[] | :halt, acc()})
@spec next_fun(query(), keyword()) :: (acc() -> {:halt, pit()} | {nonempty_list(), acc()})
defp next_fun(query, params) do
fn {pit, search_after} ->
query = query |> generate_pit_query(pit) |> generate_search_after_query(search_after)
per_page = Map.fetch!(query, :size)

Logger.debug(
"Searching through the PIT: #{pit.id} and search_after: #{inspect(search_after)}"
)
&do_next_fun(&1, query, params, per_page)
end

@spec do_next_fun(acc(), map(), keyword(), pos_integer()) ::
{:halt, pit()} | {nonempty_list(), acc()}
defp do_next_fun({pit, :end_of_stream}, _query, _params, _per_page) do
{:halt, pit}
end

defp do_next_fun({pit, search_after}, query, params, per_page) do
query = query |> generate_pit_query(pit) |> generate_search_after_query(search_after)

Logger.debug(
"Searching through the PIT: #{pit.id} and search_after: #{inspect(search_after)}"
)

case SearchApi.search(query, nil, params) do
{:ok, %{"hits" => %{"hits" => []}}} ->
{:halt, {pit, search_after}}
case SearchApi.search(query, nil, params) do
{:ok, %{"hits" => %{"hits" => []}}} ->
{:halt, pit}

{:ok, %{"hits" => %{"hits" => hits}}} ->
{:ok, %{"hits" => %{"hits" => hits}}} ->
if length(hits) < per_page do
{hits, {pit, :end_of_stream}}
else
search_after = hits |> List.last() |> Map.fetch!("sort")

{hits, {pit, search_after}}
end

any ->
raise "unknown result: #{inspect(any)}"
end
any ->
raise "unknown result: #{inspect(any)}"
end
end

@spec after_fun(acc()) :: any()
defp after_fun({%{id: pit_id}, _search_after}) do
@spec after_fun(pit()) :: any()
defp after_fun(%{id: pit_id}) do
case SearchApi.close_pit(pit_id) do
{:ok, %{"num_freed" => _, "succeeded" => true}} ->
Logger.debug("Deleted the PIT: #{pit_id}")
Expand Down

0 comments on commit dd49ba3

Please sign in to comment.