Skip to content

Commit

Permalink
feat: Add Stream module (#40)
Browse files Browse the repository at this point in the history
* Added a module to stream Elasticsearch documents

* Updated CHANGELOG

* Exposed the stream function in the root module

* Fixed Dialyxir error
  • Loading branch information
GRoguelon committed Apr 3, 2024
1 parent 7165191 commit c50d6ae
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 4 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
* **New features:**
* Added `ElasticsearchEx.Stream` module to stream documents

* **Changes:**
* Removed extra `is_list(opts)` from the API because it's already checked in the HTTP client

* **Bug fixes:**
* Fixed a bug on `ElasticsearchEx.Api.Search.search/2` to allow `index` to be `nil`

## v1.2.0 (2024-04-04)

* **New features:**
Expand Down
7 changes: 7 additions & 0 deletions lib/elasticsearch_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,11 @@ defmodule ElasticsearchEx do
@spec index(source(), index(), nil | document_id(), keyword()) :: response()
defdelegate index(source, index, document_id \\ nil, opts \\ []),
to: ElasticsearchEx.Api.Document

@doc """
Refer to `ElasticsearchEx.Api.Search.search/2` documentation.
"""
@doc since: "1.3.0"
@spec stream(query(), nil | index(), keyword()) :: Enumerable.t()
defdelegate stream(query, index \\ nil, opts \\ []), to: ElasticsearchEx.Stream
end
14 changes: 10 additions & 4 deletions lib/elasticsearch_ex/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ defmodule ElasticsearchEx.Stream do

@type index :: ElasticsearchEx.index()

@type pit :: %{required(:id) => binary(), required(:keep_alive) => binary()}
@typep pit :: %{required(:id) => binary(), required(:keep_alive) => binary()}

@type search_after :: []
@typep search_after :: []

@type acc :: {pit(), nil | search_after()}
@typep acc :: {pit(), nil | search_after()}

## Module attributes

Expand All @@ -30,9 +30,15 @@ defmodule ElasticsearchEx.Stream do
## Examples
iex> ElasticsearchEx.Stream.stream(%{query: %{match_all: %{}}, sort: [%{"@timestamp" => "desc"}]}, "my_index", keep_alive: "30s", per_page: 500)
iex> ElasticsearchEx.Stream.stream(
...> %{query: %{match_all: %{}}, sort: [%{message: :desc}]},
...> :my_index,
...> keep_alive: "30s",
...> per_page: 500
...> )
#Function<52.124013645/2 in Stream.resource/3>
"""
@doc since: "1.3.0"
@spec stream(query(), nil | index(), keyword()) :: Enumerable.t()
def stream(query, index \\ nil, params \\ []) do
{keep_alive, params} = Keyword.pop(params, :keep_alive, "10s")
Expand Down
6 changes: 6 additions & 0 deletions test/elasticsearch_ex_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,10 @@ defmodule ElasticsearchExTest do
assert {:index, 4} in ElasticsearchEx.__info__(:functions)
end
end

describe "stream/3" do
test "exposes a function of arity of 3" do
assert {:stream, 3} in ElasticsearchEx.__info__(:functions)
end
end
end

0 comments on commit c50d6ae

Please sign in to comment.