From 148974af06071193afb0af769a3f38330b4f8f9c Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Wed, 11 Apr 2018 15:53:23 -0700 Subject: [PATCH 1/9] Introduce Elasticsearch.Cluster --- README.md | 43 ++-- bin/coverage | 8 + coveralls.json | 9 + lib/elasticsearch.ex | 235 ++++++++---------- lib/elasticsearch/api/api.ex | 22 +- lib/elasticsearch/api/http.ex | 86 +++++-- lib/elasticsearch/cluster/cluster.ex | 195 +++++++++++++++ lib/elasticsearch/cluster/config.ex | 86 +++++++ lib/elasticsearch/config.ex | 132 ---------- lib/elasticsearch/indexing/bulk.ex | 30 +-- lib/elasticsearch/indexing/index.ex | 127 +++++----- lib/elasticsearch/storage/data_stream.ex | 30 +-- lib/mix/elasticsearch.build.ex | 80 +++--- mix.exs | 6 +- mix.lock | 2 + test/elasticsearch/cluster/cluster_test.exs | 139 +++++++++++ test/elasticsearch/config_test.exs | 7 - test/elasticsearch/indexing/index_test.exs | 9 +- .../storage/data_stream_test.exs | 1 + test/elasticsearch_test.exs | 13 +- test/mix/elasticsearch.build_test.exs | 34 ++- test/support/cluster.ex | 25 ++ test/test_helper.exs | 3 +- 23 files changed, 845 insertions(+), 477 deletions(-) create mode 100755 bin/coverage create mode 100644 coveralls.json create mode 100644 lib/elasticsearch/cluster/cluster.ex create mode 100644 lib/elasticsearch/cluster/config.ex delete mode 100644 lib/elasticsearch/config.ex create mode 100644 test/elasticsearch/cluster/cluster_test.exs delete mode 100644 test/elasticsearch/config_test.exs create mode 100644 test/support/cluster.ex diff --git a/README.md b/README.md index 2fa5164..cc5c821 100644 --- a/README.md +++ b/README.md @@ -23,19 +23,27 @@ def deps do end ``` +Then, create an `Elasticsearch.Cluster` in your application: + +```elixir +defmodule MyApp.ElasticsearchCluster do + use Elasticsearch.Cluster, otp_app: :my_app +end +``` + ## Configuration See the annotated example configuration below. 
```elixir -config :elasticsearch, +config :my_app, MyApp.ElasticsearchCluster, # The URL where Elasticsearch is hosted on your system - url: "http://localhost:9200", # or {:system, "ELASTICSEARCH_URL"} + url: "http://localhost:9200", # If your Elasticsearch cluster uses HTTP basic authentication, # specify the username and password here: - username: "username", # or {:system, "ELASTICSEARCH_USERNAME"} - password: "password", # or {:system, "ELASTICSEARCH_PASSWORD"} + username: "username", + password: "password", # When indexing data using the `mix elasticsearch.build` task, # control the data ingestion rate by raising or lowering the number @@ -49,7 +57,7 @@ config :elasticsearch, # If you want to mock the responses of the Elasticsearch JSON API # for testing or other purposes, you can inject a different module # here. It must implement the Elasticsearch.API behaviour. - api_module: Elasticsearch.API.HTTP, + api: Elasticsearch.API.HTTP, # Customize the library used for JSON encoding/decoding. json_library: Poison, # or Jason @@ -83,7 +91,7 @@ config :elasticsearch, } ``` -## Protocols & Behaviours +## Protocols and Behaviours #### Elasticsearch.Store @@ -130,8 +138,8 @@ This can be used in test mode, for example: ```elixir # config/test.exs -config :elasticsearch, - api_module: MyApp.ElasticsearchMock +config :my_app, MyApp.ElasticsearchCluster, + api: MyApp.ElasticsearchMock ``` Your mock can then stub requests and responses from Elasticsearch. @@ -140,7 +148,8 @@ Your mock can then stub requests and responses from Elasticsearch. defmodule MyApp.ElasticsearchMock do @behaviour Elasticsearch.API - def get("/posts/1", _headers, _opts) do + @impl true + def request(_config, :get, "/posts/1", _headers, _opts) do {:ok, %HTTPoison.Response{ status_code: 404, body: %{ @@ -161,7 +170,7 @@ hot-swap technique with Elasticsearch aliases. 
```bash # This will read the `indexes[posts]` configuration seen above, to build # an index, `posts-123123123`, which will then be aliased to `posts`. -$ mix elasticsearch.build posts +$ mix elasticsearch.build posts --cluster MyApp.ElasticsearchCluster ``` See the docs on `Mix.Tasks.Elasticsearch.Build` and `Elasticsearch.Index` @@ -169,29 +178,29 @@ for more details. #### Individual Documents -Use `Elasticsearch.put_document/2` to upload a document to a particular index. +Use `Elasticsearch.put_document/3` to upload a document to a particular index. ```elixir # MyApp.Post must implement Elasticsearch.Document -Elasticsearch.put_document(%MyApp.Post{}, "index-name") +Elasticsearch.put_document(MyApp.ElasticsearchCluster, %MyApp.Post{}, "index-name") ``` -To remove documents, use `Elasticsearch.delete_document/2`: +To remove documents, use `Elasticsearch.delete_document/3`: ```elixir -Elasticsearch.delete_document(%MyApp.Post{}, "index-name") +Elasticsearch.delete_document(MyApp.ElasticsearchCluster, %MyApp.Post{}, "index-name") ``` ## Querying -You can query Elasticsearch the `post/2` function: +You can query Elasticsearch the `post/3` function: ```elixir # Raw query -Elasticsearch.post("/posts/post/_search", '{"query": {"match_all": {}}}') +Elasticsearch.post(MyApp.ElasticsearchCluster, "/posts/post/_search", '{"query": {"match_all": {}}}') # Using a map -Elasticsearch.post("/posts/post/_search", %{"query" => %{"match_all" => %{}}}) +Elasticsearch.post(MyApp.ElasticsearchCluster, "/posts/post/_search", %{"query" => %{"match_all" => %{}}}) ``` See the official Elasticsearch [documentation](https://www.elastic.co/guide/en/elasticsearch/reference/6.x/index.html) diff --git a/bin/coverage b/bin/coverage new file mode 100755 index 0000000..9096e9c --- /dev/null +++ b/bin/coverage @@ -0,0 +1,8 @@ +#!/usr/bin/env bash +# +# Coverage Report +# +# Generates and opens a code coverage report for the entire project. 
+ +mix coveralls.html || { echo 'Tests failed!'; exit 1; } +open cover/excoveralls.html \ No newline at end of file diff --git a/coveralls.json b/coveralls.json new file mode 100644 index 0000000..0f97cf1 --- /dev/null +++ b/coveralls.json @@ -0,0 +1,9 @@ +{ + "skip_files": [ + "lib/elasticsearch/api/api.ex", + "test/support/*" + ], + "coverage_options": { + "treat_no_relevant_lines_as_covered": true + } +} \ No newline at end of file diff --git a/lib/elasticsearch.ex b/lib/elasticsearch.ex index 43fd0c2..ff841ec 100644 --- a/lib/elasticsearch.ex +++ b/lib/elasticsearch.ex @@ -1,28 +1,23 @@ defmodule Elasticsearch do @moduledoc """ - An Elixir interface to the Elasticsearch JSON API. + Entry-point for interacting with your Elasticsearch cluster(s). - ## Configuration - - You can customize the API module used by this module to make requests to - the Elasticsearch API. (Default: `Elasticsearch.API.HTTP`) - - config :elasticsearch, - api_module: MyApp.CustomAPI - - You can also specify default headers or default options to pass to - `HTTPoison`. - - config :elasticsearch, - default_headers: [{"authorization", "custom-value"}], - default_options: [ssl: [{:versions, [:'tlsv1.2']}]] + You should configure at least one `Elasticsearch.Cluster` in order to + use the functions in this module, or else you'll need to pass all the + configuration for the cluster into each function call. """ - alias Elasticsearch.Document + alias Elasticsearch.{ + Document, + Cluster, + Cluster.Config + } - @type response :: - {:ok, map} - | {:error, Elasticsearch.Exception.t()} + @type index_name :: String.t() + @type url :: Path.t() + @type opts :: Keyword.t() + @type data :: map | String.t() + @type response :: {:ok, map} :: {:error, Elasticsearch.Exception.t()} @doc """ Creates or updates a document in a given index. 
@@ -31,30 +26,26 @@ defmodule Elasticsearch do ## Example - iex> Elasticsearch.Index.create_from_file("posts-1", "test/support/settings/posts.json") + iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json") ...> struct = %Post{id: 123, title: "Post", author: "Author"} - ...> Elasticsearch.put_document(struct, "posts-1") + ...> Elasticsearch.put_document(Cluster, struct, "posts-1") {:ok, %{"_id" => "123", "_index" => "posts-1", "_shards" => %{"failed" => 0, "successful" => 1, "total" => 2}, "_type" => "post", "_version" => 1, "created" => true, "result" => "created"}} """ - @spec put_document(Document.t(), String.t()) :: response - def put_document(document, index) do - document - |> document_url(index) - |> put(Document.encode(document)) + @spec put_document(Cluster.t(), Document.t(), index_name) :: response + def put_document(cluster, document, index) do + put(cluster, document_url(document, index), Document.encode(document)) end @doc """ Same as `put_document/2`, but raises on errors. """ - @spec put_document!(Document.t(), String.t()) :: map - def put_document!(document, index) do - document - |> put_document(index) - |> unwrap!() + @spec put_document(Cluster.t(), Document.t(), index_name) :: map | no_return + def put_document!(cluster, document, index) do + put!(cluster, document_url(document, index), Document.encode(document)) end @doc """ @@ -62,21 +53,17 @@ defmodule Elasticsearch do The document must implement the `Elasticsearch.Document` protocol. """ - @spec delete_document(Document.t(), String.t()) :: response - def delete_document(document, index) do - document - |> document_url(index) - |> delete() + @spec delete_document(Cluster.t(), Document.t(), index_name) :: response + def delete_document(cluster, document, index) do + delete(cluster, document_url(document, index)) end @doc """ Same as `delete_document/2`, but raises on errors. 
""" - @spec delete_document!(Document.t(), String.t()) :: map - def delete_document!(document, index) do - document - |> delete_document(index) - |> unwrap!() + @spec delete_document!(Cluster.t(), Document.t(), index_name) :: map | no_return + def delete_document!(cluster, document, index) do + delete!(cluster, document_url(document, index)) end defp document_url(document, index) do @@ -84,23 +71,17 @@ defmodule Elasticsearch do end @doc """ - Waits for Elasticsearch to be available at the configured url. + Waits for a given Elasticsearch cluster to be available. It will try a given number of times, with 1sec delay between tries. - - ## Example - - iex> {:ok, resp} = Elasticsearch.wait_for_boot(15) - ...> is_list(resp) - true """ - @spec wait_for_boot(integer) :: + @spec wait_for_boot(Cluster.t(), integer) :: {:ok, map} | {:error, RuntimeError.t()} | {:error, Elasticsearch.Exception.t()} - def wait_for_boot(tries, count \\ 0) + def wait_for_boot(cluster, tries, count \\ 0) - def wait_for_boot(tries, count) when count == tries do + def wait_for_boot(_cluster, tries, count) when count == tries do { :error, RuntimeError.exception(""" @@ -109,10 +90,10 @@ defmodule Elasticsearch do } end - def wait_for_boot(tries, count) do - with {:error, _} <- get("/_cat/health?format=json") do + def wait_for_boot(cluster, tries, count) do + with {:error, _} <- get(cluster, "/_cat/health?format=json") do :timer.sleep(1000) - wait_for_boot(tries, count + 1) + wait_for_boot(cluster, tries, count + 1) end end @@ -121,11 +102,11 @@ defmodule Elasticsearch do ## Examples - iex> {:ok, resp} = Elasticsearch.get("/_cat/health?format=json") + iex> {:ok, resp} = Elasticsearch.get(Cluster, "/_cat/health?format=json") ...> is_list(resp) true - iex> Elasticsearch.get("/nonexistent") + iex> Elasticsearch.get(Cluster, "/nonexistent") {:error, %Elasticsearch.Exception{col: nil, line: nil, message: "no such index", query: nil, @@ -141,11 +122,13 @@ defmodule Elasticsearch do "type" => 
"index_not_found_exception"}, "status" => 404}, status: 404, type: "index_not_found_exception"}} """ - @spec get(String.t()) :: response - @spec get(String.t(), Keyword.t()) :: response - def get(url, opts \\ []) do - url - |> api_module().get(default_headers(), Keyword.merge(default_opts(), opts)) + @spec get(Cluster.t(), url) :: response + @spec get(Cluster.t(), url, opts) :: response + def get(cluster, url, opts \\ []) do + config = Config.get(cluster) + + config + |> config.api.request(:get, url, "", opts) |> format() end @@ -155,18 +138,18 @@ defmodule Elasticsearch do ## Examples - iex> resp = Elasticsearch.get!("/_cat/health?format=json") + iex> resp = Elasticsearch.get!(Cluster, "/_cat/health?format=json") ...> is_list(resp) true - iex> Elasticsearch.get!("/nonexistent") + iex> Elasticsearch.get!(Cluster, "/nonexistent") ** (Elasticsearch.Exception) (index_not_found_exception) no such index """ - @spec get!(String.t()) :: map - @spec get!(String.t(), Keyword.t()) :: map - def get!(url, opts \\ []) do - url - |> get(opts) + @spec get!(Cluster.t(), url) :: map | no_return + @spec get!(Cluster.t(), url, opts) :: map | no_return + def get!(cluster, url, opts \\ []) do + cluster + |> get(url, opts) |> unwrap!() end @@ -175,25 +158,27 @@ defmodule Elasticsearch do ## Examples - iex> Elasticsearch.Index.create_from_file("posts-1", "test/support/settings/posts.json") - ...> Elasticsearch.put("/posts-1/post/id", %{"title" => "title", "author" => "author"}) + iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json") + ...> Elasticsearch.put(Cluster, "/posts-1/post/id", %{"title" => "title", "author" => "author"}) {:ok, %{"_id" => "id", "_index" => "posts-1", "_shards" => %{"failed" => 0, "successful" => 1, "total" => 2}, "_type" => "post", "_version" => 1, "created" => true, "result" => "created"}} - iex> Elasticsearch.put("/bad/url", %{"title" => "title", "author" => "author"}) + iex> Elasticsearch.put(Cluster, "/bad/url", %{"title" => 
"title", "author" => "author"}) {:error, %Elasticsearch.Exception{col: nil, line: nil, message: "No handler found for uri [/bad/url] and method [PUT]", query: nil, raw: nil, status: nil, type: nil}} """ - @spec put(String.t(), map | binary) :: response - @spec put(String.t(), map | binary, Keyword.t()) :: response - def put(url, data, opts \\ []) do - url - |> api_module().put(data, default_headers(), Keyword.merge(default_opts(), opts)) + @spec put(Cluster.t(), url, data) :: response + @spec put(Cluster.t(), url, data, opts) :: response + def put(cluster, url, data, opts \\ []) do + config = Config.get(cluster) + + config + |> config.api.request(:put, url, data, opts) |> format() end @@ -203,21 +188,21 @@ defmodule Elasticsearch do ## Examples - iex> Elasticsearch.Index.create_from_file("posts", "test/support/settings/posts.json") - ...> Elasticsearch.put!("/posts/post/id", %{"name" => "name", "author" => "author"}) + iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json") + ...> Elasticsearch.put!(Cluster, "/posts/post/id", %{"name" => "name", "author" => "author"}) %{"_id" => "id", "_index" => "posts", "_shards" => %{"failed" => 0, "successful" => 1, "total" => 2}, "_type" => "post", "_version" => 1, "created" => true, "result" => "created"} - iex> Elasticsearch.put!("/bad/url", %{"data" => "here"}) + iex> Elasticsearch.put!(Cluster, "/bad/url", %{"data" => "here"}) ** (Elasticsearch.Exception) No handler found for uri [/bad/url] and method [PUT] """ - @spec put!(String.t(), map) :: map - @spec put!(String.t(), map, Keyword.t()) :: map - def put!(url, data, opts \\ []) do - url - |> put(data, opts) + @spec put!(Cluster.t(), url, data) :: map | no_return + @spec put!(Cluster.t(), url, data, opts) :: map | no_return + def put!(cluster, url, data, opts \\ []) do + cluster + |> put(url, data, opts) |> unwrap!() end @@ -227,17 +212,19 @@ defmodule Elasticsearch do ## Examples - iex> Elasticsearch.Index.create_from_file("posts", 
"test/support/settings/posts.json") + iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json") ...> query = %{"query" => %{"match_all" => %{}}} - ...> {:ok, resp} = Elasticsearch.post("/posts/_search", query) + ...> {:ok, resp} = Elasticsearch.post(Cluster, "/posts/_search", query) ...> resp["hits"]["hits"] [] """ - @spec post(String.t(), map) :: response - @spec post(String.t(), map, Keyword.t()) :: response - def post(url, data, opts \\ []) do - url - |> api_module().post(data, default_headers(), Keyword.merge(default_opts(), opts)) + @spec post(Cluster.t(), url, data) :: response + @spec post(Cluster.t(), url, data, opts) :: response + def post(cluster, url, data, opts \\ []) do + config = Config.get(cluster) + + config + |> config.api.request(:post, url, data, opts) |> format() end @@ -246,23 +233,23 @@ defmodule Elasticsearch do ## Examples - iex> Elasticsearch.Index.create_from_file("posts", "test/support/settings/posts.json") + iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json") ...> query = %{"query" => %{"match_all" => %{}}} - ...> resp = Elasticsearch.post!("/posts/_search", query) + ...> resp = Elasticsearch.post!(Cluster, "/posts/_search", query) ...> is_map(resp) true Raises an error if the path is invalid or another error occurs: iex> query = %{"query" => %{"match_all" => %{}}} - ...> Elasticsearch.post!("/nonexistent/_search", query) + ...> Elasticsearch.post!(Cluster, "/nonexistent/_search", query) ** (Elasticsearch.Exception) (index_not_found_exception) no such index """ - @spec post!(String.t(), map) :: map - @spec post!(String.t(), map, Keyword.t()) :: map - def post!(url, data, opts \\ []) do - url - |> post(data, opts) + @spec post!(Cluster.t(), url, data) :: map | no_return + @spec post!(Cluster.t(), url, data, opts) :: map | no_return + def post!(cluster, url, data, opts \\ []) do + cluster + |> post(url, data, opts) |> unwrap!() end @@ -271,13 +258,13 @@ defmodule Elasticsearch do ## 
Examples - iex> Elasticsearch.Index.create_from_file("posts", "test/support/settings/posts.json") - ...> Elasticsearch.delete("/posts") + iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json") + ...> Elasticsearch.delete(Cluster, "/posts") {:ok, %{"acknowledged" => true}} It returns an error if the given resource does not exist. - iex> Elasticsearch.delete("/nonexistent") + iex> Elasticsearch.delete(Cluster, "/nonexistent") {:error, %Elasticsearch.Exception{col: nil, line: nil, message: "no such index", query: nil, @@ -293,10 +280,14 @@ defmodule Elasticsearch do "type" => "index_not_found_exception"}, "status" => 404}, status: 404, type: "index_not_found_exception"}} """ - @spec delete(String.t()) :: response - @spec delete(String.t(), Keyword.t()) :: response - def delete(url, opts \\ []) do - format(api_module().delete(url, default_headers(), Keyword.merge(default_opts(), opts))) + @spec delete(Cluster.t(), url) :: response + @spec delete(Cluster.t(), url, opts) :: response + def delete(cluster, url, opts \\ []) do + config = Config.get(cluster) + + config + |> config.api.request(:delete, url, "", opts) + |> format() end @doc """ @@ -304,20 +295,20 @@ defmodule Elasticsearch do ## Examples - iex> Elasticsearch.Index.create_from_file("posts", "test/support/settings/posts.json") - ...> Elasticsearch.delete!("/posts") + iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json") + ...> Elasticsearch.delete!(Cluster, "/posts") %{"acknowledged" => true} Raises an error if the resource is invalid. 
- iex> Elasticsearch.delete!("/nonexistent") + iex> Elasticsearch.delete!(Cluster, "/nonexistent") ** (Elasticsearch.Exception) (index_not_found_exception) no such index """ - @spec delete!(String.t()) :: map - @spec delete!(String.t(), Keyword.t()) :: map - def delete!(url, opts \\ []) do - url - |> delete(opts) + @spec delete!(Cluster.t(), url) :: map | no_return + @spec delete!(Cluster.t(), url, opts) :: map | no_return + def delete!(cluster, url, opts \\ []) do + cluster + |> delete(url, opts) |> unwrap!() end @@ -335,20 +326,4 @@ defmodule Elasticsearch do defp unwrap!({:ok, value}), do: value defp unwrap!({:error, exception}), do: raise(exception) - - defp api_module do - config()[:api_module] || Elasticsearch.API.HTTP - end - - defp default_opts do - Application.get_env(:elasticsearch, :default_opts, []) - end - - defp default_headers do - Application.get_env(:elasticsearch, :default_headers, []) - end - - defp config do - Application.get_all_env(:elasticsearch) - end end diff --git a/lib/elasticsearch/api/api.ex b/lib/elasticsearch/api/api.ex index 621dccb..55cd2ce 100644 --- a/lib/elasticsearch/api/api.ex +++ b/lib/elasticsearch/api/api.ex @@ -1,20 +1,26 @@ defmodule Elasticsearch.API do @moduledoc """ - Defines the necessary callbacks for integrating with the Elasticsearch - JSON API. + Behaviour for interacting with the Elasticsearch JSON API. 
""" + @typedoc "An HTTP method" + @type method :: :get | :put | :post | :delete + + @typedoc "The URL to request from the API" @type url :: String.t() - @type data :: map | Keyword.t() + + @typedoc "A payload of data to send, relevant to :put and :post requests" + @type data :: binary | map | Keyword.t() + + @typedoc "A keyword list of options to pass to HTTPoison/Hackney" @type opts :: Keyword.t() - @type headers :: Keyword.t() @type response :: {:ok, HTTPoison.Response.t() | HTTPoison.AsyncResponse.t()} | {:error, HTTPoison.Error.t()} - @callback get(url, headers, opts) :: response - @callback put(url, data, headers, opts) :: response - @callback post(url, data, headers, opts) :: response - @callback delete(url, headers, opts) :: response + @doc """ + Makes a request to an Elasticsearch JSON API URl using the given method. + """ + @callback request(config :: Elasticsearch.Cluster.config(), method, url, data, opts) :: response end diff --git a/lib/elasticsearch/api/http.ex b/lib/elasticsearch/api/http.ex index ae3430f..dc3e488 100644 --- a/lib/elasticsearch/api/http.ex +++ b/lib/elasticsearch/api/http.ex @@ -1,27 +1,67 @@ defmodule Elasticsearch.API.HTTP do @moduledoc """ - An HTTP implementation of `Elasticsearch.API`, using `HTTPoison`. + A "real" HTTP implementation of `Elasticsearch.API`. 
""" @behaviour Elasticsearch.API - use HTTPoison.Base + @impl true + def request(config, method, url, data, opts) do + method + |> HTTPoison.request( + process_url(url, config), + process_request_body(data, config), + headers(config), + opts ++ Map.get(config, :default_opts, []) + ) + |> process_response(config) + end - alias Elasticsearch.Config + # Respect absolute URLs if passed + defp process_url("http" <> _rest = url, _config) do + url + end - ### - # HTTPoison Callbacks - ### + # On relative urls, prepend the configured base URL + defp process_url(url, config) do + Path.join(config.url, url) + end - @doc false - def process_url(url) do - Config.url() <> url + # Converts the request body into JSON, unless it has already + # been converted + defp process_request_body(data, _config) when is_binary(data) do + data end - def process_request_headers(_headers) do - headers = [{"Content-Type", "application/json"}] + defp process_request_body(data, config) when is_map(data) do + json_library(config).encode!(data) + end - credentials = Config.http_basic_credentials() + # Converts the response body string from JSON into a map, if it looks like it + # is actually JSON + defp process_response({:ok, %{body: body} = response}, config) do + body = + cond do + json?(body) -> json_library(config).decode!(body) + true -> body + end + + {:ok, %{response | body: body}} + end + + defp process_response(response, _config) do + response + end + + defp json?(str) when is_binary(str) do + str =~ ~r/^\{/ || str =~ ~r/^\[/ + end + + # Produces request headers for the request, based on the configuration + defp headers(config) do + headers = [{"Content-Type", "application/json"}] ++ Map.get(config, :default_headers, []) + + credentials = http_basic_credentials(config) if credentials do [{"Authorization", "Basic #{credentials}"} | headers] @@ -30,23 +70,19 @@ defmodule Elasticsearch.API.HTTP do end end - @doc false - def process_request_body(string) when is_binary(string), do: string + defp 
http_basic_credentials(%{username: username, password: password}) do + Base.encode64("#{username}:#{password}") + end - def process_request_body(map) when is_map(map) do - Config.json_library().encode!(map) + defp http_basic_credentials(_config) do + nil end - @doc false - def process_response_body(body) do - if json?(body) do - Config.json_library().decode!(body) - else - body - end + defp json_library(%{json_library: json_library}) do + json_library end - defp json?(str) do - str =~ ~r/^\{/ || str =~ ~r/^\[/ + defp json_library(_config) do + Poison end end diff --git a/lib/elasticsearch/cluster/cluster.ex b/lib/elasticsearch/cluster/cluster.ex new file mode 100644 index 0000000..e0fff09 --- /dev/null +++ b/lib/elasticsearch/cluster/cluster.ex @@ -0,0 +1,195 @@ +defmodule Elasticsearch.Cluster do + @moduledoc """ + Defines and holds configuration for your Elasticsearch cluster. + + defmodule MyApp.ElasticsearchCluster do + use Elasticsearch.Cluster + end + + Once you have created your cluster, add it to your application's supervision tree: + + children = [ + MyApp.ElasticsearchCluster + ] + + Finally, you can issue requests to Elasticsearch using it. + + Elasticsearch.get(MyApp.ElasticsearchCluster, "/_cat/health") + + ## Configuration + + Clusters can be configured in several ways. + + #### Via Mix + + Clusters can read configuration from the mix config, if you pass the + `:otp_app` option: + + defmodule MyApp.ElasticsearchCluster do + use Elasticsearch.Cluster, otp_app: :my_app + end + + # In your config/config.exs... + config :my_app, MyApp.ElasticsearchCluster, + url: "http://localhost:9200", + # ... + + #### Via `init/1` + + When a cluster starts, you can override its configuration via the `init/1` + callback. This is a good place to read from environment variables. + + defmodule MyApp.ElasticsearchCluster do + use Elasticsearch.Cluster + + def init(config) do + config = + config + |> Map.put(:url, System.get_env("ELASTICSEARCH_URL")) + # ... 
+ + {:ok, config} + end + end + + #### Via `start_link/1` + + You can also pass configuration into the cluster directly when you start it + with `start_link/1`. + + MyApp.Elasticsearch.start_link(url: "http://localhost:9200", ...) + + ### Configuration Options + + The following options are available for configuration. + + * `:url` - The URL at which the Elasticsearch cluster is available. + + * `:api` - The API module to use to communicate with Elasticsearch. Must implement the + `Elasticsearch.API` behaviour. + + * `:bulk_page_size` - When creating indexes via bulk upload, how many documents to include + per request. + + * `:bulk_wait_interval` - The number of milliseconds to wait between bulk upload requests. + + * `:indexes` - A map of indexes. Used by `mix elasticsearch.build` to build indexes. + * `:settings`: The file path of the JSON settings for the index. + * `:store`: An `Elasticsearch.Store` module to use to load data for the index. + * `:sources`: A list of sources you want to load for this index. + + * `:json_library` (Optional) - The JSON library to use. (E.g. `Poison` or `Jason`) + + * `:username` (Optional) - The HTTP Basic username for the Elasticsearch endpoint, if any. + + * `:password` (Optional) - The HTTP Basic password for the Elasticsearch endpoint, if any. + + * `:default_headers` (Optional) - A list of default headers to send with the each request. + + * `:default_options` (Optional) - A list of default HTTPoison/Hackney options to send with + each request. 
+ + ### Configuration Example + + %{ + api: Elasticsearch.API.HTTP, + bulk_page_size: 5000, + bulk_wait_interval: 5000, + json_library: Poison, + url: "http://localhost:9200", + username: "username", + password: "password", + default_headers: [{"authorization", "custom-value"}], + default_opts: [ssl: [{:versions, [:'tlsv1.2']}], + indexes: %{ + posts: %{ + settings: "priv/elasticsearch/posts.json", + store: MyApp.ElasticsearchStore, + sources: [MyApp.Post] + } + } + } + """ + + alias Elasticsearch.Cluster.Config + + @typedoc """ + Defines valid configuration for a cluster. + """ + @type config :: %{ + :url => String.t(), + :api => module, + :bulk_page_size => integer, + :bulk_wait_interval => integer, + optional(:json_library) => module, + optional(:username) => String.t(), + optional(:password) => String.t(), + optional(:default_headers) => [{String.t(), String.t()}], + optional(:default_options) => Keyword.t(), + optional(:indexes) => %{ + optional(atom) => %{ + settings: Path.t(), + store: module, + sources: [module] + } + } + } + + @typedoc """ + A cluster is either a module defined with `Elasticsearch.Cluster`, or a + map that has all the required configuration keys. + """ + @type t :: module | config + + @doc false + defmacro __using__(opts) do + quote do + use GenServer + + alias Elasticsearch.Cluster.Config + + # Cache configuration into the state of the GenServer so that + # we aren't running potentially expensive logic to load configuration + # on each function call. 
+ def start_link(config \\ []) do + config = Config.build(unquote(opts[:otp_app]), __MODULE__, config) + + # Ensure that the configuration is validated on startup + with {:ok, pid} <- GenServer.start_link(__MODULE__, config, name: __MODULE__), + :ok <- GenServer.call(pid, :validate) do + {:ok, pid} + else + error -> + GenServer.stop(__MODULE__) + error + end + end + + @impl GenServer + def init(config), do: {:ok, config} + + @doc false + def __config__ do + GenServer.call(__MODULE__, :config) + end + + @impl GenServer + @doc false + def handle_call(:config, _from, config) do + {:reply, config, config} + end + + def handle_call(:validate, _from, config) do + case Config.validate(config) do + {:ok, _config} -> + {:reply, :ok, config} + + error -> + {:reply, error, config} + end + end + + defoverridable init: 1 + end + end +end diff --git a/lib/elasticsearch/cluster/config.ex b/lib/elasticsearch/cluster/config.ex new file mode 100644 index 0000000..c450eb0 --- /dev/null +++ b/lib/elasticsearch/cluster/config.ex @@ -0,0 +1,86 @@ +defmodule Elasticsearch.Cluster.Config do + @moduledoc false + + def get(cluster) when is_atom(cluster) do + cluster.__config__() + end + + def get(config) when is_map(config) or is_list(config) do + Enum.into(config, %{}) + end + + @doc false + def build(nil, config) do + Enum.into(config, %{}) + end + + def build(otp_app, module, config) do + config = Enum.into(config, %{}) + + from_app = + otp_app + |> Application.get_env(module, []) + |> Enum.into(%{}) + + Map.merge(from_app, config) + end + + @doc false + def validate(config) do + with {:ok, config} <- + Vex.validate( + config, + url: &(is_binary(&1) && String.starts_with?(&1, "http")), + username: [presence: [unless: &(&1[:password] == nil)]], + password: [presence: [unless: &(&1[:username] == nil)]], + api: [presence: true, by: &is_module/1], + json_library: [by: &(is_nil(&1) || is_module(&1))], + bulk_page_size: [presence: true, by: &is_integer/1], + bulk_wait_interval: [presence: 
true, by: &is_integer/1] + ), + :ok <- validate_indexes(config[:indexes] || %{}) do + {:ok, config} + else + {:error, errors} -> + {:error, validation_errors(errors)} + end + end + + defp is_module(module) do + is_atom(module) && Code.ensure_loaded?(module) + end + + defp validation_errors(errors) do + errors + |> Enum.map(&Tuple.delete_at(&1, 0)) + |> Enum.group_by(&elem(&1, 0), fn {_field, validation, message} -> + {message, validation: validation} + end) + end + + defp validate_indexes(indexes) do + invalid = + indexes + |> Enum.map(&validate_index/1) + |> Enum.reject(&match?({:ok, _}, &1)) + |> Enum.map(&elem(&1, 1)) + + if length(invalid) == 0 do + :ok + else + {:error, List.flatten(invalid)} + end + end + + defp validate_index({_name, settings}) do + Vex.validate( + settings, + settings: [presence: true, by: &is_binary/1], + store: [presence: true, by: &is_module/1], + sources: [ + presence: true, + by: &(is_list(&1) && Enum.map(&1, fn source -> is_atom(source) end)) + ] + ) + end +end diff --git a/lib/elasticsearch/config.ex b/lib/elasticsearch/config.ex deleted file mode 100644 index d54add3..0000000 --- a/lib/elasticsearch/config.ex +++ /dev/null @@ -1,132 +0,0 @@ -defmodule Elasticsearch.Config do - @moduledoc """ - Convenience functions for fetching configuration values for `Elasticsearch`. - """ - - alias Elasticsearch.Store - - @doc """ - Returns the configured Elasticsearch URL. - - ## Configuration - - config :elasticsearch, - url: "http://localhost:9200" - - System tuples are also supported: - - config :elasticsearch, - url: {:system, "ELASTICSEARCH_URL"} - - ## Example - - iex> Config.url() - "http://localhost:9200" - """ - @spec url :: String.t() - def url do - from_env(:elasticsearch, :url) - end - - @doc """ - Returns HTTP basic credential header contents based on the configured - `:username` and `:password`. 
- - ## Configuration - - config :elasticsearch, - username: "username", - password: "password" - - System tuples are also supported: - - config :elasticsearch, - username: {:system, "ELASTICSEARCH_USERNAME"}, - password: {:system, "ELASTICSEARCH_PASSWORD"} - - ## Example - - iex> Config.http_basic_credentials() - "dXNlcm5hbWU6cGFzc3dvcmQ=" - """ - @spec http_basic_credentials :: String.t() | nil - def http_basic_credentials do - username = from_env(:elasticsearch, :username) - password = from_env(:elasticsearch, :password) - - if username && password do - Base.encode64("#{username}:#{password}") - end - end - - @doc """ - Gets the full configuration for a given index. - - ## Configuration - - config :elasticsearch, - indexes: %{ - posts: %{ - settings: "test/support/settings/posts.json", - store: Elasticsearch.Test.Store, - sources: [Post] - } - } - - ## Example - - iex> Config.config_for_index(:posts) - %{ - settings: "test/support/settings/posts.json", - store: Elasticsearch.Test.Store, - sources: [Post] - } - """ - @spec config_for_index(atom) :: - %{ - settings: String.t(), - store: Store.t(), - sources: [Store.source()] - } - | nil - def config_for_index(index) do - all()[:indexes][index] - end - - @doc """ - Returns all configuration values for `Elasticsearch`. - """ - @spec all :: Keyword.t() - def all do - Application.get_all_env(:elasticsearch) - end - - @doc """ - Returns the JSON library to use for encoding/decoding. - Default: `Poison` - - ## Configuration - - config :elasticsearch, json_library: Jason - """ - @spec json_library :: module - def json_library do - Application.get_env(:elasticsearch, :json_library) || Poison - end - - @doc """ - A light wrapper around `Application.get_env/2`, providing automatic support for - `{:system, "VAR"}` tuples. 
- """ - @spec from_env(atom, atom, any) :: any - def from_env(otp_app, key, default \\ nil) - - def from_env(otp_app, key, default) do - otp_app - |> Application.get_env(key, default) - |> read_from_system(default) - end - - defp read_from_system({:system, env}, default), do: System.get_env(env) || default - defp read_from_system(value, _default), do: value -end diff --git a/lib/elasticsearch/indexing/bulk.ex b/lib/elasticsearch/indexing/bulk.ex index 4d1432f..738f88c 100644 --- a/lib/elasticsearch/indexing/bulk.ex +++ b/lib/elasticsearch/indexing/bulk.ex @@ -4,6 +4,7 @@ defmodule Elasticsearch.Index.Bulk do """ alias Elasticsearch.{ + Cluster, DataStream, Document } @@ -47,7 +48,7 @@ defmodule Elasticsearch.Index.Bulk do {"create":{"_type":"post","_index":"my-index","_id":"my-id"}} {"title":null,"author":null} \"\"\" - + iex> Bulk.encode!(123, "my-index") ** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123. This protocol is implemented for: Post """ @@ -66,21 +67,24 @@ defmodule Elasticsearch.Index.Bulk do Uploads all the data from the list of `sources` to the given index. Data for each `source` will be fetched using the configured `:store`. 
""" - @spec upload(String.t(), Elasticsearch.Store.t(), list) :: :ok | {:error, [map]} - def upload(index_name, store, sources, errors \\ []) - def upload(_index_name, _store, [], []), do: :ok - def upload(_index_name, _store, [], errors), do: {:error, errors} + @spec upload(Cluster.t(), index_name :: String.t(), Elasticsearch.Store.t(), list) :: + :ok | {:error, [map]} + def upload(cluster, index_name, store, sources, errors \\ []) + def upload(_cluster, _index_name, _store, [], []), do: :ok + def upload(_cluster, _index_name, _store, [], errors), do: {:error, errors} + + def upload(cluster, index_name, store, [source | tail] = _sources, errors) do + config = Cluster.Config.get(cluster) - def upload(index_name, store, [source | tail] = _sources, errors) do errors = - source - |> DataStream.stream(store) + config + |> DataStream.stream(source, store) |> Stream.map(&encode!(&1, index_name)) - |> Stream.chunk_every(config()[:bulk_page_size]) - |> Stream.map(&Elasticsearch.put("/#{index_name}/_bulk", Enum.join(&1))) + |> Stream.chunk_every(config.bulk_page_size) + |> Stream.map(&Elasticsearch.put(cluster, "/#{index_name}/_bulk", Enum.join(&1))) |> Enum.reduce(errors, &collect_errors/2) - upload(index_name, tail, errors) + upload(cluster, index_name, tail, errors) end defp collect_errors({:ok, %{"errors" => true} = response}, errors) do @@ -125,8 +129,4 @@ defmodule Elasticsearch.Index.Bulk do header end end - - defp config do - Application.get_all_env(:elasticsearch) - end end diff --git a/lib/elasticsearch/indexing/index.ex b/lib/elasticsearch/indexing/index.ex index 10a73f5..a7af9ba 100644 --- a/lib/elasticsearch/indexing/index.ex +++ b/lib/elasticsearch/indexing/index.ex @@ -3,7 +3,10 @@ defmodule Elasticsearch.Index do Functions for manipulating Elasticsearch indexes. """ - alias Elasticsearch.Index.Bulk + alias Elasticsearch.{ + Cluster.Config, + Index.Bulk + } @doc """ Creates an index using a zero-downtime hot-swap technique. 
@@ -20,20 +23,27 @@ defmodule Elasticsearch.Index do iex> file = "test/support/settings/posts.json" ...> store = Elasticsearch.Test.Store - ...> Index.hot_swap("posts", file, store, [Post]) + ...> Index.hot_swap(Cluster, "posts", file, store, [Post]) :ok """ - @spec hot_swap(String.t() | atom, String.t(), Elasticsearch.Store.t(), list) :: + @spec hot_swap( + Cluster.t(), + alias :: String.t() | atom, + settings_path :: String.t(), + Elasticsearch.Store.t(), + list + ) :: :ok | {:error, Elasticsearch.Exception.t()} - def hot_swap(alias, settings_file, store, sources) do + def hot_swap(cluster, alias, settings_file, store, sources) do name = build_name(alias) + config = Config.get(cluster) - with :ok <- create_from_file(name, settings_file), - :ok <- Bulk.upload(name, store, sources), - :ok <- __MODULE__.alias(name, alias), - :ok <- clean_starting_with(alias, 2), - :ok <- refresh(name) do + with :ok <- create_from_file(config, name, settings_file), + :ok <- Bulk.upload(config, name, store, sources), + :ok <- __MODULE__.alias(config, name, alias), + :ok <- clean_starting_with(config, alias, 2), + :ok <- refresh(config, name) do :ok end end @@ -43,15 +53,15 @@ defmodule Elasticsearch.Index do ## Example - iex> Index.create_from_file("posts-1", "test/support/settings/posts.json") - ...> Index.starting_with("posts") + iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json") + ...> Index.starting_with(Cluster, "posts") {:ok, ["posts-1"]} """ - @spec starting_with(String.t() | atom) :: + @spec starting_with(Cluster.t(), String.t() | atom) :: {:ok, [String.t()]} | {:error, Elasticsearch.Exception.t()} - def starting_with(prefix) do - with {:ok, indexes} <- Elasticsearch.get("/_cat/indices?format=json") do + def starting_with(cluster, prefix) do + with {:ok, indexes} <- Elasticsearch.get(cluster, "/_cat/indices?format=json") do prefix = to_string(prefix) indexes = @@ -70,15 +80,16 @@ defmodule Elasticsearch.Index do ## Example - iex> 
Index.create_from_file("posts-1", "test/support/settings/posts.json") - ...> Index.alias("posts-1", "posts") + iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json") + ...> Index.alias(Cluster, "posts-1", "posts") :ok """ - @spec alias(String.t(), String.t()) :: + @spec alias(Cluster.t(), String.t(), String.t()) :: :ok | {:error, Elasticsearch.Exception.t()} - def alias(name, alias) do - with {:ok, indexes} <- starting_with(alias), indexes = Enum.reject(indexes, &(&1 == name)) do + def alias(cluster, name, alias) do + with {:ok, indexes} <- starting_with(cluster, alias), + indexes = Enum.reject(indexes, &(&1 == name)) do remove_actions = Enum.map(indexes, fn index -> %{"remove" => %{"index" => index, "alias" => alias}} @@ -88,7 +99,7 @@ defmodule Elasticsearch.Index do "actions" => remove_actions ++ [%{"add" => %{"index" => name, "alias" => alias}}] } - with {:ok, _response} <- Elasticsearch.post("/_aliases", actions), do: :ok + with {:ok, _response} <- Elasticsearch.post(cluster, "/_aliases", actions), do: :ok end end @@ -97,22 +108,22 @@ defmodule Elasticsearch.Index do ## Examples - iex> Index.create_from_file("posts-1", "test/support/settings/posts.json") - ...> Index.create_from_file("posts-2", "test/support/settings/posts.json") - ...> Index.latest_starting_with("posts") + iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json") + ...> Index.create_from_file(Cluster, "posts-2", "test/support/settings/posts.json") + ...> Index.latest_starting_with(Cluster, "posts") {:ok, "posts-2"} If there are no indexes matching that prefix: - iex> Index.latest_starting_with("nonexistent") + iex> Index.latest_starting_with(Cluster, "nonexistent") {:error, :not_found} """ - @spec latest_starting_with(String.t() | atom) :: + @spec latest_starting_with(Cluster.t(), String.t() | atom) :: {:ok, String.t()} | {:error, :not_found} | {:error, Elasticsearch.Exception.t()} - def latest_starting_with(prefix) do - with {:ok, 
indexes} <- starting_with(prefix) do + def latest_starting_with(cluster, prefix) do + with {:ok, indexes} <- starting_with(cluster, prefix) do index = indexes |> Enum.sort() @@ -130,14 +141,14 @@ defmodule Elasticsearch.Index do ## Example - iex> Index.create_from_file("posts-1", "test/support/settings/posts.json") - ...> Index.refresh("posts-1") + iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json") + ...> Index.refresh(Cluster, "posts-1") :ok """ - @spec refresh(String.t()) :: :ok | {:error, Elasticsearch.Exception.t()} - def refresh(name) do - with {:ok, _} <- Elasticsearch.post("/#{name}/_forcemerge?max_num_segments=5", %{}), - {:ok, _} <- Elasticsearch.post("/#{name}/_refresh", %{}), + @spec refresh(Cluster.t(), String.t()) :: :ok | {:error, Elasticsearch.Exception.t()} + def refresh(cluster, name) do + with {:ok, _} <- Elasticsearch.post(cluster, "/#{name}/_forcemerge?max_num_segments=5", %{}), + {:ok, _} <- Elasticsearch.post(cluster, "/#{name}/_refresh", %{}), do: :ok end @@ -146,16 +157,16 @@ defmodule Elasticsearch.Index do ## Examples - iex> Index.create_from_file("posts-1", "test/support/settings/posts.json") - ...> Index.refresh!("posts-1") + iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json") + ...> Index.refresh!(Cluster, "posts-1") :ok - iex> Index.refresh!("nonexistent") + iex> Index.refresh!(Cluster, "nonexistent") ** (Elasticsearch.Exception) (index_not_found_exception) no such index """ - @spec refresh!(String.t()) :: :ok - def refresh!(name) do - case refresh(name) do + @spec refresh!(Cluster.t(), String.t()) :: :ok + def refresh!(cluster, name) do + case refresh(cluster, name) do :ok -> :ok @@ -173,24 +184,24 @@ defmodule Elasticsearch.Index do If there is only one index, and `num_to_keep` is >= 1, the index is not deleted. 
- iex> Index.create_from_file("posts-1", "test/support/settings/posts.json") - ...> Index.clean_starting_with("posts", 1) - ...> Index.starting_with("posts") + iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json") + ...> Index.clean_starting_with(Cluster, "posts", 1) + ...> Index.starting_with(Cluster, "posts") {:ok, ["posts-1"]} If `num_to_keep` is less than the number of indexes, the older indexes are deleted. - iex> Index.create_from_file("posts-1", "test/support/settings/posts.json") - ...> Index.clean_starting_with("posts", 0) - ...> Index.starting_with("posts") + iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json") + ...> Index.clean_starting_with(Cluster, "posts", 0) + ...> Index.starting_with(Cluster, "posts") {:ok, []} """ - @spec clean_starting_with(String.t(), integer) :: + @spec clean_starting_with(Cluster.t(), String.t(), integer) :: :ok | {:error, [Elasticsearch.Exception.t()]} - def clean_starting_with(prefix, num_to_keep) when is_integer(num_to_keep) do - with {:ok, indexes} <- starting_with(prefix) do + def clean_starting_with(cluster, prefix, num_to_keep) when is_integer(num_to_keep) do + with {:ok, indexes} <- starting_with(cluster, prefix) do total = length(indexes) num_to_delete = total - num_to_keep num_to_delete = if num_to_delete >= 0, do: num_to_delete, else: 0 @@ -199,7 +210,7 @@ defmodule Elasticsearch.Index do indexes |> Enum.sort() |> Enum.take(num_to_delete) - |> Enum.map(&Elasticsearch.delete("/#{&1}")) + |> Enum.map(&Elasticsearch.delete(cluster, "/#{&1}")) |> Enum.filter(&(elem(&1, 0) == :error)) |> Enum.map(&elem(&1, 1)) @@ -216,14 +227,14 @@ defmodule Elasticsearch.Index do ## Examples - iex> Index.create("posts-1", "{}") + iex> Index.create(Cluster, "posts-1", "{}") :ok """ - @spec create(String.t(), map | String.t()) :: + @spec create(Cluster.t(), String.t(), map | String.t()) :: :ok | {:error, Elasticsearch.Exception.t()} - def create(name, settings) do - with {:ok, 
_response} <- Elasticsearch.put("/#{name}", settings), do: :ok + def create(cluster, name, settings) do + with {:ok, _response} <- Elasticsearch.put(cluster, "/#{name}", settings), do: :ok end @doc """ @@ -231,10 +242,10 @@ defmodule Elasticsearch.Index do ## Example - iex> Index.create_from_file("posts-1", "test/support/settings/posts.json") + iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json") :ok - iex> Index.create_from_file("posts-1", "nonexistent.json") + iex> Index.create_from_file(Cluster, "posts-1", "nonexistent.json") {:error, :enoent} The `posts.json` file contains regular index settings as described in the @@ -255,13 +266,13 @@ defmodule Elasticsearch.Index do } } """ - @spec create_from_file(String.t(), Path.t()) :: + @spec create_from_file(Cluster.t(), String.t(), Path.t()) :: :ok | {:error, File.posix()} | {:error, Elasticsearch.Exception.t()} - def create_from_file(name, file) do + def create_from_file(cluster, name, file) do with {:ok, settings} <- File.read(file) do - create(name, settings) + create(cluster, name, settings) end end diff --git a/lib/elasticsearch/storage/data_stream.ex b/lib/elasticsearch/storage/data_stream.ex index 1ad24fa..aa774e3 100644 --- a/lib/elasticsearch/storage/data_stream.ex +++ b/lib/elasticsearch/storage/data_stream.ex @@ -6,37 +6,29 @@ defmodule Elasticsearch.DataStream do @type source :: any - alias Elasticsearch.Config + alias Elasticsearch.Cluster @doc """ Creates a `Stream` from a given source. - ## Configuration - - Your configured `:store` module must handle the given data source. - The stream will be paginated based on the `:bulk_page_size` in the - configuration. 
- - config :elasticsearch, - bulk_page_size: 5000 - ## Example - iex> stream = DataStream.stream(MyApp.Schema, Elasticsearch.Test.Store) + iex> stream = DataStream.stream(Cluster, MyApp.Schema, Elasticsearch.Test.Store) ...> is_function(stream) true - + """ - @spec stream(source, Elasticsearch.Store.t()) :: Stream.t() - def stream(source, store) do - Stream.resource(&init/0, &next(&1, source, store), &finish/1) + @spec stream(Cluster.t(), source, Elasticsearch.Store.t()) :: Stream.t() + def stream(cluster, source, store) do + config = Cluster.Config.get(cluster) + Stream.resource(fn -> init(config) end, &next(&1, source, store), &finish/1) end # Store state in the following format: # # {items, offset, limit} - defp init do - {[], 0, Config.all()[:bulk_page_size]} + defp init(config) do + {[], 0, config.bulk_page_size} end # If no items, load another page of items @@ -52,8 +44,6 @@ defmodule Elasticsearch.DataStream do # Fetch a new page of items defp load_page(source, store, offset, limit) do - page_size = Config.all()[:bulk_page_size] - case store.load(source, offset, limit) do # If the load returns no more items (i.e., we've iterated through them # all) then halt the stream and leave offset and limit unchanged. @@ -64,7 +54,7 @@ defmodule Elasticsearch.DataStream do # tail into the state. Also, increment offset and limit by the # configured `:bulk_page_size`. [h | t] -> - {[h], {t, offset + page_size, limit}} + {[h], {t, offset + limit, limit}} end end diff --git a/lib/mix/elasticsearch.build.ex b/lib/mix/elasticsearch.build.ex index 3277405..865f145 100644 --- a/lib/mix/elasticsearch.build.ex +++ b/lib/mix/elasticsearch.build.ex @@ -8,53 +8,55 @@ defmodule Mix.Tasks.Elasticsearch.Build do 4. Remove old indexes beginning with `alias`. 5. Refresh `alias-12323123`. - For a functional version of this approach, see + For a functional version of this approach, see `Elasticsearch.Index.hot_swap/4`. 
## Example - $ mix elasticsearch.build posts [index2] [index3] + $ mix elasticsearch.build posts [index2] [index3] --cluster MyApp.Cluster To build an index only if it does not exist, use the `--existing` option: - - $ mix elasticsearch.build posts --existing + + $ mix elasticsearch.build posts --existing --cluster MyApp.Cluster Index posts already exists. """ require Logger alias Elasticsearch.{ - Index, - Config + Cluster.Config, + Index } @doc false def run(args) do Mix.Task.run("app.start", []) - {indexes, type} = parse_args!(args) + {cluster, indexes, type} = parse_args!(args) + config = Config.get(cluster) for alias <- indexes do - config = Config.config_for_index(alias) - build(alias, config, type) + build(config, alias, type) end end - defp build(alias, config, :existing) do - case Index.latest_starting_with(alias) do + defp build(config, alias, :existing) do + case Index.latest_starting_with(config, alias) do {:ok, name} -> IO.puts("Index already exists: #{name}") {:error, :not_found} -> - build(alias, config, :rebuild) + build(config, alias, :rebuild) {:error, exception} -> Mix.raise(exception) end end - defp build(alias, %{settings: settings, store: store, sources: sources}, :rebuild) do - with :ok <- Index.hot_swap(alias, settings, store, sources) do + defp build(config, alias, :rebuild) do + %{settings: settings, store: store, sources: sources} = config.indexes[alias] + + with :ok <- Index.hot_swap(config, alias, settings, store, sources) do :ok else {:error, errors} when is_list(errors) -> @@ -85,35 +87,32 @@ defmodule Mix.Tasks.Elasticsearch.Build do end defp parse_args!(args) do - {options, indexes} = - OptionParser.parse!( - args, - switches: [ - existing: :boolean - ] - ) + {options, indexes} = OptionParser.parse!(args, strict: [cluster: :string, existing: :boolean]) + + cluster = + if options[:cluster] do + :"Elixir.#{options[:cluster]}" + else + Mix.raise(""" + Please specify a cluster: + + --cluster MyApp.ClusterName + """) + end indexes = 
indexes |> Enum.map(&String.to_atom/1) |> MapSet.new() + |> validate_indexes!(cluster) - type = - cond do - options[:existing] -> - :existing - - true -> - :rebuild - end - - validate_indexes!(indexes) + type = if options[:existing], do: :existing, else: :rebuild - {indexes, type} + {cluster, indexes, type} end - defp validate_indexes!(indexes) do - configured = configured_names() + defp validate_indexes!(indexes, cluster) do + configured = configured_index_names(cluster) cond do MapSet.size(indexes) == 0 -> @@ -131,18 +130,15 @@ defmodule Mix.Tasks.Elasticsearch.Build do """) true -> - :ok + indexes end end - defp configured_names do - config() - |> Keyword.get(:indexes) + defp configured_index_names(cluster) do + cluster + |> Config.get() + |> Map.get(:indexes) |> Enum.map(fn {key, _val} -> key end) |> MapSet.new() end - - defp config do - Application.get_all_env(:elasticsearch) - end end diff --git a/mix.exs b/mix.exs index 078ff53..49a9f0e 100644 --- a/mix.exs +++ b/mix.exs @@ -58,6 +58,8 @@ defmodule Elasticsearch.Mixfile do [ {:poison, ">= 0.0.0", optional: true}, {:httpoison, ">= 0.0.0"}, + {:vex, "~> 0.6.0"}, + {:stream_data, ">= 0.0.0", only: [:dev, :test]}, {:dialyze, ">= 0.0.0", only: [:dev, :test]}, {:ex_doc, ">= 0.0.0", only: [:dev, :test]}, {:excoveralls, ">= 0.0.0", only: :test} @@ -75,7 +77,7 @@ defmodule Elasticsearch.Mixfile do Elasticsearch.API.HTTP ], Config: [ - Elasticsearch.Config + Elasticsearch.Cluster ], Indexing: [ Elasticsearch.Index, @@ -92,4 +94,4 @@ defmodule Elasticsearch.Mixfile do ] ] end -end \ No newline at end of file +end diff --git a/mix.lock b/mix.lock index 5512536..10fc511 100644 --- a/mix.lock +++ b/mix.lock @@ -14,5 +14,7 @@ "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [], [], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", 
"28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [], [], "hexpm"}, + "stream_data": {:hex, :stream_data, "0.4.2", "fa86b78c88ec4eaa482c0891350fcc23f19a79059a687760ddcf8680aac2799b", [:mix], [], "hexpm"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [], [], "hexpm"}, + "vex": {:hex, :vex, "0.6.0", "4e79b396b2ec18cd909eed0450b19108d9631842598d46552dc05031100b7a56", [:mix], [], "hexpm"}, } diff --git a/test/elasticsearch/cluster/cluster_test.exs b/test/elasticsearch/cluster/cluster_test.exs new file mode 100644 index 0000000..6bfbb04 --- /dev/null +++ b/test/elasticsearch/cluster/cluster_test.exs @@ -0,0 +1,139 @@ +defmodule Elasticsearch.ClusterTest do + use ExUnit.Case, async: false + + def valid_config do + %{ + api: Elasticsearch.API.HTTP, + bulk_page_size: 5000, + bulk_wait_interval: 5000, + json_library: Poison, + url: "http://localhost:9200", + username: "username", + password: "password", + indexes: %{ + posts: %{ + settings: "test/support/settings/posts.json", + store: Elasticsearch.Test.Store, + sources: [Post] + } + } + } + end + + setup do + Application.put_env( + :elasticsearch, + Elasticsearch.ClusterTest.MixConfiguredCluster, + valid_config() + ) + end + + defmodule Cluster do + use Elasticsearch.Cluster + end + + defmodule MixConfiguredCluster do + use Elasticsearch.Cluster, otp_app: :elasticsearch + end + + defmodule InitConfiguredCluster do + use Elasticsearch.Cluster + + def init(_config) do + {:ok, Elasticsearch.ClusterTest.valid_config()} + end + end + + describe "configuration" do + test "accepts Mix configuration" do + assert {:ok, _pid} = MixConfiguredCluster.start_link() + assert MixConfiguredCluster.__config__() == valid_config() + end + + test "accepts init configuration" do + assert {:ok, _pid} = InitConfiguredCluster.start_link() + assert InitConfiguredCluster.__config__() == valid_config() + end + + test "accepts configuration on 
startup" do + assert {:ok, _pid} = Cluster.start_link(valid_config()) + assert Cluster.__config__() == valid_config() + end + end + + describe ".start_link/1" do + test "validates url" do + refute errors_on(url: "http://localhost:9200")[:url] + assert errors_on(url: "werlkjweoqwelj").url + end + + test "validates username" do + assert {"must be present", validation: :presence} in errors_on(%{password: "password"}).username + refute errors_on([])[:username] + end + + test "validates password" do + assert {"must be present", validation: :presence} in errors_on(%{username: "username"}).password + refute errors_on([])[:password] + end + + test "validates api" do + assert {"must be present", validation: :presence} in errors_on([]).api + + for invalid <- [Nonexistent.Module, "string"] do + assert {"must be valid", validation: :by} in errors_on(api: invalid).api + end + end + + test "validates json_library" do + refute errors_on([])[:json_library] + refute errors_on(json_library: Poison)[:json_library] + + assert {"must be valid", validation: :by} in errors_on(json_library: Nonexistent.Module).json_library + end + + test "validates bulk_page_size" do + assert {"must be present", validation: :presence} in errors_on([]).bulk_page_size + + for invalid <- [nil, "string", :atom] do + assert {"must be valid", validation: :by} in errors_on(bulk_page_size: invalid).bulk_page_size + end + end + + test "validates bulk_wait_interval" do + assert {"must be present", validation: :presence} in errors_on([]).bulk_wait_interval + + for invalid <- [nil, "string", :atom] do + assert {"must be valid", validation: :by} in errors_on(bulk_wait_interval: invalid).bulk_wait_interval + end + end + + test "validates indexes" do + errors = errors_on(%{valid_config() | indexes: %{example: %{}}}) + + for field <- [:settings, :store, :sources] do + assert {"must be present", validation: :presence} in errors[field] + end + + errors = + errors_on(%{ + valid_config() + | indexes: %{example: %{settings: 
:atom, store: Nonexistent.Module, sources: 123}} + }) + + for field <- [:settings, :store, :sources] do + assert {"must be valid", validation: :by} in errors[field] + end + end + + test "accepts valid configuration" do + assert {:ok, pid} = Cluster.start_link(valid_config()) + assert is_pid(pid) + end + end + + defp errors_on(config) do + {:error, errors} = Cluster.start_link(config) + errors + end +end diff --git a/test/elasticsearch/config_test.exs b/test/elasticsearch/config_test.exs deleted file mode 100644 index fa63206..0000000 --- a/test/elasticsearch/config_test.exs +++ /dev/null @@ -1,7 +0,0 @@ -defmodule Elasticsearch.ConfigTest do - use ExUnit.Case - - alias Elasticsearch.Config - - doctest Elasticsearch.Config -end diff --git a/test/elasticsearch/indexing/index_test.exs b/test/elasticsearch/indexing/index_test.exs index 31c63da..c41e5bf 100644 --- a/test/elasticsearch/indexing/index_test.exs +++ b/test/elasticsearch/indexing/index_test.exs @@ -1,13 +1,16 @@ -defmodule Elasticsearch.IndexTest do +defmodule Elasticsearch.Cluster.IndexTest do use ExUnit.Case - alias Elasticsearch.Index + alias Elasticsearch.{ + Index, + Test.Cluster + } doctest Elasticsearch.Index setup do for index <- ["posts"] do - Elasticsearch.delete("/#{index}*") + Elasticsearch.delete(Cluster, "/#{index}*") end end end diff --git a/test/elasticsearch/storage/data_stream_test.exs b/test/elasticsearch/storage/data_stream_test.exs index af53fd6..5256c9a 100644 --- a/test/elasticsearch/storage/data_stream_test.exs +++ b/test/elasticsearch/storage/data_stream_test.exs @@ -1,6 +1,7 @@ defmodule Elasticsearch.DataStreamTest do use ExUnit.Case + alias Elasticsearch.Test.Cluster alias Elasticsearch.DataStream doctest Elasticsearch.DataStream diff --git a/test/elasticsearch_test.exs b/test/elasticsearch_test.exs index 6d315ca..7d1f838 100644 --- a/test/elasticsearch_test.exs +++ b/test/elasticsearch_test.exs @@ -1,16 +1,21 @@ defmodule ElasticsearchTest do use ExUnit.Case + alias 
Elasticsearch.{ + Index, + Test.Cluster + } + doctest Elasticsearch setup do on_exit(fn -> - "posts" - |> Elasticsearch.Index.starting_with() + Cluster + |> Index.starting_with("posts") |> elem(1) - |> Enum.map(&Elasticsearch.delete!("/#{&1}")) + |> Enum.map(&Elasticsearch.delete!(Cluster, "/#{&1}")) - Elasticsearch.delete("/nonexistent") + Elasticsearch.delete(Cluster, "/nonexistent") end) end end diff --git a/test/mix/elasticsearch.build_test.exs b/test/mix/elasticsearch.build_test.exs index 4e46289..76f26d0 100644 --- a/test/mix/elasticsearch.build_test.exs +++ b/test/mix/elasticsearch.build_test.exs @@ -4,18 +4,20 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do import Mix.Task, only: [rerun: 2] import ExUnit.CaptureIO - alias Elasticsearch alias Elasticsearch.Index + alias Elasticsearch.Test.Cluster, as: TestCluster setup do on_exit(fn -> - "posts" - |> Index.starting_with() + TestCluster + |> Index.starting_with("posts") |> elem(1) - |> Enum.map(&Elasticsearch.delete("/#{&1}")) + |> Enum.map(&Elasticsearch.delete(TestCluster, "/#{&1}")) end) end + @cluster_opts ["--cluster", "Elasticsearch.Test.Cluster"] + describe ".run" do test "raises error on invalid options" do assert_raise Mix.Error, fn -> @@ -23,45 +25,51 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do end end + test "raises error if cluster not specified" do + assert_raise Mix.Error, fn -> + rerun("elasticsearch.build", ["posts"]) + end + end + test "raises error on unconfigured indexes" do assert_raise Mix.Error, fn -> - rerun("elasticsearch.build", ["nonexistent"]) + rerun("elasticsearch.build", ["nonexistent"] ++ @cluster_opts) end end test "raises error if no index specified" do assert_raise Mix.Error, fn -> - rerun("elasticsearch.build", []) + rerun("elasticsearch.build", [] ++ @cluster_opts) end end test "builds configured index" do - rerun("elasticsearch.build", ["posts"]) + rerun("elasticsearch.build", ["posts"] ++ @cluster_opts) - resp = Elasticsearch.get!("/posts/_search") + resp = 
Elasticsearch.get!(TestCluster, "/posts/_search") assert resp["hits"]["total"] == 10_000 end test "only keeps two index versions" do for _ <- 1..3 do - rerun("elasticsearch.build", ["posts"]) + rerun("elasticsearch.build", ["posts"] ++ @cluster_opts) :timer.sleep(1000) end - {:ok, indexes} = Index.starting_with("posts") + {:ok, indexes} = Index.starting_with(TestCluster, "posts") assert length(indexes) == 2 [_previous, current] = Enum.sort(indexes) # assert that the most recent index is the one that is aliased - assert {:ok, %{^current => _}} = Elasticsearch.get("/posts/_alias") + assert {:ok, %{^current => _}} = Elasticsearch.get(TestCluster, "/posts/_alias") end test "--existing checks if index exists" do - rerun("elasticsearch.build", ["posts"]) + rerun("elasticsearch.build", ["posts"] ++ @cluster_opts) io = capture_io(fn -> - rerun("elasticsearch.build", ["posts", "--existing"]) + rerun("elasticsearch.build", ["posts", "--existing"] ++ @cluster_opts) end) assert io =~ "Index already exists: posts-" diff --git a/test/support/cluster.ex b/test/support/cluster.ex new file mode 100644 index 0000000..0ebeb96 --- /dev/null +++ b/test/support/cluster.ex @@ -0,0 +1,25 @@ +defmodule Elasticsearch.Test.Cluster do + @moduledoc false + + use Elasticsearch.Cluster + + def init(_config) do + {:ok, + %{ + api: Elasticsearch.API.HTTP, + bulk_page_size: 5000, + bulk_wait_interval: 5000, + json_library: Poison, + url: "http://localhost:9200", + username: "username", + password: "password", + indexes: %{ + posts: %{ + settings: "test/support/settings/posts.json", + store: Elasticsearch.Test.Store, + sources: [Post] + } + } + }} + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 808f834..b37ed96 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -8,4 +8,5 @@ unless System.get_env("CI") do ) end -{:ok, _} = Elasticsearch.wait_for_boot(15) +{:ok, _} = Elasticsearch.Test.Cluster.start_link() +{:ok, _} = 
Elasticsearch.wait_for_boot(Elasticsearch.Test.Cluster, 15) From 029039575907471c3c6ea1e7b15ac5dc3413ba77 Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Fri, 13 Apr 2018 13:15:52 -0700 Subject: [PATCH 2/9] 100% test coverage for Elasticsearch.Executable --- test/elasticsearch/executable_test.exs | 42 ++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 test/elasticsearch/executable_test.exs diff --git a/test/elasticsearch/executable_test.exs b/test/elasticsearch/executable_test.exs new file mode 100644 index 0000000..98a0b8f --- /dev/null +++ b/test/elasticsearch/executable_test.exs @@ -0,0 +1,42 @@ +defmodule Elasticsearch.ExecutableTest do + use ExUnit.Case, async: false + + alias Elasticsearch.Executable + + import ExUnit.CaptureIO + + describe ".start_link/3" do + test "starts the executable if it isn't running" do + output = + capture_io(fn -> + assert {:ok, pid} = + Executable.start_link( + "Elasticsearch", + "vendor/elasticsearch/bin/elasticsearch", + 9201 + ) + + GenServer.stop(pid) + end) + + assert output =~ "[info] Running Elasticsearch with PID" + assert output =~ "on port 9201" + end + + test "does nothing if executable is already running" do + output = + capture_io(fn -> + assert {:ok, pid} = + Executable.start_link( + "Elasticsearch", + "vendor/elasticsearch/bin/elasticsearch", + 9200 + ) + + GenServer.stop(pid) + end) + + assert output =~ "[info] Detected Elasticsearch already running on port 9200" + end + end +end From c6afe9e9de139e7e7cf72f13655d38bd913c002f Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Fri, 13 Apr 2018 13:21:55 -0700 Subject: [PATCH 3/9] Exclude elasticsearch.install from coverage Until we can decide on a good way to test it. 
--- coveralls.json | 1 + lib/mix/elasticsearch.install.ex | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/coveralls.json b/coveralls.json index 0f97cf1..dc62bd3 100644 --- a/coveralls.json +++ b/coveralls.json @@ -1,6 +1,7 @@ { "skip_files": [ "lib/elasticsearch/api/api.ex", + "lib/mix/elasticsearch.install.ex", "test/support/*" ], "coverage_options": { diff --git a/lib/mix/elasticsearch.install.ex b/lib/mix/elasticsearch.install.ex index 71802d0..b6b34ad 100644 --- a/lib/mix/elasticsearch.install.ex +++ b/lib/mix/elasticsearch.install.ex @@ -15,8 +15,8 @@ defmodule Mix.Tasks.Elasticsearch.Install do @doc false def run(args) do - with {[{:version, version}], [location], _} <- - OptionParser.parse(args, switches: [version: :string]) do + with {[{:version, version}], [location]} <- + OptionParser.parse!(args, strict: [version: :string]) do download_elasticsearch(version, location) download_kibana(version, location) else From 511cbd0e0818e7c7b227961c7f16d42aea5226d0 Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Fri, 13 Apr 2018 13:56:29 -0700 Subject: [PATCH 4/9] Add upgrading doc for 0.1.x to 0.2.x --- README.md | 2 +- guides/upgrading/0.1.x_to_0.2.x.md | 104 +++++++++++++++++++++++++++++ mix.exs | 7 +- 3 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 guides/upgrading/0.1.x_to_0.2.x.md diff --git a/README.md b/README.md index cc5c821..3673a91 100644 --- a/README.md +++ b/README.md @@ -149,7 +149,7 @@ defmodule MyApp.ElasticsearchMock do @behaviour Elasticsearch.API @impl true - def request(_config, :get, "/posts/1", _headers, _opts) do + def request(_config, :get, "/posts/1", _data, _opts) do {:ok, %HTTPoison.Response{ status_code: 404, body: %{ diff --git a/guides/upgrading/0.1.x_to_0.2.x.md b/guides/upgrading/0.1.x_to_0.2.x.md new file mode 100644 index 0000000..424b78e --- /dev/null +++ b/guides/upgrading/0.1.x_to_0.2.x.md @@ -0,0 +1,104 @@ +# Upgrading from 0.1.x to 0.2.x + +Version 0.2.0 brings this library in 
line with Elixir best practices for
+configuration.
+
+## Rationale
+
+Configuration is no longer global. Instead, you configure
+`Elasticsearch.Cluster` modules, which are `GenServer`s. This has several
+benefits:
+
+1. A single OTP app can talk to multiple Elasticsearch clusters. This is
+   useful for umbrella apps in particular.
+
+2. Reading from environment variables is standardized. You do it by
+   overriding the cluster's `init/1` callback, as in Ecto and Phoenix.
+
+3. You can start clusters like any other `GenServer`, passing in
+   configuration at that time.
+
+4. You can bypass clusters altogether by passing configuration instead
+   of a cluster to any function that expects a cluster.
+
+## Changes
+
+**BREAKING**: an `Elasticsearch.Cluster` module or configuration map is now
+required for all `Elasticsearch` function calls.
+
+**BREAKING**: `{:system, "ENV_VAR"}` is no longer supported in configuration.
+Instead, you should read from environment variables in the `init/1` callback
+as described in the `Elasticsearch.Cluster` documentation.
+
+**BREAKING**: The `:api_module` configuration option has been renamed
+to `:api`.
+
+**BREAKING**: The required callbacks for `Elasticsearch.API` have been
+simplified down to a single `request/5` function.
+
+**BREAKING**: `mix elasticsearch.build` now requires the `--cluster` option.
+
+## How to Update Your App
+
+First, add a `Cluster` to your application:
+
+    defmodule MyApp.ElasticsearchCluster do
+      use Elasticsearch.Cluster, otp_app: :my_app
+    end
+
+Next, switch your `:elasticsearch` configuration over to your
+application, and include the cluster. Be sure to rename the `:api_module`
+option to `:api`. See `Elasticsearch.Cluster` for more options.
+
+    # BEFORE
+    config :elasticsearch,
+      url: "http://localhost:9200",
+      api_module: Elasticsearch.API.HTTP,
+      # ...
+
+    # AFTER
+    config :my_app, MyApp.ElasticsearchCluster,
+      url: "http://localhost:9200",
+      api: Elasticsearch.API.HTTP,
+      # ...
+ +Next, be sure to start your Cluster in your application supervisor: + + children = [ + MyApp.ElasticsearchCluster + ] + +Next, add your cluster to all calls to `Elasticsearch` functions: + + # BEFORE + Elasticsearch.post("/posts/_search", %{...}) + + # AFTER + Elasticsearch.post(MyApp.ElasticsearchCluster, "/posts/_search", %{...}) + +Next, update any mock implementations of `Elasticsearch.API` to implement the +`request/5` function instead of the previous functions. + + # BEFORE + defmodule MyApp.ElasticsearchMock do + @behaviour Elasticsearch.API + + def get("/url", _opts) do + {:ok, %HTTPoison.Response{}} + end + end + + # AFTER + defmodule MyApp.ElasticsearchMock do + @behaviour Elasticsearch.API + + @impl true + def request(_config, :get, "/url", _data, _opts) do + {:ok, %HTTPoison.Response{}} + end + end + +Finally, update any calls to `mix elasticsearch.build` to include the cluster +as an option: + + mix elasticsearch.build [index] --cluster MyApp.ElasticsearchCluster \ No newline at end of file diff --git a/mix.exs b/mix.exs index 49a9f0e..3389762 100644 --- a/mix.exs +++ b/mix.exs @@ -59,7 +59,6 @@ defmodule Elasticsearch.Mixfile do {:poison, ">= 0.0.0", optional: true}, {:httpoison, ">= 0.0.0"}, {:vex, "~> 0.6.0"}, - {:stream_data, ">= 0.0.0", only: [:dev, :test]}, {:dialyze, ">= 0.0.0", only: [:dev, :test]}, {:ex_doc, ">= 0.0.0", only: [:dev, :test]}, {:excoveralls, ">= 0.0.0", only: :test} @@ -69,7 +68,11 @@ defmodule Elasticsearch.Mixfile do defp docs do [ main: "readme", - extras: ["README.md"], + extras: ["README.md", "guides/upgrading/0.1.x_to_0.2.x.md": [title: "0.1.x to 0.2.x"]], + extra_section: "GUIDES", + groups_for_extras: [ + Upgrading: ~r/upgrading/ + ], groups_for_modules: [ API: [ Elasticsearch, From 0fdcf997d439192554da879afcf44882b4e67384 Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Fri, 13 Apr 2018 14:01:58 -0700 Subject: [PATCH 5/9] Add docs to Elasticsearch.Store --- coveralls.json | 1 + lib/elasticsearch/storage/store.ex 
| 32 ++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/coveralls.json b/coveralls.json index dc62bd3..7800272 100644 --- a/coveralls.json +++ b/coveralls.json @@ -1,6 +1,7 @@ { "skip_files": [ "lib/elasticsearch/api/api.ex", + "lib/elasticsearch/storage/store.ex", "lib/mix/elasticsearch.install.ex", "test/support/*" ], diff --git a/lib/elasticsearch/storage/store.ex b/lib/elasticsearch/storage/store.ex index fb23eee..55c1d42 100644 --- a/lib/elasticsearch/storage/store.ex +++ b/lib/elasticsearch/storage/store.ex @@ -1,8 +1,40 @@ defmodule Elasticsearch.Store do + @moduledoc """ + A behaviour for fetching data to index. Used by `mix elasticsearch.build`. + """ + + @typedoc """ + A data source. For example, `Post`, where `Post` is an `Ecto.Schema`. + Each datum returned must implement `Elasticsearch.Document`. + """ @type source :: any + + @typedoc """ + Instances of the data source. For example, `%Post{}` structs. + """ @type data :: any + + @typedoc """ + The current offset for the query. + """ @type offset :: integer + + @typedoc """ + A limit on the number of elements to return. + """ @type limit :: integer + @doc """ + Loads data based on the given source, offset, and limit. 
+ + ## Example + + def load(Post, offset, limit) do + Post + |> offset(^offset) + |> limit(^limit) + |> Repo.all() + end + """ @callback load(source, offset, limit) :: [data] end From ea22625ff102c162f7ba31d0f87f25b86de77a37 Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Fri, 13 Apr 2018 14:15:55 -0700 Subject: [PATCH 6/9] Remove dialyzer --- bin/test | 4 ---- mix.exs | 1 - 2 files changed, 5 deletions(-) diff --git a/bin/test b/bin/test index 1bea825..fa0a723 100755 --- a/bin/test +++ b/bin/test @@ -9,10 +9,6 @@ MIX_ENV=test mix format --check-formatted || { echo 'Please format code using `m MIX_ENV=test mix compile --warnings-as-errors --force || { echo 'Please fix all compiler warnings.'; exit 1; } MIX_ENV=test mix docs || { echo 'Elixir HTML docs were not generated!'; exit 1; } -if [ ! $CI ]; then - MIX_ENV=test mix dialyze || { echo 'Dialyzer checks failed.'; exit 1; } -fi - if [ $CI ]; then mix coveralls.travis || { echo 'Elixir tests failed!'; exit 1; } else diff --git a/mix.exs b/mix.exs index 3389762..3734079 100644 --- a/mix.exs +++ b/mix.exs @@ -59,7 +59,6 @@ defmodule Elasticsearch.Mixfile do {:poison, ">= 0.0.0", optional: true}, {:httpoison, ">= 0.0.0"}, {:vex, "~> 0.6.0"}, - {:dialyze, ">= 0.0.0", only: [:dev, :test]}, {:ex_doc, ">= 0.0.0", only: [:dev, :test]}, {:excoveralls, ">= 0.0.0", only: :test} ] From f03e1eb8c4f207dae27ce4f8f521a7e740cf97e3 Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Fri, 13 Apr 2018 16:03:48 -0700 Subject: [PATCH 7/9] Test with real repo --- config/config.exs | 13 +++++++ mix.exs | 11 +++++- mix.lock | 6 +++ test/elasticsearch/indexing/index_test.exs | 2 +- test/mix/elasticsearch.build_test.exs | 12 +++++- test/support/data_case.ex | 37 +++++++++++++++++++ .../20180413213400_create_posts.exs | 10 +++++ test/support/post.ex | 16 +++++--- test/support/repo.ex | 15 ++++++++ test/support/store.ex | 20 ++++------ test/test_helper.exs | 1 + 11 files changed, 121 insertions(+), 22 deletions(-) create mode 100644 
test/support/data_case.ex create mode 100644 test/support/migrations/20180413213400_create_posts.exs create mode 100644 test/support/repo.ex diff --git a/config/config.exs b/config/config.exs index b84366c..caf10d7 100644 --- a/config/config.exs +++ b/config/config.exs @@ -18,3 +18,16 @@ config :elasticsearch, sources: [Post] } } + +config :elasticsearch, Elasticsearch.Test.Repo, + adapter: Ecto.Adapters.Postgres, + username: "postgres", + password: "postgres", + database: "elasticsearch_test", + hostname: "localhost", + pool: Ecto.Adapters.SQL.Sandbox, + priv: "test/support/" + +config :elasticsearch, ecto_repos: [Elasticsearch.Test.Repo] + +config :logger, level: :warn diff --git a/mix.exs b/mix.exs index 3734079..bddbb63 100644 --- a/mix.exs +++ b/mix.exs @@ -20,7 +20,8 @@ defmodule Elasticsearch.Mixfile do ], docs: docs(), deps: deps(), - package: package() + package: package(), + aliases: aliases() ] end @@ -59,7 +60,9 @@ defmodule Elasticsearch.Mixfile do {:poison, ">= 0.0.0", optional: true}, {:httpoison, ">= 0.0.0"}, {:vex, "~> 0.6.0"}, + {:postgrex, ">= 0.0.0", only: [:dev, :test]}, {:ex_doc, ">= 0.0.0", only: [:dev, :test]}, + {:ecto, ">= 0.0.0", only: [:dev, :test]}, {:excoveralls, ">= 0.0.0", only: :test} ] end @@ -96,4 +99,10 @@ defmodule Elasticsearch.Mixfile do ] ] end + + defp aliases do + [ + test: ["ecto.create --quiet", "ecto.migrate --quiet", "test"] + ] + end end diff --git a/mix.lock b/mix.lock index 10fc511..37f823b 100644 --- a/mix.lock +++ b/mix.lock @@ -1,7 +1,11 @@ %{ "certifi": {:hex, :certifi, "2.0.0", "a0c0e475107135f76b8c1d5bc7efb33cd3815cb3cf3dea7aefdd174dabead064", [], [], "hexpm"}, + "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm"}, + "db_connection": {:hex, :db_connection, "1.1.3", "89b30ca1ef0a3b469b1c779579590688561d586694a3ce8792985d4d7e575a61", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}, {:poolboy, "~> 
1.5", [hex: :poolboy, repo: "hexpm", optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, repo: "hexpm", optional: true]}], "hexpm"}, + "decimal": {:hex, :decimal, "1.5.0", "b0433a36d0e2430e3d50291b1c65f53c37d56f83665b43d79963684865beab68", [:mix], [], "hexpm"}, "dialyze": {:hex, :dialyze, "0.2.1", "9fb71767f96649020d769db7cbd7290059daff23707d6e851e206b1fdfa92f9d", [], [], "hexpm"}, "earmark": {:hex, :earmark, "1.2.3", "206eb2e2ac1a794aa5256f3982de7a76bf4579ff91cb28d0e17ea2c9491e46a4", [], [], "hexpm"}, + "ecto": {:hex, :ecto, "2.2.10", "e7366dc82f48f8dd78fcbf3ab50985ceeb11cb3dc93435147c6e13f2cda0992e", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: true]}, {:decimal, "~> 1.2", [hex: :decimal, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.8.0", [hex: :mariaex, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}, {:poolboy, "~> 1.5", [hex: :poolboy, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.13.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, repo: "hexpm", optional: true]}], "hexpm"}, "ex_doc": {:hex, :ex_doc, "0.18.1", "37c69d2ef62f24928c1f4fdc7c724ea04aecfdf500c4329185f8e3649c915baf", [], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"}, "excoveralls": {:hex, :excoveralls, "0.8.0", "99d2691d3edf8612f128be3f9869c4d44b91c67cec92186ce49470ae7a7404cf", [], [{:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"}, "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"}, @@ -13,6 +17,8 @@ "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [], [], "hexpm"}, "mimerl": {:hex, :mimerl, "1.0.2", 
"993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [], [], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, + "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], [], "hexpm"}, + "postgrex": {:hex, :postgrex, "0.13.5", "3d931aba29363e1443da167a4b12f06dcd171103c424de15e5f3fc2ba3e6d9c5", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 1.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [], [], "hexpm"}, "stream_data": {:hex, :stream_data, "0.4.2", "fa86b78c88ec4eaa482c0891350fcc23f19a79059a687760ddcf8680aac2799b", [:mix], [], "hexpm"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [], [], "hexpm"}, diff --git a/test/elasticsearch/indexing/index_test.exs b/test/elasticsearch/indexing/index_test.exs index c41e5bf..5f95288 100644 --- a/test/elasticsearch/indexing/index_test.exs +++ b/test/elasticsearch/indexing/index_test.exs @@ -1,5 +1,5 @@ defmodule Elasticsearch.Cluster.IndexTest do - use ExUnit.Case + use Elasticsearch.DataCase, async: false alias Elasticsearch.{ Index, diff --git a/test/mix/elasticsearch.build_test.exs b/test/mix/elasticsearch.build_test.exs index 76f26d0..2efb99b 100644 --- a/test/mix/elasticsearch.build_test.exs +++ b/test/mix/elasticsearch.build_test.exs @@ -1,5 +1,5 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do - use ExUnit.Case + use Elasticsearch.DataCase, async: false import Mix.Task, only: [rerun: 2] import ExUnit.CaptureIO @@ -18,6 +18,15 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do @cluster_opts ["--cluster", 
"Elasticsearch.Test.Cluster"] + def populate_posts_table do + posts = + [%{title: "Example Post", author: "John Smith"}] + |> Stream.cycle() + |> Enum.take(10_000) + + Repo.insert_all("posts", posts) + end + describe ".run" do test "raises error on invalid options" do assert_raise Mix.Error, fn -> @@ -44,6 +53,7 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do end test "builds configured index" do + populate_posts_table() rerun("elasticsearch.build", ["posts"] ++ @cluster_opts) resp = Elasticsearch.get!(TestCluster, "/posts/_search") diff --git a/test/support/data_case.ex b/test/support/data_case.ex new file mode 100644 index 0000000..8295034 --- /dev/null +++ b/test/support/data_case.ex @@ -0,0 +1,37 @@ +defmodule Elasticsearch.DataCase do + @moduledoc """ + This module defines the setup for tests requiring + access to the application's data layer. + + You may define functions here to be used as helpers in + your tests. + + Finally, if the test case interacts with the database, + it cannot be async. For this reason, every test runs + inside a transaction which is reset at the beginning + of the test unless the test case is marked as async. 
+ """ + + use ExUnit.CaseTemplate + + using do + quote do + alias Elasticsearch.Test.Repo + + import Ecto + import Ecto.Changeset + import Ecto.Query + import Elasticsearch.DataCase + end + end + + setup tags do + :ok = Ecto.Adapters.SQL.Sandbox.checkout(Elasticsearch.Test.Repo) + + unless tags[:async] do + Ecto.Adapters.SQL.Sandbox.mode(Elasticsearch.Test.Repo, {:shared, self()}) + end + + :ok + end +end diff --git a/test/support/migrations/20180413213400_create_posts.exs b/test/support/migrations/20180413213400_create_posts.exs new file mode 100644 index 0000000..673d5b5 --- /dev/null +++ b/test/support/migrations/20180413213400_create_posts.exs @@ -0,0 +1,10 @@ +defmodule Elasticsearch.Test.Repo.Migrations.CreatePosts do + use Ecto.Migration + + def change do + create table(:posts) do + add(:title, :string) + add(:author, :string) + end + end +end diff --git a/test/support/post.ex b/test/support/post.ex index cdcc4ca..cf6c454 100644 --- a/test/support/post.ex +++ b/test/support/post.ex @@ -1,17 +1,21 @@ defmodule Post do - @moduledoc false - defstruct id: nil, title: nil, author: nil + use Ecto.Schema + + schema "posts" do + field(:title, :string) + field(:author, :string) + end end defimpl Elasticsearch.Document, for: Post do - def id(item), do: item.id + def id(post), do: post.id def type(_item), do: "post" def parent(_item), do: false - def encode(item) do + def encode(post) do %{ - title: item.title, - author: item.author + title: post.title, + author: post.author } end end diff --git a/test/support/repo.ex b/test/support/repo.ex new file mode 100644 index 0000000..0c62518 --- /dev/null +++ b/test/support/repo.ex @@ -0,0 +1,15 @@ +defmodule Elasticsearch.Test.Repo do + use Ecto.Repo, otp_app: :elasticsearch + + @doc """ + Dynamically loads the repository url from the + DATABASE_URL environment variable. 
+ """ + def init(_, opts) do + if url = System.get_env("DATABASE_URL") do + {:ok, Keyword.put(opts, :url, url)} + else + {:ok, opts} + end + end +end diff --git a/test/support/store.ex b/test/support/store.ex index 8646a33..ffbeeb8 100644 --- a/test/support/store.ex +++ b/test/support/store.ex @@ -2,20 +2,14 @@ defmodule Elasticsearch.Test.Store do @moduledoc false @behaviour Elasticsearch.Store - def load(Post, offset, _limit) when offset <= 5_000 do - [%Post{title: "Name", author: "Author"}] - |> Stream.cycle() - |> Stream.map(&Map.put(&1, :id, random_str())) - |> Enum.take(5000) - end + import Ecto.Query - def load(_module, _offset, _limit) do - [] - end + alias Elasticsearch.Test.Repo - defp random_str do - 32 - |> :crypto.strong_rand_bytes() - |> Base.encode64() + def load(Post, offset, limit) do + Post + |> offset(^offset) + |> limit(^limit) + |> Repo.all() end end diff --git a/test/test_helper.exs b/test/test_helper.exs index b37ed96..63e1d76 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,4 +1,5 @@ ExUnit.start() +Elasticsearch.Test.Repo.start_link() unless System.get_env("CI") do Elasticsearch.Executable.start_link( From 1f8b7096dab1638ac1391355187f53e1e72f3398 Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Sat, 14 Apr 2018 10:08:01 -0700 Subject: [PATCH 8/9] Make executable test compatible with Travis --- test/elasticsearch/executable_test.exs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/test/elasticsearch/executable_test.exs b/test/elasticsearch/executable_test.exs index 98a0b8f..3ef209a 100644 --- a/test/elasticsearch/executable_test.exs +++ b/test/elasticsearch/executable_test.exs @@ -5,17 +5,21 @@ defmodule Elasticsearch.ExecutableTest do import ExUnit.CaptureIO + defp elasticsearch do + case System.cmd("which", ["elasticsearch"]) do + {"", _} -> + "vendor/elasticsearch/bin/elasticsearch" + + {path, 0} -> + path + end + end + describe ".start_link/3" do test "starts the 
executable if it isn't running" do output = capture_io(fn -> - assert {:ok, pid} = - Executable.start_link( - "Elasticsearch", - "vendor/elasticsearch/bin/elasticsearch", - 9201 - ) - + assert {:ok, pid} = Executable.start_link("Elasticsearch", elasticsearch(), 9201) GenServer.stop(pid) end) @@ -26,13 +30,7 @@ defmodule Elasticsearch.ExecutableTest do test "does nothing if executable is already running" do output = capture_io(fn -> - assert {:ok, pid} = - Executable.start_link( - "Elasticsearch", - "vendor/elasticsearch/bin/elasticsearch", - 9200 - ) - + assert {:ok, pid} = Executable.start_link("Elasticsearch", elasticsearch(), 9200) GenServer.stop(pid) end) From ef9e574c92124652c05fc99be3b4fe5809964934 Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Sat, 14 Apr 2018 10:18:42 -0700 Subject: [PATCH 9/9] Bump version to v0.2.0 --- README.md | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 3673a91..88e17d4 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Add `elasticsearch` to your list of dependencies in `mix.exs`: ```elixir def deps do [ - {:elasticsearch, "~> 0.1.1"} + {:elasticsearch, "~> 0.2.0"} ] end ``` diff --git a/mix.exs b/mix.exs index bddbb63..e163a9e 100644 --- a/mix.exs +++ b/mix.exs @@ -6,7 +6,7 @@ defmodule Elasticsearch.Mixfile do app: :elasticsearch, description: "Elasticsearch without DSLs", source_url: "https://github.com/infinitered/elasticsearch-elixir", - version: "0.1.1", + version: "0.2.0", elixir: "~> 1.5", start_permanent: Mix.env() == :prod, elixirc_paths: elixirc_paths(Mix.env()),