Skip to content

Commit

Permalink
ElasticSearch: Replace RestClient with Akka HTTP client (#2347)
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc Rooding committed Nov 13, 2020
1 parent 4fdf171 commit caa01ba
Show file tree
Hide file tree
Showing 31 changed files with 2,844 additions and 1,698 deletions.
6 changes: 5 additions & 1 deletion build.sbt
Expand Up @@ -134,7 +134,11 @@ lazy val csvBench = internalProject("csv-bench")

lazy val dynamodb = alpakkaProject("dynamodb", "aws.dynamodb", Dependencies.DynamoDB)

lazy val elasticsearch = alpakkaProject("elasticsearch", "elasticsearch", Dependencies.Elasticsearch)
lazy val elasticsearch = alpakkaProject(
"elasticsearch",
"elasticsearch",
Dependencies.Elasticsearch
)

// The name 'file' is taken by `sbt.file`, hence 'files'
lazy val files = alpakkaProject("file", "file", Dependencies.File, fatalWarnings := true)
Expand Down
98 changes: 60 additions & 38 deletions docs/src/main/paradox/elasticsearch.md
Expand Up @@ -24,28 +24,46 @@ The table below shows direct dependencies of this module and the second tab show
@@dependencies { projectId="elasticsearch" }


## Set up REST client
### Elasticsearch connection

Sources, Flows and Sinks provided by this connector need a prepared `org.elasticsearch.client.RestClient` to
access to Elasticsearch.
The connection and credentials to authenticate with are configured with `ElasticsearchConnectionSettings`.

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala) { #init-client }
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #connection-settings }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #init-client }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java) { #connection-settings }


| Parameter | Default | Description |
| --------------------| ------- | ------------------------------------------------------------------- |
| baseUrl | Empty | The base URL of Elasticsearch. Should not include a trailing slash. |
| username | None | The username to authenticate with |
| password | None | The password to authenticate with |

## Elasticsearch parameters

Any API method that allows reading from and writing to Elasticsearch takes an instance of @apidoc[ElasticsearchParams$].

`ElasticsearchParams` has be constructed based on the ElasticSearch API version that you're targeting:

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #es-params }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java) { #es-params }

## Elasticsearch as Source and Sink

Now we can stream messages from or to Elasticsearch by providing the `RestClient` to the
You can stream messages from or to Elasticsearch using the
@apidoc[ElasticsearchSource$], @apidoc[ElasticsearchFlow$] or the @apidoc[ElasticsearchSink$].


Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #define-class }
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala) { #define-class }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #define-class }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTestBase.java) { #define-class }

### With typed source

Expand All @@ -54,20 +72,20 @@ Use `ElasticsearchSource.typed` and `ElasticsearchSink.create` to create source
@java[The data is converted to and from JSON by Jackson's ObjectMapper.]

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #run-typed }
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #run-typed }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #run-typed }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #run-typed }

### With JSON source

Use `ElasticsearchSource.create` and `ElasticsearchSink.create` to create source and sink.

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #run-jsobject }
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #run-jsobject }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #run-jsobject }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #run-jsobject }


### Writing to Elasticsearch
Expand All @@ -83,10 +101,10 @@ In the above examples, `WriteMessage` is used as the input to `ElasticsearchSink
| WriteMessage.createDeleteMessage | Delete an existing document. If there is no document with the specified `id`, do nothing. |

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #multiple-operations }
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #multiple-operations }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #multiple-operations }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #multiple-operations }

### Source configuration

Expand All @@ -96,15 +114,15 @@ Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #source-settings }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #source-settings }


| Parameter | Default | Description |
| ---------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------ |
| bufferSize | 10 | `ElasticsearchSource` retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size. |
| includeDocumentVersion | false | Tell Elasticsearch to return the documents `_version` property with the search results. See [Version](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-version.html) and [Optimistic Concurrenct Control](https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html) to know about this property. |
| scrollDuration | 5 min | `ElasticsearchSource` retrieves messages from Elasticsearch by scroll scan. This parameter is used as a scroll value. See [Time units](https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units) for supported units. |
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java) { #source-settings }

| Parameter | Default | Description |
| ---------------------- | -------------- | ------------------------------------------------------------------------------------------------------------------------ |
| connection | | The connection details and credentials to authenticate against ElasticSearch. See `ElasticsearchConnectionSettings` |
| bufferSize | 10 | `ElasticsearchSource` retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size. |
| includeDocumentVersion | false | Tell Elasticsearch to return the documents `_version` property with the search results. See [Version](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-version.html) and [Optimistic Concurrenct Control](https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html) to know about this property. |
| scrollDuration | 5 min | `ElasticsearchSource` retrieves messages from Elasticsearch by scroll scan. This parameter is used as a scroll value. See [Time units](https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units) for supported units. |
| apiVersion | V5 | Currently supports `V5` and `V7` (see below) |

### Sink and flow configuration

Expand All @@ -114,11 +132,11 @@ Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #sink-settings }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #sink-settings }

: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java) { #sink-settings }

| Parameter | Default | Description |
| ------------------- | ------- | ------------------------------------------------------------------------------------------------------ |
| connection | | The connection details and credentials to authenticate against ElasticSearch. See `ElasticsearchConnectionSettings` |
| bufferSize | 10 | Flow and Sink batch messages to bulk requests when back-pressure applies. |
| versionType | None | If set, `ElasticsearchSink` uses the chosen versionType to index documents. See [Version types](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types) for accepted settings. |
| retryLogic | No retries | See below |
Expand Down Expand Up @@ -148,8 +166,12 @@ The provided implementations are:
In case of write failures the order of messages downstream is guaranteed to be preserved.

#### Supported API versions
To support writing to multiple versions of Elasticsearch, an `ApiVersion` can be specified.
This will be used to transform the bulk request into a format understood by the corresponding Elasticsearch server.
To support reading and writing to multiple versions of Elasticsearch, an `ApiVersion` can be specified.

This will be used to:
1. transform the bulk request into a format understood by the corresponding Elasticsearch server.
2. determine whether to include the index type mapping in the API calls. See [removal of types](https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html)

Currently [`V5`](https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docs-bulk.html#docs-bulk) and [`V7`](https://www.elastic.co/guide/en/elasticsearch/reference/7.6/docs-bulk.html#docs-bulk) are supported specifically but this parameter does not need to match the server version exactly (for example, either `V5` or `V7` should work with Elasticsearch 6.x).

### Allow explicit index
Expand All @@ -162,21 +184,21 @@ You can also build flow stages with @apidoc[ElasticsearchFlow$].
The API is similar to creating Sinks.

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #run-flow }
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #run-flow }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #run-flow }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #run-flow }


### Storing documents from Strings

Elasticsearch requires the documents to be properly formatted JSON. If your data is available as JSON in Strings, you may use the pre-defined `StringMessageWriter` to avoid any conversions. For any other JSON technologies, implement a @scala[`MessageWriter[T]`]@java[`MessageWriter<T>`].

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #string }
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #string }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #string }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #string }



Expand All @@ -185,10 +207,10 @@ Java
When streaming documents from Kafka, you might want to commit to Kafka **AFTER** the document has been written to Elastic.

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #kafka-example }
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #kafka-example }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #kafka-example }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #kafka-example }


### Specifying custom index-name for every document
Expand All @@ -197,10 +219,10 @@ When working with index-patterns using wildcards, you might need to specify a cu
index-name for each document:

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #custom-index-name-example }
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #custom-index-name-example }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #custom-index-name-example }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java) { #custom-index-name-example }


### Specifying custom metadata for every document
Expand All @@ -212,7 +234,7 @@ Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #custom-metadata-example }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #custom-metadata-example }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java) { #custom-metadata-example }


### More custom searching
Expand All @@ -221,11 +243,11 @@ The easiest way of using Elasticsearch-source, is to just specify the query-para
like specifying which fields to return and so on. In such cases you can instead use 'searchParams' instead:

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchConnectorBehaviour.scala) { #custom-search-params }
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchV5Spec.scala) { #custom-search-params }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java) { #custom-search-params }
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchV5Test.java) { #custom-search-params }

Additionally, support for [custom routing](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-routing-field.html)
is available through the `routing` key. Add this key and the respective value in 'searchParams' map, to route your search directly to the shard that holds
the document you are looking for and enjoy improved response times.
the document you are looking for and enjoy improved response times.
@@ -0,0 +1 @@
ProblemFilters.exclude[Problem]("akka.stream.alpakka.elasticsearch.*")
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.elasticsearch

final class ElasticsearchConnectionSettings private (
val baseUrl: String,
val username: Option[String],
val password: Option[String]
) {

def withBaseUrl(value: String): ElasticsearchConnectionSettings = copy(baseUrl = value)

def withCredentials(username: String, password: String): ElasticsearchConnectionSettings =
copy(username = Option(username), password = Option(password))

def hasCredentialsDefined: Boolean = username.isDefined && password.isDefined

def copy(baseUrl: String = baseUrl,
username: Option[String] = username,
password: Option[String] = password): ElasticsearchConnectionSettings =
new ElasticsearchConnectionSettings(baseUrl = baseUrl, username = username, password = password)

override def toString =
s"""ElasticsearchConnectionSettings(baseUrl=$baseUrl,username=$username,password=${password.fold("")(_ => "***")})"""
}

object ElasticsearchConnectionSettings {

/** Scala API */
def apply(baseUrl: String): ElasticsearchConnectionSettings = new ElasticsearchConnectionSettings(baseUrl, None, None)

/** Java API */
def create(baseUrl: String): ElasticsearchConnectionSettings =
new ElasticsearchConnectionSettings(baseUrl, None, None)
}
@@ -0,0 +1,25 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.elasticsearch

final class ElasticsearchParams private (val indexName: String, val typeName: Option[String]) {
override def toString =
s"""ElasticsearchParams(indexName=$indexName,typeName=$typeName)"""
}

object ElasticsearchParams {
def V7(indexName: String): ElasticsearchParams = {
require(indexName != null, "You must define an index name")

new ElasticsearchParams(indexName, None)
}

def V5(indexName: String, typeName: String): ElasticsearchParams = {
require(indexName != null, "You must define an index name")
require(typeName != null && typeName.trim.nonEmpty, "You must define a type name for ElasticSearch API version V5")

new ElasticsearchParams(indexName, Some(typeName))
}
}

0 comments on commit caa01ba

Please sign in to comment.