Skip to content

Commit

Permalink
Added a module to stream Elasticsearch documents (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
GRoguelon committed Apr 3, 2024
1 parent 643ba44 commit 7165191
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 3 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Changelog


## v1.3.0 (2024-04-04)

* **New features:**
* Added `ElasticsearchEx.Stream` module to stream documents

## v1.2.0 (2024-04-04)

* **New features:**
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
```elixir
def deps do
[
{:elasticsearch_ex, "~> 1.2"}
{:elasticsearch_ex, "~> 1.3"}
]
end
```
Expand Down
112 changes: 112 additions & 0 deletions lib/elasticsearch_ex/stream.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
defmodule ElasticsearchEx.Stream do
  @moduledoc """
  Provides a utility to generate an Elixir `Stream` from an Elasticsearch search.

  The stream opens a point in time (PIT), pages through the hits using `search_after`
  and closes the PIT when the enumeration finishes or is halted.
  """

  require Logger

  alias ElasticsearchEx.Api.Search, as: SearchApi

  ## Types

  @type query :: ElasticsearchEx.query()

  @type index :: ElasticsearchEx.index()

  @type pit :: %{required(:id) => binary(), required(:keep_alive) => binary()}

  @type search_after :: list()

  @type acc :: {pit(), nil | search_after()}

  ## Module attributes

  # Tiebreaker appended to every sort so that `search_after` always has a total order.
  @sort_shard_doc [%{_shard_doc: :asc}]

  ## Public functions

  @doc """
  Runs an Elasticsearch search and returns a lazy `Stream` of hits, which is well suited
  for browsing large volumes of data.

  ## Options

    * `:keep_alive` - how long the point in time is kept alive between requests
      (default: `"10s"`).
    * `:per_page` - number of hits fetched per request (default: `100`).

  Any other option is forwarded to `ElasticsearchEx.Api.Search.search/3`.

  Nothing is requested until the stream is enumerated. While enumerating, a search
  response of an unexpected shape raises a `RuntimeError`.

  ## Examples

      iex> ElasticsearchEx.Stream.stream(%{query: %{match_all: %{}}, sort: [%{"@timestamp" => "desc"}]}, "my_index", keep_alive: "30s", per_page: 500)
      #Function<52.124013645/2 in Stream.resource/3>
  """
  @spec stream(query(), nil | index(), keyword()) :: Enumerable.t()
  def stream(query, index \\ nil, params \\ []) do
    {keep_alive, params} = Keyword.pop(params, :keep_alive, "10s")
    {per_page, params} = Keyword.pop(params, :per_page, 100)
    prepared_query = prepare_query(query, per_page)

    Stream.resource(start_fun(index, keep_alive), next_fun(prepared_query, params), &after_fun/1)
  end

  ## Private functions

  # Builds the start function of the resource: it opens the PIT and seeds the accumulator.
  @spec start_fun(nil | index(), binary()) :: (-> acc())
  defp start_fun(index, keep_alive) do
    fn ->
      {:ok, %{"id" => pit_id}} = SearchApi.create_pit(index, keep_alive: keep_alive)

      Logger.debug("Created the PIT: #{pit_id}")

      {%{id: pit_id, keep_alive: keep_alive}, nil}
    end
  end

  # Builds the function fetching the next page of hits; it halts on the first empty page.
  @spec next_fun(query(), keyword()) :: (acc() -> {[map()], acc()} | {:halt, acc()})
  defp next_fun(query, params) do
    fn {pit, search_after} ->
      query = query |> generate_pit_query(pit) |> generate_search_after_query(search_after)

      Logger.debug(
        "Searching through the PIT: #{pit.id} and search_after: #{inspect(search_after)}"
      )

      # The index is `nil` because the search is scoped by the PIT carried in the query.
      case SearchApi.search(query, nil, params) do
        {:ok, %{"hits" => %{"hits" => []}} = response} ->
          {:halt, {refresh_pit(pit, response), search_after}}

        {:ok, %{"hits" => %{"hits" => hits}} = response} ->
          search_after = hits |> List.last() |> Map.fetch!("sort")

          {hits, {refresh_pit(pit, response), search_after}}

        any ->
          raise "unknown result: #{inspect(any)}"
      end
    end
  end

  # Closes the PIT. This is best-effort cleanup: any unsuccessful answer is logged
  # instead of raising, so a failed close never masks the outcome of the enumeration.
  @spec after_fun(acc()) :: :ok
  defp after_fun({%{id: pit_id}, _search_after}) do
    case SearchApi.close_pit(pit_id) do
      {:ok, %{"succeeded" => true}} ->
        Logger.debug("Deleted the PIT: #{pit_id}")

      _other ->
        Logger.error("Unable to delete the PIT: #{pit_id}")
    end
  end

  # Elasticsearch may answer with a different PIT id on each request; the most recent
  # one must be used for the following search and for closing the PIT.
  @spec refresh_pit(pit(), map()) :: pit()
  defp refresh_pit(pit, %{"pit_id" => pit_id}) when is_binary(pit_id), do: %{pit | id: pit_id}
  defp refresh_pit(pit, _response), do: pit

  # Sets the page size, disables total hits tracking and appends the `_shard_doc`
  # tiebreaker to the sort. `List.wrap/1` accepts a single sort value as well as a list.
  @spec prepare_query(query(), pos_integer()) :: query()
  defp prepare_query(query, per_page) do
    query
    |> Map.put(:size, per_page)
    |> Map.put(:track_total_hits, false)
    |> Map.update(:sort, @sort_shard_doc, &(List.wrap(&1) ++ @sort_shard_doc))
  end

  @spec generate_pit_query(query(), pit()) :: query()
  defp generate_pit_query(query, pit) when is_map(pit) do
    Map.put(query, :pit, pit)
  end

  # The first page has no `search_after` (it is `nil`), so the query is left untouched.
  @spec generate_search_after_query(query(), nil | search_after()) :: query()
  defp generate_search_after_query(query, search_after) when is_list(search_after) do
    Map.put(query, :search_after, search_after)
  end

  defp generate_search_after_query(query, _search_after), do: query
end
3 changes: 1 addition & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule ElasticsearchEx.MixProject do
use Mix.Project

@source_url "https://github.com/CoreCareinc/elasticsearch_ex"
@version "1.2.0"
@version "1.3.0"

def project do
[
Expand Down Expand Up @@ -61,7 +61,6 @@ defmodule ElasticsearchEx.MixProject do
ElasticsearchEx.Api.Usage
],
Utils: [
ElasticsearchEx.Api.Utils,
ElasticsearchEx.Client,
ElasticsearchEx.Ndjson
]
Expand Down
86 changes: 86 additions & 0 deletions test/elasticsearch_ex/stream_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule ElasticsearchEx.StreamTest do
  # NOTE(review): `create_index/2`, `delete_index/1` and `index_documents/2` are presumably
  # provided by `ElasticsearchEx.ConnCase` — confirm against that module.
  use ElasticsearchEx.ConnCase

  # Shadows Elixir's `Stream` inside this module on purpose: every `Stream.` call below
  # targets `ElasticsearchEx.Stream`.
  alias ElasticsearchEx.Stream

  ## Module attributes

  @index_name "test_stream"

  @query %{query: %{match_all: %{}}}

  ## Tests

  setup_all do
    on_exit(fn -> delete_index(@index_name) end)
    create_index(@index_name, %{message: %{type: :keyword}})
    # The stream is lazy: it is built here but only queried once a test enumerates it,
    # i.e. after the documents below have been indexed.
    stream = Stream.stream(@query, @index_name, per_page: 1, keep_alive: "5s")

    {:ok, doc_ids: index_documents(@index_name, 3), stream: stream}
  end

  describe "stream/3" do
    test "returns a Stream", %{stream: stream} do
      assert is_function(stream, 2)
    end

    # With `per_page: 1` every document requires its own search request, which exercises
    # the `search_after` pagination.
    @tag capture_log: true
    test "runs the Stream", %{doc_ids: [doc_id1, doc_id2, doc_id3], stream: stream} do
      assert [
               %{
                 "_id" => ^doc_id1,
                 "_index" => @index_name,
                 "_score" => nil,
                 "_source" => %{"message" => "Hello World 1!"},
                 "sort" => [0]
               },
               %{
                 "_id" => ^doc_id2,
                 "_index" => @index_name,
                 "_score" => nil,
                 "_source" => %{"message" => "Hello World 2!"},
                 "sort" => [1]
               },
               %{
                 "_id" => ^doc_id3,
                 "_index" => @index_name,
                 "_score" => nil,
                 "_source" => %{"message" => "Hello World 3!"},
                 "sort" => [2]
               }
             ] = Enum.to_list(stream)
    end

    # A caller-provided sort is kept first; the second sort value of each hit comes from
    # the tiebreaker appended by the stream.
    @tag capture_log: true
    test "runs the Stream with desc order", %{doc_ids: [doc_id1, doc_id2, doc_id3]} do
      stream =
        @query
        |> Map.put(:sort, [%{message: :desc}])
        |> Stream.stream(@index_name, per_page: 1, keep_alive: "5s")

      assert [
               %{
                 "_id" => ^doc_id3,
                 "_index" => @index_name,
                 "_score" => nil,
                 "_source" => %{"message" => "Hello World 3!"},
                 "sort" => ["Hello World 3!", 2]
               },
               %{
                 "_id" => ^doc_id2,
                 "_index" => @index_name,
                 "_score" => nil,
                 "_source" => %{"message" => "Hello World 2!"},
                 "sort" => ["Hello World 2!", 1]
               },
               %{
                 "_id" => ^doc_id1,
                 "_index" => @index_name,
                 "_score" => nil,
                 "_source" => %{"message" => "Hello World 1!"},
                 "sort" => ["Hello World 1!", 0]
               }
             ] = Enum.to_list(stream)
    end
  end
end

0 comments on commit 7165191

Please sign in to comment.