diff --git a/.github/workflows/check-build-test.yml b/.github/workflows/check-build-test.yml index 202b7222aa..42a1f76de0 100644 --- a/.github/workflows/check-build-test.yml +++ b/.github/workflows/check-build-test.yml @@ -86,7 +86,7 @@ jobs: - { connector: couchbase, pre_cmd: 'docker-compose up -d couchbase_prep' } - { connector: csv } - { connector: dynamodb, pre_cmd: 'docker-compose up -d dynamodb' } - - { connector: elasticsearch, pre_cmd: 'docker-compose up -d elasticsearch6 elasticsearch7' } + - { connector: elasticsearch, pre_cmd: 'docker-compose up -d elasticsearch6 elasticsearch7 opensearch1' } - { connector: file } - { connector: ftp, pre_cmd: './scripts/ftp-servers.sh' } - { connector: geode, pre_cmd: 'docker-compose up -d geode' } diff --git a/docker-compose.yml b/docker-compose.yml index aa60a10e2e..72f1680a07 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -85,6 +85,13 @@ services: - "9202:9200" environment: - "discovery.type=single-node" + opensearch1: + image: opensearchproject/opensearch:1.3.1 + ports: + - "9203:9200" + environment: + - "discovery.type=single-node" + - "DISABLE_SECURITY_PLUGIN=true" ftp: image: stilliard/pure-ftpd:latest ports: diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md index c8ab9e6ff5..5991632cbc 100644 --- a/docs/src/main/paradox/index.md +++ b/docs/src/main/paradox/index.md @@ -53,6 +53,7 @@ The [Alpakka project](https://doc.akka.io/docs/alpakka/current/) is an open sour * [MongoDB](mongodb.md) * [MQTT](mqtt.md) * [MQTT Streaming](mqtt-streaming.md) +* [Opensearch](opensearch.md) * [OrientDB](orientdb.md) * [Pulsar](external/pulsar.md) * [Pravega](pravega.md) diff --git a/docs/src/main/paradox/opensearch.md b/docs/src/main/paradox/opensearch.md new file mode 100644 index 0000000000..3a042ea90c --- /dev/null +++ b/docs/src/main/paradox/opensearch.md @@ -0,0 +1,259 @@ +# Opensearch + +The Alpakka Elasticsearch connector also provides Akka Streams integration for Opensearch. + +For more information about Opensearch, please visit the [Opensearch documentation](https://opensearch.org/docs/latest). + +@@project-info{ projectId="elasticsearch" } + +## Artifacts + +@@dependency [sbt,Maven,Gradle] { + group=com.lightbend.akka + artifact=akka-stream-alpakka-elasticsearch_$scala.binary.version$ + version=$project.version$ + symbol2=AkkaVersion + value2=$akka.version$ + group2=com.typesafe.akka + artifact2=akka-stream_$scala.binary.version$ + version2=AkkaVersion +} + +The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively. + +@@dependencies { projectId="elasticsearch" } + + +### Opensearch connection + +The connection and credentials to authenticate with are configured with `OpensearchConnectionSettings`. + +Scala +: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala) { #connection-settings } + +Java +: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java) { #connection-settings } + + +| Parameter | Default | Description | +| --------------------| ------- | ------------------------------------------------------------------- | +| baseUrl | Empty | The base URL of Opensearch. Should not include a trailing slash. | +| username | None | The username to authenticate with | +| password | None | The password to authenticate with | +| headers | None | List of headers that should be sent with the http request. 
| connectionContext   | None    | The connectionContext that will be used with the http request. This can be used for TLS Auth instead of basic auth (username/password) by setting the SSLContext within the connectionContext. |

## Opensearch parameters

Any API method that allows reading from and writing to Opensearch takes an instance of @apidoc[ElasticsearchParams$].

`ElasticsearchParams` has to be constructed based on the Opensearch API version that you're targeting:

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala) { #opensearch-params }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java) { #opensearch-params }

## Opensearch as Source and Sink

You can stream messages from or to Opensearch using the
@apidoc[ElasticsearchSource$], @apidoc[ElasticsearchFlow$] or the @apidoc[ElasticsearchSink$].


Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala) { #define-class }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTestBase.java) { #define-class }

### With typed source

Use `ElasticsearchSource.typed` and `ElasticsearchSink.create` to create source and sink.
@scala[The data is converted to and from JSON by Spray JSON.]
@java[The data is converted to and from JSON by Jackson's ObjectMapper.]

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala) { #run-typed }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchV1Test.java) { #run-typed }

### With JSON source

Use `ElasticsearchSource.create` and `ElasticsearchSink.create` to create source and sink.

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala) { #run-jsobject }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchV1Test.java) { #run-jsobject }


### Writing to Opensearch

In the above examples, `WriteMessage` is used as the input to `ElasticsearchSink` and `ElasticsearchFlow`. This requests the `index` operation from Opensearch. Other operations can be requested with the following message types:

| Message factory        | Description                                                                                            |
| ---------------------- | ------------------------------------------------------------------------------------------------------ |
| WriteMessage.createIndexMessage | Create a new document. If `id` is specified and it already exists, replace the document and increment its version. |
| WriteMessage.createCreateMessage | Create a new document. If `id` already exists, the `WriteResult` will contain an error. |
| WriteMessage.createUpdateMessage | Update an existing document. If there is no document with the specified `id`, do nothing. |
| WriteMessage.createUpsertMessage | Update an existing document. If there is no document with the specified `id`, create a new document. |
| WriteMessage.createDeleteMessage | Delete an existing document. If there is no document with the specified `id`, do nothing. |

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala) { #multiple-operations }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchV1Test.java) { #multiple-operations }

### Source configuration

The source is configured with `OpensearchSourceSettings`.
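As a quick orientation before the snippets below, here is a minimal, hedged sketch of building the settings. The endpoint, credentials, and values are placeholders; the builder methods are the ones listed in the settings tables on this page:

```scala
import scala.concurrent.duration._
import akka.stream.alpakka.elasticsearch.{OpensearchConnectionSettings, OpensearchSourceSettings}

// Placeholder endpoint and credentials -- adjust to your cluster.
val connectionSettings = OpensearchConnectionSettings("http://localhost:9203")
  .withCredentials("user", "password")

val sourceSettings = OpensearchSourceSettings(connectionSettings)
  .withBufferSize(10) // used as the scroll size
  .withIncludeDocumentVersion(true) // return `_version` with the search results
  .withScrollDuration(5.minutes) // keep-alive of the scroll context
```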
Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala) { #source-settings }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java) { #source-settings }

| Parameter              | Default        | Description                                                                                                                |
| ---------------------- | -------------- | -------------------------------------------------------------------------------------------------------------------------- |
| connection             |                | The connection details and credentials to authenticate against Opensearch. See `OpensearchConnectionSettings` |
| bufferSize             | 10             | `ElasticsearchSource` retrieves messages from Opensearch by scroll scan. This buffer size is used as the scroll size. |
| includeDocumentVersion | false          | Tell Opensearch to return the document's `_version` property with the search results. See [Version](https://www.elastic.co/guide/en/elasticsearch/reference/7.10/search-request-body.html#request-body-search-version) and [Optimistic Concurrency Control](https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html) to learn more about this property. |
| scrollDuration         | 5 min          | `ElasticsearchSource` retrieves messages from Opensearch by scroll scan. This parameter is used as the scroll keep-alive value. See [Time units](https://www.elastic.co/guide/en/elasticsearch/reference/7.10/common-options.html#time-units) for supported units. |
| apiVersion             | V1             | Currently supports `V1` (see below) |

### Sink and flow configuration

Sinks and flows are configured with `OpensearchWriteSettings`.

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala) { #sink-settings }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java) { #sink-settings }

| Parameter           | Default | Description                                                                                              |
| ------------------- | ------- | -------------------------------------------------------------------------------------------------------- |
| connection          |         | The connection details and credentials to authenticate against Opensearch. See `OpensearchConnectionSettings` |
| bufferSize          | 10      | Flow and Sink batch messages to bulk requests when back-pressure applies. |
| versionType         | None    | If set, `ElasticsearchSink` uses the chosen versionType to index documents. See [Version types](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types) for accepted settings. |
| retryLogic          | No retries | See below |
| apiVersion          | V1      | Currently supports `V1` (see below) |
| allowExplicitIndex  | True    | When set to False, the index name will be included in the URL instead of on each document (see below) |

#### Retry logic
A bulk request might fail for only some of the messages in it. To retry failed writes to Opensearch, a `RetryLogic` can be specified.

The provided implementations are:

* `RetryAtFixedRate`

| Parameter           | Description     |
|---------------------|-----------------|
| maxRetries          | The stage fails if it gets this number of consecutive failures. |
| retryInterval       | Failing writes are retried after this duration. |

* `RetryWithBackoff`

| Parameter           | Description     |
|---------------------|-----------------|
| maxRetries          | The stage fails if it gets this number of consecutive failures. |
| minBackoff          | Initial backoff for failing writes. |
| maxBackoff          | Maximum backoff for failing writes. |

Even in the case of write failures, the order of messages downstream is guaranteed to be preserved.
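For illustration, a hedged sketch of both retry strategies, reusing the `connectionSettings` value sketched earlier. The numbers are arbitrary, and the Scala factories are assumed to take the parameters listed in the tables above, in that order:

```scala
import scala.concurrent.duration._
import akka.stream.alpakka.elasticsearch.{OpensearchWriteSettings, RetryAtFixedRate, RetryWithBackoff}

// Fail the stage after 5 consecutive failures, retrying every second.
val fixedRate = OpensearchWriteSettings(connectionSettings)
  .withRetryLogic(RetryAtFixedRate(5, 1.second)) // maxRetries, retryInterval

// Or back off exponentially between 100 ms and 2 seconds.
val backoff = OpensearchWriteSettings(connectionSettings)
  .withRetryLogic(RetryWithBackoff(5, 100.millis, 2.seconds)) // maxRetries, minBackoff, maxBackoff
```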
#### Supported API versions
To support reading and writing to multiple versions of Opensearch, an `OpensearchApiVersion` can be specified.

This will be used to:
1. transform the bulk request into a format understood by the corresponding Opensearch server.
2. determine whether to include the index type mapping in the API calls. See [removal of types](https://www.elastic.co/guide/en/elasticsearch/reference/current/removal-of-types.html)

Currently only [`V1`](https://opensearch.org/docs/1.3/opensearch/rest-api/document-apis/bulk/) is defined, but this parameter does not need to match the server version exactly (for example, `V1` should also work with Opensearch 2.x).

### Allow explicit index

When using the `_bulk` API, Opensearch will reject requests that have an explicit index in the request body if explicit index names are not allowed. See [URL-based access control](https://www.elastic.co/guide/en/elasticsearch/reference/7.10/url-access-control.html)

## Opensearch as Flow

You can also build flow stages with @apidoc[ElasticsearchFlow$].
The API is similar to creating Sinks.

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala) { #run-flow }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchV1Test.java) { #run-flow }


### Storing documents from Strings

Opensearch requires the documents to be properly formatted JSON. If your data is available as JSON in Strings, you may use the pre-defined `StringMessageWriter` to avoid any conversions. For any other JSON technologies, implement a @scala[`MessageWriter[T]`]@java[`MessageWriter<T>`].

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala) { #string }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchV1Test.java) { #string }



### Passing data through ElasticsearchFlow

When streaming documents from Kafka, you might want to commit to Kafka **AFTER** the document has been written to Opensearch.

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala) { #kafka-example }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchV1Test.java) { #kafka-example }


### Specifying custom index-name for every document

When working with index-patterns using wildcards, you might need to specify a custom
index-name for each document:

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala) { #custom-index-name-example }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java) { #custom-index-name-example }


### Specifying custom metadata for every document

In some cases you might want to specify custom metadata for the documents you are inserting, for example a `pipeline`. This can be done like so:

Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala) { #custom-metadata-example }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java) { #custom-metadata-example }


### More custom searching

The easiest way of using the Opensearch source is to just specify the query parameter. Sometimes you need more control, for instance to specify which fields to return. In such cases you can use `searchParams` instead:
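For instance, a hedged sketch; the `Book` type, its Spray JSON format, the index name, and the reuse of `sourceSettings` from above are all illustrative:

```scala
import akka.stream.alpakka.elasticsearch.OpensearchParams
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSource
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

// Assumed document type with a Spray JSON reader in scope.
case class Book(title: String)
implicit val bookFormat: RootJsonFormat[Book] = jsonFormat1(Book)

// Restrict the returned fields through the "_source" search parameter.
val searchParams = Map(
  "query" -> """{"match_all": {}}""",
  "_source" -> """["title"]"""
)

val source = ElasticsearchSource.typed[Book](
  OpensearchParams.V1("source"), // illustrative index name
  searchParams,
  sourceSettings
)
```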
Scala
: @@snip [snip](/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala) { #custom-search-params }

Java
: @@snip [snip](/elasticsearch/src/test/java/docs/javadsl/OpensearchV1Test.java) { #custom-search-params }


#### Routing
Support for [custom routing](https://www.elastic.co/guide/en/elasticsearch/reference/7.10/mapping-routing-field.html)
is available through the `routing` key. Add this key and the respective value to the `searchParams` map to route your search directly to the shard that holds the document you are looking for, and enjoy improved response times.

#### Sort
Support for sort is available through the `sort` key in the `searchParams` map. If no sort is given, the source will use `sort=_doc` to maximize performance, as indicated by the [Opensearch documentation](https://opensearch.org/docs/latest/opensearch/rest-api/scroll/).
diff --git a/elasticsearch/src/main/java/akka/stream/alpakka/elasticsearch/ApiVersion.java b/elasticsearch/src/main/java/akka/stream/alpakka/elasticsearch/ApiVersion.java
index a797b1a3d7..9064a8aa0e 100644
--- a/elasticsearch/src/main/java/akka/stream/alpakka/elasticsearch/ApiVersion.java
+++ b/elasticsearch/src/main/java/akka/stream/alpakka/elasticsearch/ApiVersion.java
@@ -4,7 +4,7 @@
 
 package akka.stream.alpakka.elasticsearch;
 
-public enum ApiVersion {
+public enum ApiVersion implements akka.stream.alpakka.elasticsearch.ApiVersionBase {
   V5,
   V7
 }
diff --git a/elasticsearch/src/main/java/akka/stream/alpakka/elasticsearch/ApiVersionBase.java b/elasticsearch/src/main/java/akka/stream/alpakka/elasticsearch/ApiVersionBase.java
new file mode 100644
index 0000000000..13bd1d0daf
--- /dev/null
+++ b/elasticsearch/src/main/java/akka/stream/alpakka/elasticsearch/ApiVersionBase.java
@@ -0,0 +1,8 @@
+/*
+ * Copyright (C) since 2016 Lightbend Inc.
+ */
+
+package akka.stream.alpakka.elasticsearch;
+
+/** Common interface to represent Opensearch / Elasticsearch versions. */
+public interface ApiVersionBase {}
diff --git a/elasticsearch/src/main/java/akka/stream/alpakka/elasticsearch/OpensearchApiVersion.java b/elasticsearch/src/main/java/akka/stream/alpakka/elasticsearch/OpensearchApiVersion.java
new file mode 100644
index 0000000000..4ea7042500
--- /dev/null
+++ b/elasticsearch/src/main/java/akka/stream/alpakka/elasticsearch/OpensearchApiVersion.java
@@ -0,0 +1,9 @@
+/*
+ * Copyright (C) since 2016 Lightbend Inc.
+ */ + +package akka.stream.alpakka.elasticsearch; + +public enum OpensearchApiVersion implements ApiVersionBase { + V1 +} diff --git a/elasticsearch/src/main/mima-filters/4.0.0-M1.backwards.excludes/PR2850.backwards.excludes b/elasticsearch/src/main/mima-filters/4.0.0-M1.backwards.excludes/PR2850.backwards.excludes new file mode 100644 index 0000000000..97ad9c397d --- /dev/null +++ b/elasticsearch/src/main/mima-filters/4.0.0-M1.backwards.excludes/PR2850.backwards.excludes @@ -0,0 +1,11 @@ +# New hierarchy to accommodate Opensearch +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.elasticsearch.ElasticsearchSourceSettings.withApiVersion") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.alpakka.elasticsearch.ElasticsearchSourceSettings.*") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings.withApiVersion") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.alpakka.elasticsearch.ElasticsearchWriteSettings.*") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchFlow.*") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSource.*") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSink.create") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow.*") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSource.*") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink.create") diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSourceSettings.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSourceSettings.scala index 650d78be4a..0215fa2fe0 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSourceSettings.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSourceSettings.scala @@ -6,64 +6,34 @@ package akka.stream.alpakka.elasticsearch import java.util.concurrent.TimeUnit -import akka.util.JavaDurationConverters._ - import scala.concurrent.duration.FiniteDuration /** * Configure Elastiscsearch sources. 
* */ -final class ElasticsearchSourceSettings private (val connection: ElasticsearchConnectionSettings, - val bufferSize: Int, - val includeDocumentVersion: Boolean, - val scrollDuration: FiniteDuration, - val apiVersion: ApiVersion) { - - def withConnection(value: ElasticsearchConnectionSettings): ElasticsearchSourceSettings = copy(connection = value) - - def withBufferSize(value: Int): ElasticsearchSourceSettings = copy(bufferSize = value) - - def withScrollDuration(value: FiniteDuration): ElasticsearchSourceSettings = copy(scrollDuration = value) - - def withScrollDuration(value: java.time.Duration): ElasticsearchSourceSettings = copy(scrollDuration = value.asScala) - - /** - * If includeDocumentVersion is true, '_version' is returned with the search-results - * * http://nocf-www.elastic.co/guide/en/elasticsearch/reference/current/search-request-version.html - * * https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html - */ - def withIncludeDocumentVersion(value: Boolean): ElasticsearchSourceSettings = - if (includeDocumentVersion == value) this else copy(includeDocumentVersion = value) - - def withApiVersion(value: ApiVersion): ElasticsearchSourceSettings = - if (apiVersion == value) this else copy(apiVersion = value) - - private def copy(connection: ElasticsearchConnectionSettings = connection, - bufferSize: Int = bufferSize, - includeDocumentVersion: Boolean = includeDocumentVersion, - scrollDuration: FiniteDuration = scrollDuration, - apiVersion: ApiVersion = apiVersion): ElasticsearchSourceSettings = +final class ElasticsearchSourceSettings private (connection: ElasticsearchConnectionSettings, + bufferSize: Int, + includeDocumentVersion: Boolean, + scrollDuration: FiniteDuration, + apiVersion: ApiVersion) + extends SourceSettingsBase[ApiVersion, ElasticsearchSourceSettings](connection, + bufferSize, + includeDocumentVersion, + scrollDuration, + apiVersion) { + + protected override def copy(connection: ElasticsearchConnectionSettings, + bufferSize: Int, + includeDocumentVersion: Boolean, + scrollDuration: FiniteDuration, + apiVersion: ApiVersion): ElasticsearchSourceSettings = new ElasticsearchSourceSettings(connection = connection, bufferSize = bufferSize, includeDocumentVersion = includeDocumentVersion, scrollDuration = scrollDuration, apiVersion = apiVersion) - def scroll: String = { - val scrollString = scrollDuration.unit match { - case TimeUnit.DAYS => "d" - case TimeUnit.HOURS => "h" - case TimeUnit.MINUTES => "m" - case TimeUnit.SECONDS => "s" - case TimeUnit.MILLISECONDS => "ms" - case TimeUnit.MICROSECONDS => "micros" - case TimeUnit.NANOSECONDS => "nanos" - } - - s"${scrollDuration.length}$scrollString" - } - override def toString = s"""ElasticsearchSourceSettings(connection=$connection,bufferSize=$bufferSize,includeDocumentVersion=$includeDocumentVersion,scrollDuration=$scrollDuration,apiVersion=$apiVersion)""" diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchWriteSettings.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchWriteSettings.scala index 411c0c722e..07979653bf 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchWriteSettings.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/ElasticsearchWriteSettings.scala @@ -59,33 +59,25 @@ object RetryWithBackoff { /** * Configure Elasticsearch sinks and flows. 
 */
-final class ElasticsearchWriteSettings private (val connection: ElasticsearchConnectionSettings,
-                                                val bufferSize: Int,
-                                                val retryLogic: RetryLogic,
-                                                val versionType: Option[String],
-                                                val apiVersion: ApiVersion,
-                                                val allowExplicitIndex: Boolean) {
-
-  def withConnection(value: ElasticsearchConnectionSettings): ElasticsearchWriteSettings = copy(connection = value)
-
-  def withBufferSize(value: Int): ElasticsearchWriteSettings = copy(bufferSize = value)
-
-  def withRetryLogic(value: RetryLogic): ElasticsearchWriteSettings =
-    copy(retryLogic = value)
-
-  def withVersionType(value: String): ElasticsearchWriteSettings = copy(versionType = Option(value))
-
-  def withApiVersion(value: ApiVersion): ElasticsearchWriteSettings =
-    if (apiVersion == value) this else copy(apiVersion = value)
-
-  def withAllowExplicitIndex(value: Boolean): ElasticsearchWriteSettings = copy(allowExplicitIndex = value)
-
-  private def copy(connection: ElasticsearchConnectionSettings = connection,
-                   bufferSize: Int = bufferSize,
-                   retryLogic: RetryLogic = retryLogic,
-                   versionType: Option[String] = versionType,
-                   apiVersion: ApiVersion = apiVersion,
-                   allowExplicitIndex: Boolean = allowExplicitIndex): ElasticsearchWriteSettings =
+final class ElasticsearchWriteSettings private (connection: ElasticsearchConnectionSettings,
+                                                bufferSize: Int,
+                                                retryLogic: RetryLogic,
+                                                versionType: Option[String],
+                                                apiVersion: ApiVersion,
+                                                allowExplicitIndex: Boolean)
+    extends WriteSettingsBase[ApiVersion, ElasticsearchWriteSettings](connection,
+                                                                      bufferSize,
+                                                                      retryLogic,
+                                                                      versionType,
+                                                                      apiVersion,
+                                                                      allowExplicitIndex) {
+
+  protected override def copy(connection: ElasticsearchConnectionSettings,
+                              bufferSize: Int,
+                              retryLogic: RetryLogic,
+                              versionType: Option[String],
+                              apiVersion: ApiVersion,
+                              allowExplicitIndex: Boolean): ElasticsearchWriteSettings =
     new ElasticsearchWriteSettings(connection, bufferSize, retryLogic, versionType, apiVersion, allowExplicitIndex)
 
   override def toString: String =
diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchConnectionSettings.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchConnectionSettings.scala
new file mode 100644
index 0000000000..90a4f42974
--- /dev/null
+++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchConnectionSettings.scala
@@ -0,0 +1,20 @@
+/*
+ * Copyright (C) since 2016 Lightbend Inc.
+ */
+
+package akka.stream.alpakka.elasticsearch
+
+/**
+ * Opensearch 1.x is fully compatible with Elasticsearch 7.x with respect to
+ * connection parameters.
+ */
+object OpensearchConnectionSettings {
+
+  /** Scala API */
+  def apply(baseUrl: String): ElasticsearchConnectionSettings =
+    ElasticsearchConnectionSettings.apply(baseUrl)
+
+  /** Java API */
+  def create(baseUrl: String): ElasticsearchConnectionSettings =
+    ElasticsearchConnectionSettings.create(baseUrl)
+}
diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchParams.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchParams.scala
new file mode 100644
index 0000000000..2de330ddb6
--- /dev/null
+++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchParams.scala
@@ -0,0 +1,13 @@
+/*
+ * Copyright (C) since 2016 Lightbend Inc.
+ */
+
+package akka.stream.alpakka.elasticsearch
+
+/**
+ * Opensearch 1.x is fully compatible with the Elasticsearch 7.x release line, so we can
+ * reuse the Elasticsearch V7 compatible implementation.
+ */ +object OpensearchParams { + def V1(indexName: String): ElasticsearchParams = ElasticsearchParams.V7(indexName) +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchSourceSettings.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchSourceSettings.scala new file mode 100644 index 0000000000..72353e2897 --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchSourceSettings.scala @@ -0,0 +1,57 @@ +/* + * Copyright (C) since 2016 Lightbend Inc. + */ + +package akka.stream.alpakka.elasticsearch + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration + +/** + * Configure Opensearch sources. + * + */ +final class OpensearchSourceSettings private (connection: ElasticsearchConnectionSettings, + bufferSize: Int, + includeDocumentVersion: Boolean, + scrollDuration: FiniteDuration, + apiVersion: OpensearchApiVersion) + extends SourceSettingsBase[OpensearchApiVersion, OpensearchSourceSettings](connection, + bufferSize, + includeDocumentVersion, + scrollDuration, + apiVersion) { + protected override def copy(connection: ElasticsearchConnectionSettings, + bufferSize: Int, + includeDocumentVersion: Boolean, + scrollDuration: FiniteDuration, + apiVersion: OpensearchApiVersion): OpensearchSourceSettings = + new OpensearchSourceSettings(connection = connection, + bufferSize = bufferSize, + includeDocumentVersion = includeDocumentVersion, + scrollDuration = scrollDuration, + apiVersion = apiVersion) + + override def toString = + s"""OpensearchSourceSettings(connection=$connection,bufferSize=$bufferSize,includeDocumentVersion=$includeDocumentVersion,scrollDuration=$scrollDuration,apiVersion=$apiVersion)""" + +} + +object OpensearchSourceSettings { + + /** Scala API */ + def apply(connection: ElasticsearchConnectionSettings): OpensearchSourceSettings = + new OpensearchSourceSettings(connection, + 10, + includeDocumentVersion = false, + FiniteDuration(5, TimeUnit.MINUTES), + OpensearchApiVersion.V1) + + /** Java API */ + def create(connection: ElasticsearchConnectionSettings): OpensearchSourceSettings = + new OpensearchSourceSettings(connection, + 10, + includeDocumentVersion = false, + FiniteDuration(5, TimeUnit.MINUTES), + OpensearchApiVersion.V1) +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchWriteSettings.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchWriteSettings.scala new file mode 100644 index 0000000000..54b6d4feba --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/OpensearchWriteSettings.scala @@ -0,0 +1,51 @@ +/* + * Copyright (C) since 2016 Lightbend Inc. + */ + +package akka.stream.alpakka.elasticsearch + +/** + * Configure Opensearch sinks and flows. 
+ */ +final class OpensearchWriteSettings private (connection: ElasticsearchConnectionSettings, + bufferSize: Int, + retryLogic: RetryLogic, + versionType: Option[String], + apiVersion: OpensearchApiVersion, + allowExplicitIndex: Boolean) + extends WriteSettingsBase[OpensearchApiVersion, OpensearchWriteSettings](connection, + bufferSize, + retryLogic, + versionType, + apiVersion, + allowExplicitIndex) { + + protected override def copy(connection: ElasticsearchConnectionSettings, + bufferSize: Int, + retryLogic: RetryLogic, + versionType: Option[String], + apiVersion: OpensearchApiVersion, + allowExplicitIndex: Boolean): OpensearchWriteSettings = + new OpensearchWriteSettings(connection, bufferSize, retryLogic, versionType, apiVersion, allowExplicitIndex) + + override def toString: String = + "OpensearchWriteSettings(" + + s"connection=$connection," + + s"bufferSize=$bufferSize," + + s"retryLogic=$retryLogic," + + s"versionType=$versionType," + + s"apiVersion=$apiVersion," + + s"allowExplicitIndex=$allowExplicitIndex)" + +} + +object OpensearchWriteSettings { + + /** Scala API */ + def apply(connection: ElasticsearchConnectionSettings): OpensearchWriteSettings = + new OpensearchWriteSettings(connection, 10, RetryNever, None, OpensearchApiVersion.V1, allowExplicitIndex = true) + + /** Java API */ + def create(connection: ElasticsearchConnectionSettings): OpensearchWriteSettings = + new OpensearchWriteSettings(connection, 10, RetryNever, None, OpensearchApiVersion.V1, allowExplicitIndex = true) +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/SourceSettingsBase.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/SourceSettingsBase.scala new file mode 100644 index 0000000000..123eafdf2e --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/SourceSettingsBase.scala @@ -0,0 +1,64 @@ +/* + * Copyright (C) since 2016 Lightbend Inc. + */ + +package akka.stream.alpakka.elasticsearch + +import akka.util.JavaDurationConverters._ +import java.util.concurrent.TimeUnit + +import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings + +import scala.concurrent.duration.FiniteDuration + +/** + * Configure Elastiscsearch/OpenSearch sources. 
+ * + */ +abstract class SourceSettingsBase[Version <: ApiVersionBase, S <: SourceSettingsBase[Version, S]] private[alpakka] ( + val connection: ElasticsearchConnectionSettings, + val bufferSize: Int, + val includeDocumentVersion: Boolean, + val scrollDuration: FiniteDuration, + val apiVersion: Version +) { this: S => + def withConnection(value: ElasticsearchConnectionSettings): S = copy(connection = value) + + def withBufferSize(value: Int): S = copy(bufferSize = value) + + def withScrollDuration(value: FiniteDuration): S = copy(scrollDuration = value) + + def withScrollDuration(value: java.time.Duration): S = copy(scrollDuration = value.asScala) + + /** + * If includeDocumentVersion is true, '_version' is returned with the search-results + * * https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-version.html + * * https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html + */ + def withIncludeDocumentVersion(value: Boolean): S = + if (includeDocumentVersion == value) this else copy(includeDocumentVersion = value) + + def withApiVersion(value: Version): S = + if (apiVersion == value) this else copy(apiVersion = value) + + def scroll: String = { + val scrollString = scrollDuration.unit match { + case TimeUnit.DAYS => "d" + case TimeUnit.HOURS => "h" + case TimeUnit.MINUTES => "m" + case TimeUnit.SECONDS => "s" + case TimeUnit.MILLISECONDS => "ms" + case TimeUnit.MICROSECONDS => "micros" + case TimeUnit.NANOSECONDS => "nanos" + } + + s"${scrollDuration.length}$scrollString" + } + + protected def copy(connection: ElasticsearchConnectionSettings = connection, + bufferSize: Int = bufferSize, + includeDocumentVersion: Boolean = includeDocumentVersion, + scrollDuration: FiniteDuration = scrollDuration, + apiVersion: Version = apiVersion): S; + +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/WriteSettingsBase.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/WriteSettingsBase.scala new file mode 100644 index 0000000000..ac0d37a43d --- /dev/null +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/WriteSettingsBase.scala @@ -0,0 +1,42 @@ +/* + * Copyright (C) since 2016 Lightbend Inc. + */ + +package akka.stream.alpakka.elasticsearch + +import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings +import akka.stream.alpakka.elasticsearch.RetryLogic + +/** + * Configure Elasticsearch/OpenSearch sinks and flows. 
+ */ +abstract class WriteSettingsBase[Version <: ApiVersionBase, W <: WriteSettingsBase[Version, W]] private[alpakka] ( + val connection: ElasticsearchConnectionSettings, + val bufferSize: Int, + val retryLogic: RetryLogic, + val versionType: Option[String], + val apiVersion: Version, + val allowExplicitIndex: Boolean +) { this: W => + + def withConnection(value: ElasticsearchConnectionSettings): W = copy(connection = value) + + def withBufferSize(value: Int): W = copy(bufferSize = value) + + def withRetryLogic(value: RetryLogic): W = + copy(retryLogic = value) + + def withVersionType(value: String): W = copy(versionType = Option(value)) + + def withApiVersion(value: Version): W = + if (apiVersion == value) this else copy(apiVersion = value) + + def withAllowExplicitIndex(value: Boolean): W = copy(allowExplicitIndex = value) + + protected def copy(connection: ElasticsearchConnectionSettings = connection, + bufferSize: Int = bufferSize, + retryLogic: RetryLogic = retryLogic, + versionType: Option[String] = versionType, + apiVersion: Version = apiVersion, + allowExplicitIndex: Boolean = allowExplicitIndex): W; +} diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala index 368742e5d4..d2d722a722 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStage.scala @@ -12,6 +12,7 @@ import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.alpakka.elasticsearch._ import akka.stream.stage._ import akka.stream._ +import akka.stream.alpakka.elasticsearch import scala.collection.immutable import scala.concurrent.{ExecutionContext, Future} @@ -24,7 +25,7 @@ import scala.concurrent.{ExecutionContext, Future} @InternalApi private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], writer: MessageWriter[T] )(implicit http: HttpExt, mat: Materializer, ec: ExecutionContext) extends GraphStage[ @@ -45,6 +46,10 @@ private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C]( writer) case ApiVersion.V7 => new RestBulkApiV7[T, C](elasticsearchParams.indexName, settings.versionType, settings.allowExplicitIndex, writer) + + case elasticsearch.OpensearchApiVersion.V1 => + new RestBulkApiV7[T, C](elasticsearchParams.indexName, settings.versionType, settings.allowExplicitIndex, writer) + case other => throw new IllegalArgumentException(s"API version $other is not supported") } diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala index f65280ee74..eb4075b973 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala @@ -9,7 +9,13 @@ import akka.http.scaladsl.HttpExt import akka.http.scaladsl.model.Uri.Path import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.alpakka.elasticsearch.{ApiVersion, ElasticsearchParams, ElasticsearchSourceSettings, ReadResult} +import 
akka.stream.alpakka.elasticsearch.{ + ApiVersion, + ElasticsearchParams, + OpensearchApiVersion, + ReadResult, + SourceSettingsBase +} import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, StageLogging} import akka.stream.{Attributes, Materializer, Outlet, SourceShape} import spray.json.DefaultJsonProtocol._ @@ -45,7 +51,7 @@ private[elasticsearch] trait MessageReader[T] { private[elasticsearch] final class ElasticsearchSourceStage[T]( elasticsearchParams: ElasticsearchParams, searchParams: Map[String, String], - settings: ElasticsearchSourceSettings, + settings: SourceSettingsBase[_, _], reader: MessageReader[T] )(implicit http: HttpExt, mat: Materializer, ec: ExecutionContext) extends GraphStage[SourceShape[ReadResult[T]]] { @@ -71,7 +77,7 @@ object ElasticsearchSourceStage { private[elasticsearch] final class ElasticsearchSourceLogic[T]( elasticsearchParams: ElasticsearchParams, searchParams: Map[String, String], - settings: ElasticsearchSourceSettings, + settings: SourceSettingsBase[_, _], out: Outlet[ReadResult[T]], shape: SourceShape[ReadResult[T]], reader: MessageReader[T] @@ -142,6 +148,8 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( val endpoint: String = settings.apiVersion match { case ApiVersion.V5 => s"/${elasticsearchParams.indexName}/${elasticsearchParams.typeName.get}/_search" case ApiVersion.V7 => s"/${elasticsearchParams.indexName}/_search" + case OpensearchApiVersion.V1 => s"/${elasticsearchParams.indexName}/_search" + case other => throw new IllegalArgumentException(s"API version $other is not supported") } val uri = prepareUri(Path(endpoint)) diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala index 0c9c0e91bf..8f19c4a968 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala @@ -27,7 +27,7 @@ object ElasticsearchFlow { */ def create[T]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], objectMapper: ObjectMapper ): akka.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] = create(elasticsearchParams, settings, new JacksonWriter[T](objectMapper)) @@ -43,7 +43,7 @@ object ElasticsearchFlow { */ def create[T]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], messageWriter: MessageWriter[T] ): akka.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] = scaladsl.ElasticsearchFlow @@ -62,7 +62,7 @@ object ElasticsearchFlow { */ def createWithPassThrough[T, C]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], objectMapper: ObjectMapper ): akka.stream.javadsl.Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] = createWithPassThrough(elasticsearchParams, settings, new JacksonWriter[T](objectMapper)) @@ -79,7 +79,7 @@ object ElasticsearchFlow { */ def createWithPassThrough[T, C]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], messageWriter: MessageWriter[T] ): akka.stream.javadsl.Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] = scaladsl.ElasticsearchFlow @@ -99,7 +99,7 @@ object ElasticsearchFlow { */ def 
createBulk[T, C]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], objectMapper: ObjectMapper ): akka.stream.javadsl.Flow[java.util.List[WriteMessage[T, C]], java.util.List[WriteResult[T, C]], NotUsed] = createBulk(elasticsearchParams, settings, new JacksonWriter[T](objectMapper)) @@ -117,7 +117,7 @@ object ElasticsearchFlow { */ def createBulk[T, C]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], messageWriter: MessageWriter[T] ): akka.stream.javadsl.Flow[java.util.List[WriteMessage[T, C]], java.util.List[WriteResult[T, C]], NotUsed] = akka.stream.scaladsl @@ -142,7 +142,7 @@ object ElasticsearchFlow { @ApiMayChange def createWithContext[T, C]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], objectMapper: ObjectMapper ): akka.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] = createWithContext(elasticsearchParams, settings, new JacksonWriter[T](objectMapper)) @@ -159,7 +159,7 @@ object ElasticsearchFlow { @ApiMayChange def createWithContext[T, C]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], messageWriter: MessageWriter[T] ): akka.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] = scaladsl.ElasticsearchFlow diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala index be7fbc86d5..e80459cdd9 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala @@ -21,7 +21,7 @@ object ElasticsearchSink { */ def create[T]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], objectMapper: ObjectMapper ): akka.stream.javadsl.Sink[WriteMessage[T, NotUsed], CompletionStage[Done]] = ElasticsearchFlow diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala index 1d5287c289..dd1c48427c 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala @@ -27,7 +27,7 @@ object ElasticsearchSource { */ def create(elasticsearchParams: ElasticsearchParams, query: String, - settings: ElasticsearchSourceSettings): Source[ReadResult[java.util.Map[String, Object]], NotUsed] = + settings: SourceSettingsBase[_, _]): Source[ReadResult[java.util.Map[String, Object]], NotUsed] = create(elasticsearchParams, query, settings, new ObjectMapper()) /** @@ -36,7 +36,7 @@ object ElasticsearchSource { */ def create(elasticsearchParams: ElasticsearchParams, query: String, - settings: ElasticsearchSourceSettings, + settings: SourceSettingsBase[_, _], objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String, Object]], NotUsed] = Source .fromMaterializer { (mat: Materializer, _: Attributes) => @@ -70,7 +70,7 @@ object ElasticsearchSource { */ def create(elasticsearchParams: ElasticsearchParams, 
searchParams: java.util.Map[String, String], - settings: ElasticsearchSourceSettings, + settings: SourceSettingsBase[_, _], objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String, Object]], NotUsed] = Source .fromMaterializer { (mat: Materializer, _: Attributes) => @@ -97,7 +97,7 @@ object ElasticsearchSource { */ def typed[T](elasticsearchParams: ElasticsearchParams, query: String, - settings: ElasticsearchSourceSettings, + settings: SourceSettingsBase[_, _], clazz: Class[T]): Source[ReadResult[T], NotUsed] = typed[T](elasticsearchParams, query, settings, clazz, new ObjectMapper()) @@ -107,7 +107,7 @@ object ElasticsearchSource { */ def typed[T](elasticsearchParams: ElasticsearchParams, query: String, - settings: ElasticsearchSourceSettings, + settings: SourceSettingsBase[_, _], clazz: Class[T], objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] = Source @@ -141,7 +141,7 @@ object ElasticsearchSource { */ def typed[T](elasticsearchParams: ElasticsearchParams, searchParams: java.util.Map[String, String], - settings: ElasticsearchSourceSettings, + settings: SourceSettingsBase[_, _], clazz: Class[T], objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] = Source diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala index a9380de6ae..a2163398d1 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala @@ -27,7 +27,7 @@ object ElasticsearchFlow { * * This factory method requires an implicit Spray JSON writer for `T`. */ - def create[T](elasticsearchParams: ElasticsearchParams, settings: ElasticsearchWriteSettings)( + def create[T](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[_, _])( implicit sprayJsonWriter: JsonWriter[T] ): Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] = create[T](elasticsearchParams, settings, new SprayJsonWriter[T]()(sprayJsonWriter)) @@ -38,7 +38,7 @@ object ElasticsearchFlow { * successful execution. */ def create[T](elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], writer: MessageWriter[T]): Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] = { Flow[WriteMessage[T, NotUsed]] .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm } @@ -54,7 +54,7 @@ object ElasticsearchFlow { * * This factory method requires an implicit Spray JSON writer for `T`. */ - def createWithPassThrough[T, C](elasticsearchParams: ElasticsearchParams, settings: ElasticsearchWriteSettings)( + def createWithPassThrough[T, C](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[_, _])( implicit sprayJsonWriter: JsonWriter[T] ): Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] = createWithPassThrough[T, C](elasticsearchParams, settings, new SprayJsonWriter[T]()(sprayJsonWriter)) @@ -66,7 +66,7 @@ object ElasticsearchFlow { * successful execution. 
*/ def createWithPassThrough[T, C](elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], writer: MessageWriter[T]): Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] = { Flow[WriteMessage[T, C]] .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm } @@ -83,7 +83,7 @@ object ElasticsearchFlow { * * This factory method requires an implicit Spray JSON writer for `T`. */ - def createBulk[T, C](elasticsearchParams: ElasticsearchParams, settings: ElasticsearchWriteSettings)( + def createBulk[T, C](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[_, _])( implicit sprayJsonWriter: JsonWriter[T] ): Flow[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]], NotUsed] = createBulk[T, C](elasticsearchParams, settings, new SprayJsonWriter[T]()(sprayJsonWriter)) @@ -97,7 +97,7 @@ object ElasticsearchFlow { */ def createBulk[T, C]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], writer: MessageWriter[T] ): Flow[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]], NotUsed] = { stageFlow(elasticsearchParams, settings, writer) @@ -112,7 +112,7 @@ object ElasticsearchFlow { * This factory method requires an implicit Spray JSON writer for `T`. */ @ApiMayChange - def createWithContext[T, C](elasticsearchParams: ElasticsearchParams, settings: ElasticsearchWriteSettings)( + def createWithContext[T, C](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[_, _])( implicit sprayJsonWriter: JsonWriter[T] ): FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] = createWithContext[T, C](elasticsearchParams, settings, new SprayJsonWriter[T]()(sprayJsonWriter)) @@ -126,7 +126,7 @@ object ElasticsearchFlow { @ApiMayChange def createWithContext[T, C]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], writer: MessageWriter[T] ): FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] = { Flow[WriteMessage[T, C]] @@ -141,7 +141,7 @@ object ElasticsearchFlow { @InternalApi private def stageFlow[T, C]( elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], writer: MessageWriter[T] ): Flow[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]], NotUsed] = { if (settings.retryLogic == RetryNever) { @@ -204,7 +204,7 @@ object ElasticsearchFlow { @InternalApi private def basicStageFlow[T, C](elasticsearchParams: ElasticsearchParams, - settings: ElasticsearchWriteSettings, + settings: WriteSettingsBase[_, _], writer: MessageWriter[T]) = { Flow .fromMaterializer { (mat, _) => diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala index 67dec4aede..d02ed9a282 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala @@ -19,7 +19,7 @@ object ElasticsearchSink { /** * Create a sink to update Elasticsearch with [[akka.stream.alpakka.elasticsearch.WriteMessage WriteMessage]]s containing type `T`. 
*/ - def create[T](elasticsearchParams: ElasticsearchParams, settings: ElasticsearchWriteSettings)( + def create[T](elasticsearchParams: ElasticsearchParams, settings: WriteSettingsBase[_, _])( implicit sprayJsonWriter: JsonWriter[T] ): Sink[WriteMessage[T, NotUsed], Future[Done]] = ElasticsearchFlow.create[T](elasticsearchParams, settings).toMat(Sink.ignore)(Keep.right) diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala index 4cf07d132b..e8893ea016 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala @@ -26,7 +26,7 @@ object ElasticsearchSource { def apply( elasticsearchParams: ElasticsearchParams, query: String, - settings: ElasticsearchSourceSettings + settings: SourceSettingsBase[_, _] ): Source[ReadResult[JsObject], NotUsed] = create(elasticsearchParams, query, settings) /** @@ -40,7 +40,7 @@ object ElasticsearchSource { */ def apply(elasticsearchParams: ElasticsearchParams, searchParams: Map[String, String], - settings: ElasticsearchSourceSettings): Source[ReadResult[JsObject], NotUsed] = + settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject], NotUsed] = create(elasticsearchParams, searchParams, settings) /** @@ -49,7 +49,7 @@ object ElasticsearchSource { */ def create(elasticsearchParams: ElasticsearchParams, query: String, - settings: ElasticsearchSourceSettings): Source[ReadResult[JsObject], NotUsed] = + settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject], NotUsed] = create(elasticsearchParams, Map("query" -> query), settings) /** @@ -62,7 +62,7 @@ object ElasticsearchSource { */ def create(elasticsearchParams: ElasticsearchParams, searchParams: Map[String, String], - settings: ElasticsearchSourceSettings): Source[ReadResult[JsObject], NotUsed] = + settings: SourceSettingsBase[_, _]): Source[ReadResult[JsObject], NotUsed] = Source .fromMaterializer { (mat, _) => implicit val system: ActorSystem = mat.system @@ -84,7 +84,7 @@ object ElasticsearchSource { * Creates a [[akka.stream.scaladsl.Source]] from Elasticsearch that streams [[ReadResult]]s of type `T` * converted by Spray's [[spray.json.JsonReader]] */ - def typed[T](elasticsearchParams: ElasticsearchParams, query: String, settings: ElasticsearchSourceSettings)( + def typed[T](elasticsearchParams: ElasticsearchParams, query: String, settings: SourceSettingsBase[_, _])( implicit sprayJsonReader: JsonReader[T] ): Source[ReadResult[T], NotUsed] = typed(elasticsearchParams, Map("query" -> query), settings) @@ -99,7 +99,7 @@ object ElasticsearchSource { */ def typed[T](elasticsearchParams: ElasticsearchParams, searchParams: Map[String, String], - settings: ElasticsearchSourceSettings)( + settings: SourceSettingsBase[_, _])( implicit sprayJsonReader: JsonReader[T] ): Source[ReadResult[T], NotUsed] = Source diff --git a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java index fd64184e36..b8fbfc91b4 100644 --- a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java +++ b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchParameterizedTest.java @@ -27,6 +27,7 @@ @RunWith(value = Parameterized.class) public class ElasticsearchParameterizedTest extends 
ElasticsearchTestBase { + private final ApiVersion apiVersion; @Parameterized.Parameters(name = "{index}: port={0} api={1}") public static Iterable data() { @@ -37,10 +38,13 @@ public static Iterable data() { }); } - public ElasticsearchParameterizedTest(int port, ApiVersion apiVersion) {} + public ElasticsearchParameterizedTest(int port, ApiVersion apiVersion) { + this.apiVersion = apiVersion; + } @Parameterized.BeforeParam - public static void beforeParam(int port, ApiVersion esApiVersion) throws IOException { + public static void beforeParam( + int port, akka.stream.alpakka.elasticsearch.ApiVersionBase esApiVersion) throws IOException { prepareIndex(port, esApiVersion); } diff --git a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTestBase.java b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTestBase.java index 1d3e0e51ee..1856b96a07 100644 --- a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTestBase.java +++ b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTestBase.java @@ -9,8 +9,11 @@ import akka.http.javadsl.model.ContentTypes; import akka.http.javadsl.model.HttpRequest; import akka.stream.alpakka.elasticsearch.ApiVersion; +import akka.stream.alpakka.elasticsearch.ApiVersionBase; import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings; import akka.stream.alpakka.elasticsearch.ElasticsearchParams; +import akka.stream.alpakka.elasticsearch.OpensearchApiVersion; +import akka.stream.alpakka.elasticsearch.OpensearchParams; import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; @@ -24,7 +27,6 @@ public class ElasticsearchTestBase { @Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4(); - protected static ApiVersion apiVersion; protected static ElasticsearchConnectionSettings connectionSettings; protected static ActorSystem system; protected static Http http; @@ -52,9 +54,8 @@ public static void teardown() { TestKit.shutdownActorSystem(system); } - protected static void prepareIndex(int port, ApiVersion esApiVersion) throws IOException { - apiVersion = esApiVersion; - + protected static void prepareIndex( + int port, akka.stream.alpakka.elasticsearch.ApiVersionBase version) throws IOException { connectionSettings = ElasticsearchConnectionSettings.create(String.format("http://localhost:%d", port)); @@ -141,11 +142,15 @@ public KafkaMessage(Book book, KafkaOffset offset) { } protected ElasticsearchParams constructElasticsearchParams( - String indexName, String typeName, ApiVersion apiVersion) { + String indexName, String typeName, ApiVersionBase apiVersion) { if (apiVersion == ApiVersion.V5) { return ElasticsearchParams.V5(indexName, typeName); - } else { + } else if (apiVersion == ApiVersion.V7) { return ElasticsearchParams.V7(indexName); + } else if (apiVersion == OpensearchApiVersion.V1) { + return OpensearchParams.V1(indexName); + } else { + throw new IllegalArgumentException("API version " + apiVersion + " is not supported"); } } } diff --git a/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java b/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java new file mode 100644 index 0000000000..1682ab791c --- /dev/null +++ b/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java @@ -0,0 +1,231 @@ +/* + * Copyright (C) since 2016 Lightbend Inc. 
diff --git a/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java b/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java
new file mode 100644
index 0000000000..1682ab791c
--- /dev/null
+++ b/elasticsearch/src/test/java/docs/javadsl/OpensearchParameterizedTest.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright (C) since 2016 Lightbend Inc.
+ */
+
+package docs.javadsl;
+
+import akka.NotUsed;
+import akka.stream.alpakka.elasticsearch.*;
+import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchFlow;
+import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSource;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+public class OpensearchParameterizedTest extends ElasticsearchTestBase {
+  private final OpensearchApiVersion apiVersion;
+
+  @Parameterized.Parameters(name = "{index}: port={0} api={1}")
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {{9203, OpensearchApiVersion.V1}});
+  }
+
+  public OpensearchParameterizedTest(int port, OpensearchApiVersion apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+  @Parameterized.BeforeParam
+  public static void beforeParam(
+      int port, akka.stream.alpakka.elasticsearch.ApiVersionBase osApiVersion) throws IOException {
+    prepareIndex(port, osApiVersion);
+  }
+
+  @Parameterized.AfterParam
+  public static void afterParam() throws IOException {
+    cleanIndex();
+  }
+
+  private void documentation() {
+    // #connection-settings
+    ElasticsearchConnectionSettings connectionSettings =
+        OpensearchConnectionSettings.create("http://localhost:9200")
+            .withCredentials("user", "password");
+    // #connection-settings
+
+    // #source-settings
+    OpensearchSourceSettings sourceSettings =
+        OpensearchSourceSettings.create(connectionSettings).withBufferSize(10);
+    // #source-settings
+    // #sink-settings
+    OpensearchWriteSettings settings =
+        OpensearchWriteSettings.create(connectionSettings)
+            .withBufferSize(10)
+            .withVersionType("internal")
+            .withRetryLogic(RetryAtFixedRate.create(5, Duration.ofSeconds(1)))
+            .withApiVersion(OpensearchApiVersion.V1);
+    // #sink-settings
+
+    // #opensearch-params
+    ElasticsearchParams opensearchParams = OpensearchParams.V1("source");
+    // #opensearch-params
+  }
+
+  @Test
+  public void testUsingVersions() throws Exception {
+    // Since the scala-test does a lot more logic testing,
+    // all we need to test here is that we can receive and send version
+
+    String indexName = "test_using_versions";
+    String typeName = "_doc";
+
+    // Insert document
+    Book book = new Book("b");
+    Source.single(WriteMessage.createIndexMessage("1", book))
+        .via(
+            ElasticsearchFlow.create(
+                constructElasticsearchParams(indexName, typeName, apiVersion),
+                OpensearchWriteSettings.create(connectionSettings)
+                    .withApiVersion(apiVersion)
+                    .withBufferSize(5),
+                new ObjectMapper()))
+        .runWith(Sink.seq(), system)
+        .toCompletableFuture()
+        .get();
+
+    flushAndRefresh(indexName);
+
+    // Search document and assert it having version 1
+    ReadResult<Book> message =
+        ElasticsearchSource.typed(
+                constructElasticsearchParams(indexName, typeName, apiVersion),
+                "{\"match_all\": {}}",
+                OpensearchSourceSettings.create(connectionSettings)
+                    .withApiVersion(apiVersion)
+                    .withIncludeDocumentVersion(true),
+                Book.class)
+            .runWith(Sink.head(), system)
+            .toCompletableFuture()
+            .get();
+
+    assertEquals(1L, message.version().get());
+
+    flushAndRefresh(indexName);
+
+    // Update document to version 2
+    Source.single(WriteMessage.createIndexMessage("1", book).withVersion(1L))
+        .via(
+            ElasticsearchFlow.create(
+                constructElasticsearchParams(indexName, typeName, apiVersion),
+                OpensearchWriteSettings.create(connectionSettings)
+                    .withApiVersion(apiVersion)
+                    .withBufferSize(5)
+                    .withVersionType("external"),
+                new ObjectMapper()))
+        .runWith(Sink.seq(), system)
+        .toCompletableFuture()
+        .get();
+
+    flushAndRefresh(indexName);
+
+    // Try to update document with wrong version to assert that we can send it
+    long oldVersion = 1;
+    boolean success =
+        Source.single(WriteMessage.createIndexMessage("1", book).withVersion(oldVersion))
+            .via(
+                ElasticsearchFlow.create(
+                    constructElasticsearchParams(indexName, typeName, apiVersion),
+                    OpensearchWriteSettings.create(connectionSettings)
+                        .withApiVersion(apiVersion)
+                        .withBufferSize(5)
+                        .withVersionType("external"),
+                    new ObjectMapper()))
+            .runWith(Sink.seq(), system)
+            .toCompletableFuture()
+            .get()
+            .get(0)
+            .success();
+
+    assertEquals(false, success);
+  }
+
+  @Test
+  public void testUsingVersionType() throws Exception {
+    String indexName = "book-test-version-type";
+    String typeName = "_doc";
+
+    Book book = new Book("A sample title");
+    String docId = "1";
+    long externalVersion = 5;
+
+    // Insert new document using external version
+    Source.single(WriteMessage.createIndexMessage("1", book).withVersion(externalVersion))
+        .via(
+            ElasticsearchFlow.create(
+                constructElasticsearchParams(indexName, typeName, apiVersion),
+                OpensearchWriteSettings.create(connectionSettings)
+                    .withApiVersion(apiVersion)
+                    .withBufferSize(5)
+                    .withVersionType("external"),
+                new ObjectMapper()))
+        .runWith(Sink.seq(), system)
+        .toCompletableFuture()
+        .get();
+
+    flushAndRefresh(indexName);
+
+    // Assert that the document's external version is saved
+    ReadResult<Book> message =
+        ElasticsearchSource.typed(
+                constructElasticsearchParams(indexName, typeName, apiVersion),
+                "{\"match_all\": {}}",
+                OpensearchSourceSettings.create(connectionSettings)
+                    .withApiVersion(apiVersion)
+                    .withIncludeDocumentVersion(true),
+                Book.class)
+            .runWith(Sink.seq(), system)
+            .toCompletableFuture()
+            .get()
+            .get(0);
+
+    assertEquals(externalVersion, message.version().get());
+  }
+
+  @Test
+  public void testMultipleIndicesWithNoMatching() throws Exception {
+    String indexName = "missing-*";
+    String typeName = "_doc";
+
+    // Assert that a search across an index pattern with no matches returns nothing
+    List<ReadResult<Book>> readResults =
+        ElasticsearchSource.typed(
+                constructElasticsearchParams(indexName, typeName, apiVersion),
+                "{\"match_all\": {}}",
+                OpensearchSourceSettings.create(connectionSettings).withApiVersion(apiVersion),
+                Book.class)
+            .runWith(Sink.seq(), system)
+            .toCompletableFuture()
+            .get();
+
+    assertTrue(readResults.isEmpty());
+  }
+
+  public void compileOnlySample() {
+    String doc = "dummy-doc";
+
+    // #custom-index-name-example
+    WriteMessage<String, NotUsed> msg =
+        WriteMessage.createIndexMessage(doc).withIndexName("my-index");
+    // #custom-index-name-example
+
+    // #custom-metadata-example
+    Map<String, String> metadata = new HashMap<>();
+    metadata.put("pipeline", "myPipeline");
+    WriteMessage<String, NotUsed> msgWithMetadata =
+        WriteMessage.createIndexMessage(doc).withCustomMetadata(metadata);
+    // #custom-metadata-example
+  }
+}
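The two version tests above drive Opensearch's external versioning end to end. For comparison, a minimal Scala sketch of the same optimistic-concurrency write; assumptions only, relying on an implicit `ActorSystem`, a spray-json `JsonFormat` for `Book`, and a `connectionSettings` value like the one these tests build:

```scala
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
import akka.stream.scaladsl.{Sink, Source}

// With versionType "external" the caller owns the version counter;
// Opensearch rejects a write whose version is not higher than the stored one.
val firstWrite = Source
  .single(WriteMessage.createIndexMessage("1", Book("A sample title")).withVersion(5L))
  .via(
    ElasticsearchFlow.create[Book](
      OpensearchParams.V1("book-test-version-type"),
      OpensearchWriteSettings(connectionSettings)
        .withApiVersion(OpensearchApiVersion.V1)
        .withVersionType("external")
    )
  )
  .runWith(Sink.head)

// A later write with .withVersion(4L) would come back with success == false.
```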
diff --git a/elasticsearch/src/test/java/docs/javadsl/OpensearchV1Test.java b/elasticsearch/src/test/java/docs/javadsl/OpensearchV1Test.java
new file mode 100644
index 0000000000..2fdca7b79d
--- /dev/null
+++ b/elasticsearch/src/test/java/docs/javadsl/OpensearchV1Test.java
@@ -0,0 +1,436 @@
+/*
+ * Copyright (C) since 2016 Lightbend Inc.
+ */
+
+package docs.javadsl;
+
+import akka.Done;
+import akka.NotUsed;
+import akka.stream.alpakka.elasticsearch.*;
+import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchFlow;
+import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSink;
+import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSource;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+public class OpensearchV1Test extends ElasticsearchTestBase {
+  @BeforeClass
+  public static void setup() throws IOException {
+    setupBase();
+
+    prepareIndex(9203, OpensearchApiVersion.V1);
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    cleanIndex();
+  }
+
+  @Test
+  public void typedStream() throws Exception {
+    // Copy source/book to sink2/book through JsObject stream
+    // #run-typed
+    OpensearchSourceSettings sourceSettings =
+        OpensearchSourceSettings.create(connectionSettings).withApiVersion(OpensearchApiVersion.V1);
+    OpensearchWriteSettings sinkSettings =
+        OpensearchWriteSettings.create(connectionSettings).withApiVersion(OpensearchApiVersion.V1);
+
+    Source<ReadResult<Book>, NotUsed> source =
+        ElasticsearchSource.typed(
+            constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1),
+            "{\"match_all\": {}}",
+            sourceSettings,
+            Book.class);
+    CompletionStage<Done> f1 =
+        source
+            .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
+            .runWith(
+                ElasticsearchSink.create(
+                    constructElasticsearchParams("sink2", "_doc", OpensearchApiVersion.V1),
+                    sinkSettings,
+                    new ObjectMapper()),
+                system);
+    // #run-typed
+
+    f1.toCompletableFuture().get();
+
+    flushAndRefresh("sink2");
+
+    // Assert docs in sink2/book
+    CompletionStage<List<String>> f2 =
+        ElasticsearchSource.typed(
+                constructElasticsearchParams("sink2", "_doc", OpensearchApiVersion.V1),
+                "{\"match_all\": {}}",
+                OpensearchSourceSettings.create(connectionSettings)
+                    .withApiVersion(OpensearchApiVersion.V1)
+                    .withBufferSize(5),
+                Book.class)
+            .map(m -> m.source().title)
+            .runWith(Sink.seq(), system);
+
+    List<String> result = new ArrayList<>(f2.toCompletableFuture().get());
+
+    List<String> expect =
+        Arrays.asList(
+            "Akka Concurrency",
+            "Akka in Action",
+            "Effective Akka",
+            "Learning Scala",
+            "Programming in Scala",
+            "Scala Puzzlers",
+            "Scala for Spark in Production");
+
+    Collections.sort(result);
+    assertEquals(expect, result);
+  }
+  @Test
+  public void jsObjectStream() throws Exception {
+    // Copy source/book to sink1/book through JsObject stream
+    // #run-jsobject
+    OpensearchSourceSettings sourceSettings =
+        OpensearchSourceSettings.create(connectionSettings).withApiVersion(OpensearchApiVersion.V1);
+    OpensearchWriteSettings sinkSettings =
+        OpensearchWriteSettings.create(connectionSettings).withApiVersion(OpensearchApiVersion.V1);
+
+    Source<ReadResult<Map<String, Object>>, NotUsed> source =
+        ElasticsearchSource.create(
+            constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1),
+            "{\"match_all\": {}}",
+            sourceSettings);
+    CompletionStage<Done> f1 =
+        source
+            .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
+            .runWith(
+                ElasticsearchSink.create(
+                    constructElasticsearchParams("sink1", "_doc", OpensearchApiVersion.V1),
+                    sinkSettings,
+                    new ObjectMapper()),
+                system);
+    // #run-jsobject
+
+    f1.toCompletableFuture().get();
+
+    flushAndRefresh("sink1");
+
+    // Assert docs in sink1/_doc
+    CompletionStage<List<String>> f2 =
+        ElasticsearchSource.create(
+                constructElasticsearchParams("sink1", "_doc", OpensearchApiVersion.V1),
+                "{\"match_all\": {}}",
+                OpensearchSourceSettings.create(connectionSettings)
+                    .withApiVersion(OpensearchApiVersion.V1)
+                    .withBufferSize(5))
+            .map(m -> (String) m.source().get("title"))
+            .runWith(Sink.seq(), system);
+
+    List<String> result = new ArrayList<>(f2.toCompletableFuture().get());
+
+    List<String> expect =
+        Arrays.asList(
+            "Akka Concurrency",
+            "Akka in Action",
+            "Effective Akka",
+            "Learning Scala",
+            "Programming in Scala",
+            "Scala Puzzlers",
+            "Scala for Spark in Production");
+
+    Collections.sort(result);
+    assertEquals(expect, result);
+  }
+
+  @Test
+  public void flow() throws Exception {
+    // Copy source/book to sink3/book through JsObject stream
+    // #run-flow
+    CompletionStage<List<WriteResult<Book, NotUsed>>> f1 =
+        ElasticsearchSource.typed(
+                constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1),
+                "{\"match_all\": {}}",
+                OpensearchSourceSettings.create(connectionSettings)
+                    .withApiVersion(OpensearchApiVersion.V1)
+                    .withBufferSize(5),
+                Book.class)
+            .map(m -> WriteMessage.createIndexMessage(m.id(), m.source()))
+            .via(
+                ElasticsearchFlow.create(
+                    constructElasticsearchParams("sink3", "_doc", OpensearchApiVersion.V1),
+                    OpensearchWriteSettings.create(connectionSettings)
+                        .withApiVersion(OpensearchApiVersion.V1)
+                        .withBufferSize(5),
+                    new ObjectMapper()))
+            .runWith(Sink.seq(), system);
+    // #run-flow
+
+    List<WriteResult<Book, NotUsed>> result1 = f1.toCompletableFuture().get();
+    flushAndRefresh("sink3");
+
+    for (WriteResult<Book, NotUsed> aResult1 : result1) {
+      assertEquals(true, aResult1.success());
+    }
+
+    // Assert docs in sink3/book
+    CompletionStage<List<String>> f2 =
+        ElasticsearchSource.typed(
+                constructElasticsearchParams("sink3", "_doc", OpensearchApiVersion.V1),
+                "{\"match_all\": {}}",
+                OpensearchSourceSettings.create(connectionSettings)
+                    .withApiVersion(OpensearchApiVersion.V1)
+                    .withConnection(connectionSettings)
+                    .withBufferSize(5),
+                Book.class)
+            .map(m -> m.source().title)
+            .runWith(Sink.seq(), system);
+
+    List<String> result2 = new ArrayList<>(f2.toCompletableFuture().get());
+
+    List<String> expect =
+        Arrays.asList(
+            "Akka Concurrency",
+            "Akka in Action",
+            "Effective Akka",
+            "Learning Scala",
+            "Programming in Scala",
+            "Scala Puzzlers",
+            "Scala for Spark in Production");
+
+    Collections.sort(result2);
+    assertEquals(expect, result2);
+  }
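`flow()` above surfaces per-document outcomes as `WriteResult`s and simply asserts that every write succeeded. An application would typically branch on `success` instead; a sketch (not part of this PR, assuming an implicit `ActorSystem`, with `books` and `osFlow` standing in for any input and any of the flows built above):

```scala
import akka.NotUsed
import akka.stream.alpakka.elasticsearch.{WriteMessage, WriteResult}
import akka.stream.scaladsl.{Sink, Source}

// Send failed results to their own sink (here: just log the error reason)
val failures = Sink.foreach[WriteResult[Book, NotUsed]] { r =>
  println(s"indexing failed: ${r.errorReason.getOrElse("unknown")}")
}

Source(books)
  .map(book => WriteMessage.createIndexMessage(book.title, book))
  .via(osFlow)
  .divertTo(failures, !_.success) // failures leave the main stream here
  .runWith(Sink.ignore)
```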
+  @Test
+  public void stringFlow() throws Exception {
+    // Copy source/book to sink3/book through JsObject stream
+    String indexName = "sink3-0";
+    // #string
+    CompletionStage<List<WriteResult<String, NotUsed>>> write =
+        Source.from(
+                Arrays.asList(
+                    WriteMessage.createIndexMessage("1", "{\"title\": \"Das Parfum\"}"),
+                    WriteMessage.createIndexMessage("2", "{\"title\": \"Faust\"}"),
+                    WriteMessage.createIndexMessage(
+                        "3", "{\"title\": \"Die unendliche Geschichte\"}")))
+            .via(
+                ElasticsearchFlow.create(
+                    constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1),
+                    OpensearchWriteSettings.create(connectionSettings)
+                        .withApiVersion(OpensearchApiVersion.V1)
+                        .withBufferSize(5),
+                    StringMessageWriter.getInstance()))
+            .runWith(Sink.seq(), system);
+    // #string
+
+    List<WriteResult<String, NotUsed>> result1 = write.toCompletableFuture().get();
+    flushAndRefresh(indexName);
+
+    for (WriteResult<String, NotUsed> aResult1 : result1) {
+      assertEquals(true, aResult1.success());
+    }
+
+    CompletionStage<List<String>> f2 =
+        ElasticsearchSource.typed(
+                constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1),
+                "{\"match_all\": {}}",
+                OpensearchSourceSettings.create(connectionSettings)
+                    .withApiVersion(OpensearchApiVersion.V1)
+                    .withBufferSize(5),
+                Book.class)
+            .map(m -> m.source().title)
+            .runWith(Sink.seq(), system);
+
+    List<String> result2 = new ArrayList<>(f2.toCompletableFuture().get());
+
+    List<String> expect = Arrays.asList("Das Parfum", "Die unendliche Geschichte", "Faust");
+
+    Collections.sort(result2);
+    assertEquals(expect, result2);
+  }
+
+  @Test
+  public void testMultipleOperations() throws Exception {
+    // #multiple-operations
+    // Create, update, upsert and delete documents in sink8/book
+    List<WriteMessage<Book, NotUsed>> requests =
+        Arrays.asList(
+            WriteMessage.createIndexMessage("00001", new Book("Book 1")),
+            WriteMessage.createUpsertMessage("00002", new Book("Book 2")),
+            WriteMessage.createUpsertMessage("00003", new Book("Book 3")),
+            WriteMessage.createUpdateMessage("00004", new Book("Book 4")),
+            WriteMessage.createDeleteMessage("00002"));
+
+    Source.from(requests)
+        .via(
+            ElasticsearchFlow.create(
+                constructElasticsearchParams("sink8", "_doc", OpensearchApiVersion.V1),
+                OpensearchWriteSettings.create(connectionSettings)
+                    .withApiVersion(OpensearchApiVersion.V1),
+                new ObjectMapper()))
+        .runWith(Sink.seq(), system)
+        .toCompletableFuture()
+        .get();
+    // #multiple-operations
+
+    flushAndRefresh("sink8");
+
+    // Assert docs in sink8/book
+    CompletionStage<List<String>> f2 =
+        ElasticsearchSource.typed(
+                constructElasticsearchParams("sink8", "_doc", OpensearchApiVersion.V1),
+                "{\"match_all\": {}}",
+                OpensearchSourceSettings.create(connectionSettings)
+                    .withApiVersion(OpensearchApiVersion.V1),
+                Book.class)
+            .map(m -> m.source().title)
+            .runWith(Sink.seq(), system);
+
+    List<String> result2 = new ArrayList<>(f2.toCompletableFuture().get());
+    List<String> expect = Arrays.asList("Book 1", "Book 3");
+    Collections.sort(result2);
+
+    assertEquals(expect, result2);
+  }
+  @Test
+  public void testKafkaExample() throws Exception {
+    // #kafka-example
+    // We're going to pretend we got messages from kafka.
+    // After we've written them to Elastic, we want
+    // to commit the offset to Kafka
+
+    List<KafkaMessage> messagesFromKafka =
+        Arrays.asList(
+            new KafkaMessage(new Book("Book 1"), new KafkaOffset(0)),
+            new KafkaMessage(new Book("Book 2"), new KafkaOffset(1)),
+            new KafkaMessage(new Book("Book 3"), new KafkaOffset(2)));
+
+    final KafkaCommitter kafkaCommitter = new KafkaCommitter();
+
+    CompletionStage<Done> kafkaToOs =
+        Source.from(messagesFromKafka) // Assume we get this from Kafka
+            .map(
+                kafkaMessage -> {
+                  Book book = kafkaMessage.book;
+                  String id = book.title;
+
+                  // Transform message so that we can write to elastic
+                  return WriteMessage.createIndexMessage(id, book)
+                      .withPassThrough(kafkaMessage.offset);
+                })
+            .via( // write to elastic
+                ElasticsearchFlow.createWithPassThrough(
+                    constructElasticsearchParams("sink6", "_doc", OpensearchApiVersion.V1),
+                    OpensearchWriteSettings.create(connectionSettings)
+                        .withApiVersion(OpensearchApiVersion.V1)
+                        .withBufferSize(5),
+                    new ObjectMapper()))
+            .map(
+                result -> {
+                  if (!result.success())
+                    throw new RuntimeException("Failed to write message to elastic");
+                  // Commit to kafka
+                  kafkaCommitter.commit(result.message().passThrough());
+                  return NotUsed.getInstance();
+                })
+            .runWith(Sink.ignore(), system);
+    // #kafka-example
+    kafkaToOs.toCompletableFuture().get(5, TimeUnit.SECONDS); // Wait for it to complete
+    flushAndRefresh("sink6");
+
+    // Make sure all messages were committed to kafka
+    assertEquals(Arrays.asList(0, 1, 2), kafkaCommitter.committedOffsets);
+
+    // Assert that all docs were written to elastic
+    List<String> result2 =
+        ElasticsearchSource.typed(
+                constructElasticsearchParams("sink6", "_doc", OpensearchApiVersion.V1),
+                "{\"match_all\": {}}",
+                OpensearchSourceSettings.create(connectionSettings)
+                    .withApiVersion(OpensearchApiVersion.V1),
+                Book.class)
+            .map(m -> m.source().title)
+            .runWith(Sink.seq(), system) // Run it
+            .toCompletableFuture()
+            .get(); // Wait for it to complete
+
+    assertEquals(
+        messagesFromKafka.stream().map(m -> m.book.title).sorted().collect(Collectors.toList()),
+        result2.stream().sorted().collect(Collectors.toList()));
+  }
+  @Test
+  public void testUsingSearchParams() throws Exception {
+
+    String indexName = "test_using_search_params_versions_java";
+    String typeName = "_doc";
+
+    List<TestDoc> docs =
+        Arrays.asList(
+            new TestDoc("1", "a1", "b1", "c1"),
+            new TestDoc("2", "a2", "b2", "c2"),
+            new TestDoc("3", "a3", "b3", "c3"));
+
+    // Insert document
+    Source.from(docs)
+        .map((TestDoc d) -> WriteMessage.createIndexMessage(d.id, d))
+        .via(
+            ElasticsearchFlow.create(
+                constructElasticsearchParams(indexName, typeName, OpensearchApiVersion.V1),
+                OpensearchWriteSettings.create(connectionSettings)
+                    .withApiVersion(OpensearchApiVersion.V1)
+                    .withBufferSize(5),
+                new ObjectMapper()))
+        .runWith(Sink.seq(), system)
+        .toCompletableFuture()
+        .get();
+
+    flushAndRefresh(indexName);
+
+    // #custom-search-params
+    // Search for docs and ask elastic to only return some fields
+
+    Map<String, String> searchParams = new HashMap<>();
+    searchParams.put("query", "{\"match_all\": {}}");
+    searchParams.put("_source", "[\"id\", \"a\", \"c\"]");
+
+    List<TestDoc> result =
+        ElasticsearchSource.typed(
+                constructElasticsearchParams(indexName, typeName, OpensearchApiVersion.V1),
+                searchParams, // <-- Using searchParams
+                OpensearchSourceSettings.create(connectionSettings)
+                    .withApiVersion(OpensearchApiVersion.V1),
+                TestDoc.class,
+                new ObjectMapper())
+            .map(
+                o -> {
+                  return o.source(); // These documents will only have property id, a and c (not b)
+                })
+            .runWith(Sink.seq(), system)
+            .toCompletableFuture()
+            .get();
+    // #custom-search-params
+    flushAndRefresh(indexName);
+
+    assertEquals(
+        docs.size(),
+        result.stream()
+            .filter(
+                d -> {
+                  return d.a != null && d.b == null;
+                })
+            .collect(Collectors.toList())
+            .size());
+  }
+}
diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
index c143a30278..24d58f9c81 100644
--- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
+++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala
@@ -10,10 +10,12 @@ import akka.http.scaladsl.model.Uri.Path
 import akka.http.scaladsl.model.{ContentTypes, HttpMethods, HttpRequest, Uri}
 import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSource
 import akka.stream.alpakka.elasticsearch.{
-  ApiVersion,
+  ApiVersionBase,
   ElasticsearchConnectionSettings,
   ElasticsearchParams,
-  ElasticsearchSourceSettings
+  OpensearchApiVersion,
+  OpensearchParams,
+  SourceSettingsBase
 }
 import akka.stream.scaladsl.Sink
 import org.scalatest.concurrent.ScalaFutures
@@ -56,8 +58,8 @@ trait ElasticsearchSpecUtils { this: AnyWordSpec with ScalaFutures =>
     http.singleRequest(refreshRequest).futureValue
   }
 
-  def readTitlesFrom(apiVersion: ApiVersion,
-                     sourceSettings: ElasticsearchSourceSettings,
+  def readTitlesFrom(apiVersion: ApiVersionBase,
+                     sourceSettings: SourceSettingsBase[_, _],
                      indexName: String): Future[immutable.Seq[String]] =
     ElasticsearchSource
       .typed[Book](
@@ -81,11 +83,17 @@ trait ElasticsearchSpecUtils { this: AnyWordSpec with ScalaFutures =>
     flushAndRefresh(connectionSettings, "source")
   }
 
-  def constructElasticsearchParams(indexName: String, typeName: String, apiVersion: ApiVersion): ElasticsearchParams = {
-    if (apiVersion == ApiVersion.V5) {
+  def constructElasticsearchParams(indexName: String,
+                                   typeName: String,
+                                   apiVersion: ApiVersionBase): ElasticsearchParams = {
+    if (apiVersion == akka.stream.alpakka.elasticsearch.ApiVersion.V5) {
       ElasticsearchParams.V5(indexName, typeName)
-    } else {
+    } else if (apiVersion == akka.stream.alpakka.elasticsearch.ApiVersion.V7) {
       ElasticsearchParams.V7(indexName)
+    } else if (apiVersion == OpensearchApiVersion.V1) {
+      OpensearchParams.V1(indexName)
+    } else {
+      throw new IllegalArgumentException(s"API version $apiVersion is not supported")
     }
   }
 }
diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala
new file mode 100644
index 0000000000..bf8b9e877a
--- /dev/null
+++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchConnectorBehaviour.scala
@@ -0,0 +1,758 @@
+/*
+ * Copyright (C) since 2016 Lightbend Inc.
+ */ + +package docs.scaladsl + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.http.scaladsl.HttpExt +import akka.http.scaladsl.model.Uri.Path +import akka.http.scaladsl.model.{ContentTypes, HttpMethods, HttpRequest, Uri} +import akka.stream.alpakka.elasticsearch._ +import akka.stream.alpakka.elasticsearch.scaladsl._ +import akka.stream.scaladsl.{Sink, Source} +import akka.{Done, NotUsed} +import org.scalatest.Inspectors +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import scala.collection.immutable +import scala.concurrent.duration._ + +trait OpensearchConnectorBehaviour { + this: AnyWordSpec with Matchers with ScalaFutures with Inspectors with ElasticsearchSpecUtils => + + override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds) + + def opensearchConnector(apiVersion: OpensearchApiVersion, connectionSettings: ElasticsearchConnectionSettings)( + implicit system: ActorSystem, + http: HttpExt + ): Unit = { + + val baseSourceSettings = OpensearchSourceSettings(connectionSettings).withApiVersion(apiVersion) + val baseWriteSettings = OpensearchWriteSettings(connectionSettings).withApiVersion(apiVersion) + + import spray.json._ + import DefaultJsonProtocol._ + + def createStrictMapping(indexName: String): Unit = { + val uri = Uri(connectionSettings.baseUrl) + .withPath(Path(s"/$indexName")) + .withQuery(Uri.Query(Map("include_type_name" -> "true"))) + + val request = HttpRequest(HttpMethods.PUT) + .withUri(uri) + .withEntity( + ContentTypes.`application/json`, + s"""{ + | "mappings": { + | "_doc": { + | "dynamic": "strict", + | "properties": { + | "title": { "type": "text"}, + | "price": { "type": "integer"} + | } + | } + | } + |} + """.stripMargin + ) + + http.singleRequest(request).futureValue + } + + "Source Settings" should { + "convert scrollDuration value to correct scroll string value (Days)" in { + val sourceSettings = OpensearchSourceSettings(connectionSettings) + .withScrollDuration(FiniteDuration(5, TimeUnit.DAYS)) + + sourceSettings.scroll shouldEqual "5d" + } + "convert scrollDuration value to correct scroll string value (Hours)" in { + val sourceSettings = OpensearchSourceSettings(connectionSettings) + .withScrollDuration(FiniteDuration(5, TimeUnit.HOURS)) + + sourceSettings.scroll shouldEqual "5h" + } + "convert scrollDuration value to correct scroll string value (Minutes)" in { + val sourceSettings = OpensearchSourceSettings(connectionSettings) + .withScrollDuration(FiniteDuration(5, TimeUnit.MINUTES)) + + sourceSettings.scroll shouldEqual "5m" + } + "convert scrollDuration value to correct scroll string value (Seconds)" in { + val sourceSettings = OpensearchSourceSettings(connectionSettings) + .withScrollDuration(FiniteDuration(5, TimeUnit.SECONDS)) + + sourceSettings.scroll shouldEqual "5s" + } + "convert scrollDuration value to correct scroll string value (Milliseconds)" in { + val sourceSettings = OpensearchSourceSettings(connectionSettings) + .withScrollDuration(FiniteDuration(5, TimeUnit.MILLISECONDS)) + + sourceSettings.scroll shouldEqual "5ms" + } + "convert scrollDuration value to correct scroll string value (Microseconds)" in { + val sourceSettings = OpensearchSourceSettings(connectionSettings) + .withScrollDuration(FiniteDuration(5, TimeUnit.MICROSECONDS)) + + sourceSettings.scroll shouldEqual "5micros" + } + "convert scrollDuration value to correct scroll string value (Nanoseconds)" in { + val sourceSettings = 
OpensearchSourceSettings(connectionSettings) + .withScrollDuration(FiniteDuration(5, TimeUnit.NANOSECONDS)) + + sourceSettings.scroll shouldEqual "5nanos" + } + } + + "Sink Settings" should { + "copy explicit index name permission" in { + val sinkSettings = + OpensearchWriteSettings(connectionSettings) + .withBufferSize(10) + .withVersionType("internal") + .withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.second)) + + val restrictiveCopy = sinkSettings.withAllowExplicitIndex(false) + + restrictiveCopy.allowExplicitIndex shouldEqual false + } + } + + "ElasticsearchFlow" should { + "pass through data in `withContext`" in { + val books = immutable.Seq( + "Akka in Action", + "Alpakka Patterns" + ) + + val indexName = "sink3-1" + val createBooks = Source(books).zipWithIndex + .map { + case (book, index) => + (WriteMessage.createIndexMessage(index.toString, Book(book)), book) + } + .via( + ElasticsearchFlow.createWithContext( + constructElasticsearchParams(indexName, "_doc", apiVersion), + baseWriteSettings + ) + ) + .runWith(Sink.seq) + + forAll(createBooks.futureValue) { + case (writeMessage, title) => + val book = writeMessage.message.source + book.map(_.title) should contain(title) + } + } + + "not post invalid encoded JSON" in { + val books = immutable.Seq( + "Akka in Action", + "Akka \u00DF Concurrency" + ) + + val indexName = "sink4" + val createBooks = Source(books.zipWithIndex) + .map { + case (book: String, index: Int) => + WriteMessage.createIndexMessage(index.toString, Book(book)) + } + .via( + ElasticsearchFlow.create[Book]( + constructElasticsearchParams(indexName, "_doc", apiVersion), + baseWriteSettings + ) + ) + .runWith(Sink.seq) + + // Assert no error + createBooks.futureValue.filter(!_.success) shouldBe empty + flushAndRefresh(connectionSettings, indexName) + readTitlesFrom(apiVersion, baseSourceSettings, indexName).futureValue should contain allElementsOf Seq( + "Akka in Action", + "Akka \u00DF Concurrency" + ) + } + + "retry a failed document and pass retried documents to downstream (create)" in { + val indexName = "sink5" + + // Create strict mapping to prevent invalid documents + createStrictMapping(indexName) + + val createBooks = Source( + immutable + .Seq( + Book("Akka in Action").toJson, + JsObject("subject" -> "Akka Concurrency".toJson) + ) + .zipWithIndex + ).map { + case (book: JsObject, index: Int) => + WriteMessage.createIndexMessage(index.toString, book) + case _ => ??? 
// Keep the compiler from complaining + } + .via( + ElasticsearchFlow.create( + constructElasticsearchParams(indexName, "_doc", apiVersion), + baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 100.millis)) + ) + ) + .runWith(Sink.seq) + + val start = System.currentTimeMillis() + val writeResults = createBooks.futureValue + val end = System.currentTimeMillis() + + writeResults should have size 2 + + // Assert retired documents + val failed = writeResults.filter(!_.success).head + failed.message shouldBe WriteMessage.createIndexMessage("1", JsObject("subject" -> "Akka Concurrency".toJson)) + failed.errorReason shouldBe Some( + "mapping set to strict, dynamic introduction of [subject] within [_doc] is not allowed" + ) + + // Assert retried 5 times by looking duration + assert(end - start > 5 * 100) + + flushAndRefresh(connectionSettings, indexName) + readTitlesFrom(apiVersion, baseSourceSettings, indexName).futureValue shouldEqual Seq( + "Akka in Action" + ) + } + + "retry ALL failed document and pass retried documents to downstream (createWithPassThrough)" in { + val indexName = "sink5_1" + + val bookNr = 100 + val writeMsgs = Iterator + .from(0) + .take(bookNr) + .grouped(5) + .zipWithIndex + .flatMap { + case (numBlock, index) => + val writeMsgBlock = numBlock.map { n => + WriteMessage + .createCreateMessage(n.toString, Book(s"Book ${n}")) + .withPassThrough(n) + } + + val writeMsgFailed = WriteMessage + .createCreateMessage("0", Book(s"Failed")) + .withPassThrough(bookNr + index) + + (writeMsgBlock ++ Iterator(writeMsgFailed)).toList + } + .toList + + val createBooks = Source(writeMsgs) + .via( + ElasticsearchFlow.createWithPassThrough( + constructElasticsearchParams(indexName, "_doc", apiVersion), + baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 1.millis)) + ) + ) + .runWith(Sink.seq) + + val writeResults = createBooks.futureValue + + writeResults should have size writeMsgs.size + + flushAndRefresh(connectionSettings, indexName) + + val expectedBookTitles = Iterator.from(0).map(n => s"Book ${n}").take(bookNr).toSet + readTitlesFrom(apiVersion, baseSourceSettings, indexName).futureValue should contain theSameElementsAs expectedBookTitles + } + + "retry a failed document and pass retried documents to downstream (createWithContext)" in { + val indexName = "sink5b" + + // Create strict mapping to prevent invalid documents + createStrictMapping(indexName) + + val createBooks = Source( + immutable + .Seq( + Book("Akka in Action").toJson, + JsObject("subject" -> "Akka Concurrency".toJson), + Book("Learning Scala").toJson + ) + .zipWithIndex + ).map { + case (book: JsObject, index: Int) => + WriteMessage.createIndexMessage(index.toString, book) -> index + case _ => ??? 
// Keep the compiler from complaining + } + .via( + ElasticsearchFlow.createWithContext( + constructElasticsearchParams(indexName, "_doc", apiVersion), + baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 100.millis)) + ) + ) + .runWith(Sink.seq) + + val start = System.currentTimeMillis() + val writeResults = createBooks.futureValue + val end = System.currentTimeMillis() + + writeResults should have size 3 + + // Assert retired documents + writeResults.map(_._2) should contain theSameElementsInOrderAs Seq(0, 1, 2) + + val (failed, _) = writeResults.filter(!_._1.success).head + failed.message shouldBe WriteMessage + .createIndexMessage("1", JsObject("subject" -> "Akka Concurrency".toJson)) + .withPassThrough(1) + failed.errorReason shouldBe Some( + "mapping set to strict, dynamic introduction of [subject] within [_doc] is not allowed" + ) + + // Assert retried 5 times by looking duration + assert(end - start > 5 * 100) + + flushAndRefresh(connectionSettings, indexName) + readTitlesFrom(apiVersion, baseSourceSettings, indexName).futureValue should contain theSameElementsAs Seq( + "Akka in Action", + "Learning Scala" + ) + } + + "retry ALL failed document and pass retried documents to downstream (createWithContext)" in { + val indexName = "sink5_1b" + + val bookNr = 100 + val writeMsgs = Iterator + .from(0) + .take(bookNr) + .grouped(5) + .zipWithIndex + .flatMap { + case (numBlock, index) => + val writeMsgBlock = numBlock.map { n => + WriteMessage + .createCreateMessage(n.toString, Book(s"Book ${n}")) -> n + } + + val writeMsgFailed = WriteMessage + .createCreateMessage("0", Book(s"Failed")) -> (bookNr + index) + + (writeMsgBlock ++ Iterator(writeMsgFailed)).toList + } + .toList + + val createBooks = Source(writeMsgs) + .via( + ElasticsearchFlow.createWithContext( + constructElasticsearchParams(indexName, "_doc", apiVersion), + baseWriteSettings.withRetryLogic(RetryAtFixedRate(5, 1.millis)) + ) + ) + .runWith(Sink.seq) + + val writeResults = createBooks.futureValue + + writeResults should have size writeMsgs.size + writeResults.map(_._2) should contain theSameElementsInOrderAs writeMsgs.map(_._2) + + flushAndRefresh(connectionSettings, indexName) + + val expectedBookTitles = Iterator.from(0).map(n => s"Book ${n}").take(bookNr).toSet + readTitlesFrom(apiVersion, baseSourceSettings, indexName).futureValue should contain theSameElementsAs expectedBookTitles + } + + "store new documents using upsert method and partially update existing ones" in { + val books = List( + ("00001", Book("Book 1")), + ("00002", Book("Book 2")), + ("00003", Book("Book 3")) + ) + + val indexName = "sink7" + val createBooks = Source(books) + .map { book: (String, Book) => + WriteMessage.createUpsertMessage(id = book._1, source = book._2) + } + .via( + ElasticsearchFlow.create[Book]( + constructElasticsearchParams(indexName, "_doc", apiVersion), + baseWriteSettings + ) + ) + .runWith(Sink.seq) + + // Assert no errors + createBooks.futureValue.filter(!_.success) shouldBe empty + flushAndRefresh(connectionSettings, indexName) + + // Create a second dataset with matching indexes to test partial update + val updatedBooks = List( + ("00001", + JsObject( + "rating" -> JsNumber(4) + )), + ("00002", + JsObject( + "rating" -> JsNumber(3) + )), + ("00003", + JsObject( + "rating" -> JsNumber(3) + )) + ) + + // Update sink7/_doc with the second dataset + val upserts = Source(updatedBooks) + .map { book: (String, JsObject) => + WriteMessage.createUpsertMessage(id = book._1, source = book._2) + } + .via( + 
ElasticsearchFlow.create[JsObject]( + constructElasticsearchParams(indexName, "_doc", apiVersion), + baseWriteSettings + ) + ) + .runWith(Sink.seq) + + // Assert no errors + upserts.futureValue.filter(!_.success) shouldBe empty + flushAndRefresh(connectionSettings, indexName) + + // Assert docs in sink7/_doc + val readBooks = ElasticsearchSource( + constructElasticsearchParams(indexName, "_doc", apiVersion), + """{"match_all": {}}""", + baseSourceSettings + ).map { message => + message.source + } + .runWith(Sink.seq) + + // Docs should contain both columns + readBooks.futureValue.sortBy(_.fields("title").compactPrint) shouldEqual Seq( + JsObject( + "title" -> JsString("Book 1"), + "rating" -> JsNumber(4), + "price" -> JsNumber(10) + ), + JsObject( + "title" -> JsString("Book 2"), + "rating" -> JsNumber(3), + "price" -> JsNumber(10) + ), + JsObject( + "title" -> JsString("Book 3"), + "rating" -> JsNumber(3), + "price" -> JsNumber(10) + ) + ) + } + + "Create existing document should fail" in { + val indexName = "sink9" + val requests = List[WriteMessage[Book, NotUsed]]( + WriteMessage.createIndexMessage(id = "00001", source = Book("Book 1")), + WriteMessage.createCreateMessage(id = "00001", source = Book("Book 1")) + ) + + val writeResults = Source(requests) + .via( + ElasticsearchFlow.create[Book]( + constructElasticsearchParams(indexName, "_doc", apiVersion), + baseWriteSettings + ) + ) + .runWith(Sink.seq) + + val results = writeResults.futureValue + results should have size requests.size + // Assert error + val errorMessages = results.flatMap(_.errorReason) + errorMessages should have size 1 + errorMessages.head should include("version conflict, document already exists (current version [1])") + } + + "read and write document-version if configured to do so" in { + + case class VersionTestDoc(id: String, name: String, value: Int) + implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc) + + val indexName = "version-test-scala" + val typeName = "_doc" + + val docs = List( + VersionTestDoc("1", "a", 0), + VersionTestDoc("2", "b", 0), + VersionTestDoc("3", "c", 0) + ) + + // insert new documents + val indexResults = Source(docs) + .map { doc => + WriteMessage.createIndexMessage(doc.id, doc) + } + .via( + ElasticsearchFlow.create[VersionTestDoc]( + constructElasticsearchParams(indexName, typeName, apiVersion), + baseWriteSettings.withBufferSize(5) + ) + ) + .runWith(Sink.seq) + + // Assert no errors + indexResults.futureValue.filter(!_.success) shouldBe empty + flushAndRefresh(connectionSettings, indexName) + + // search for the documents and assert them being at version 1, + // then update while specifying that for which version + + val updatedVersions = ElasticsearchSource + .typed[VersionTestDoc]( + constructElasticsearchParams(indexName, typeName, apiVersion), + """{"match_all": {}}""", + baseSourceSettings.withIncludeDocumentVersion(true) + ) + .map { message => + val doc = message.source + val version = message.version.get + assert(1 == version) // Assert document got version = 1 + + // Update it + + val newDoc = doc.copy(value = doc.value + 1) + + WriteMessage.createIndexMessage(newDoc.id, newDoc).withVersion(version + 1) + } + .via( + ElasticsearchFlow.create[VersionTestDoc]( + constructElasticsearchParams(indexName, typeName, apiVersion), + baseWriteSettings.withBufferSize(5).withVersionType("external") + ) + ) + .runWith(Sink.seq) + + updatedVersions.futureValue.filter(!_.success) shouldBe empty + + flushAndRefresh(connectionSettings, indexName) + 
// Search again to assert that all documents are now on version 2 + val assertVersions = ElasticsearchSource + .typed[VersionTestDoc]( + constructElasticsearchParams(indexName, typeName, apiVersion), + """{"match_all": {}}""", + baseSourceSettings.withIncludeDocumentVersion(true) + ) + .map { message => + val doc = message.source + val version = message.version.get + assert(doc.value == 1) + assert(2 == version) // Assert document got version = 2 + doc + } + .runWith(Sink.ignore) + + assertVersions.futureValue shouldBe Done + + // Try to write document with old version - it should fail + val illegalIndexWrites = Source + .single(VersionTestDoc("1", "a", 2)) + .map { doc => + val oldVersion = 1 + WriteMessage.createIndexMessage(doc.id, doc).withVersion(oldVersion) + } + .via( + ElasticsearchFlow.create[VersionTestDoc]( + constructElasticsearchParams(indexName, typeName, apiVersion), + baseWriteSettings.withBufferSize(5).withVersionType("external") + ) + ) + .runWith(Sink.seq) + + val result5 = illegalIndexWrites.futureValue + result5.head.success shouldBe false + } + + "allow read and write using configured version type" in { + + val indexName = "book-test-version-type" + val typeName = "_doc" + + val book = Book("A sample title") + val docId = "1" + val externalVersion = 5L + + // Insert new document using external version + val insertWrite = Source + .single(book) + .map { doc => + WriteMessage.createIndexMessage(docId, doc).withVersion(externalVersion) + } + .via( + ElasticsearchFlow.create[Book]( + constructElasticsearchParams(indexName, typeName, apiVersion), + baseWriteSettings.withBufferSize(5).withVersionType("external") + ) + ) + .runWith(Sink.seq) + + val insertResult = insertWrite.futureValue.head + assert(insertResult.success) + + flushAndRefresh(connectionSettings, indexName) + + // Assert that the document's external version is saved + val readFirst = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams(indexName, typeName, apiVersion), + """{"match_all": {}}""", + settings = baseSourceSettings.withIncludeDocumentVersion(true) + ) + .runWith(Sink.head) + + assert(readFirst.futureValue.version.contains(externalVersion)) + } + } + + "ElasticsearchSource" should { + insertTestData(connectionSettings) + + "allow search without specifying typeName" in { + val readWithoutTypeName = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("source", "_doc", apiVersion), + query = """{"match_all": {}}""", + settings = baseSourceSettings.withBufferSize(5).withApiVersion(OpensearchApiVersion.V1) + ) + .map(_.source.title) + .runWith(Sink.seq) + + val result = readWithoutTypeName.futureValue.toList + result.sorted shouldEqual Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ) + } + + "allow search on index pattern with no matching index" in { + val readWithoutTypeName = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("missing-*", "_doc", apiVersion), + query = """{"match_all": {}}""", + settings = baseSourceSettings.withBufferSize(5) + ) + .map(_.source.title) + .runWith(Sink.seq) + + val result = readWithoutTypeName.futureValue.toList + result shouldEqual Seq() + } + + "sort by _doc by default" in { + val read = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("source", "_doc", apiVersion), + query = """{"match_all": {}}""", + settings = baseSourceSettings.withBufferSize(3).withApiVersion(apiVersion) + ) + 
.map(_.source.title) + .runWith(Sink.seq) + + val result = read.futureValue.toList + + // sort: _doc is by design an undefined order and is non-deterministic + // we cannot check a specific order of values + result should contain theSameElementsAs (List("Akka in Action", + "Programming in Scala", + "Learning Scala", + "Scala for Spark in Production", + "Scala Puzzlers", + "Effective Akka", + "Akka Concurrency")) + } + + "sort by user defined field" in { + val read = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("source", "_doc", apiVersion), + Map( + "query" -> """{"match_all": {}}""", + "sort" -> """["price"]""" + ), + settings = baseSourceSettings.withBufferSize(3).withApiVersion(apiVersion) + ) + .map(_.source.price) + .runWith(Sink.seq) + + val result = read.futureValue.toList + result shouldEqual result.sorted + + } + + "sort by user defined field desc" in { + val read = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("source", "_doc", apiVersion), + Map( + "query" -> """{"match_all": {}}""", + "sort" -> """[{"price": "desc"}]""" + ), + settings = baseSourceSettings.withBufferSize(3).withApiVersion(apiVersion) + ) + .map(_.source.price) + .runWith(Sink.seq) + + val result = read.futureValue.toList + result shouldEqual result.sorted.reverse + + } + + } + + lazy val _ = { + //#connection-settings + val connectionSettings = OpensearchConnectionSettings("http://localhost:9200") + .withCredentials("user", "password") + //#connection-settings + //#source-settings + val sourceSettings = OpensearchSourceSettings(connectionSettings) + .withBufferSize(10) + .withScrollDuration(5.minutes) + //#source-settings + sourceSettings.toString should startWith("OpensearchSourceSettings(") + //#sink-settings + val sinkSettings = + OpensearchWriteSettings(connectionSettings) + .withBufferSize(10) + .withVersionType("internal") + .withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.second)) + .withApiVersion(OpensearchApiVersion.V1) + //#sink-settings + sinkSettings.toString should startWith("OpensearchWriteSettings(") + //#opensearch-params + val OpensearchParamsV1 = OpensearchParams.V1("index") + //#opensearch-params + OpensearchParamsV1.toString should startWith("OpensearchParams(") + val doc = "dummy-doc" + //#custom-metadata-example + val msg = WriteMessage + .createIndexMessage(doc) + .withCustomMetadata(Map("pipeline" -> "myPipeline")) + //#custom-metadata-example + msg.customMetadata should contain("pipeline") + } + + } + +} diff --git a/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala new file mode 100644 index 0000000000..3f6130384c --- /dev/null +++ b/elasticsearch/src/test/scala/docs/scaladsl/OpensearchV1Spec.scala @@ -0,0 +1,556 @@ +/* + * Copyright (C) since 2016 Lightbend Inc. 
+ */ + +package docs.scaladsl + +import akka.http.scaladsl.model.Uri.Path +import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri} +import akka.stream.alpakka.elasticsearch.{ + ElasticsearchConnectionSettings, + OpensearchApiVersion, + OpensearchConnectionSettings, + ReadResult, + StringMessageWriter, + WriteMessage, + WriteResult +} +import akka.stream.alpakka.elasticsearch.scaladsl.{ElasticsearchFlow, ElasticsearchSink, ElasticsearchSource} +import akka.stream.alpakka.elasticsearch._ +import akka.stream.scaladsl.{Sink, Source} +import akka.testkit.TestKit +import akka.{Done, NotUsed} +import spray.json.jsonReader + +import scala.collection.immutable +import scala.concurrent.Future +import spray.json._ + +class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils { + + private val connectionSettings: ElasticsearchConnectionSettings = OpensearchConnectionSettings( + "http://localhost:9203" + ) + private val baseSourceSettings = OpensearchSourceSettings(connectionSettings).withApiVersion(OpensearchApiVersion.V1) + private val baseWriteSettings = OpensearchWriteSettings(connectionSettings).withApiVersion(OpensearchApiVersion.V1) + + override protected def beforeAll() = { + insertTestData(connectionSettings) + } + + override def afterAll() = { + val deleteRequest = HttpRequest(HttpMethods.DELETE) + .withUri(Uri(connectionSettings.baseUrl).withPath(Path("/_all"))) + http.singleRequest(deleteRequest).futureValue + + TestKit.shutdownActorSystem(system) + } + + "Un-typed Opensearch connector" should { + "consume and publish Json documents" in { + val indexName = "sink2" + + //#run-jsobject + val copy = ElasticsearchSource + .create( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + query = """{"match_all": {}}""", + settings = baseSourceSettings + ) + .map { message: ReadResult[spray.json.JsObject] => + val book: Book = jsonReader[Book].read(message.source) + WriteMessage.createIndexMessage(message.id, book) + } + .runWith( + ElasticsearchSink.create[Book]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings + ) + ) + //#run-jsobject + + copy.futureValue shouldBe Done + flushAndRefresh(connectionSettings, indexName) + + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue should contain allElementsOf Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ) + } + } + + "Typed Opensearch connector" should { + "consume and publish documents as specific type" in { + val indexName = "sink2" + + //#run-typed + val copy = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + query = """{"match_all": {}}""", + settings = baseSourceSettings + ) + .map { message: ReadResult[Book] => + WriteMessage.createIndexMessage(message.id, message.source) + } + .runWith( + ElasticsearchSink.create[Book]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings + ) + ) + //#run-typed + + copy.futureValue shouldBe Done + flushAndRefresh(connectionSettings, indexName) + + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue should contain allElementsOf Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ) + } + } + + 
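Both copy pipelines above lean on an implicit spray-json format for `Book`: the typed source needs a `JsonReader` and the sink a `JsonWriter`. A sketch of the machinery these specs assume; the real `Book` case class lives in the shared `ElasticsearchSpecUtils`, so the field names here are only inferred from how the tests use it:

```scala
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat

// Inferred shape: a title, an optional skip marker used by the NOP tests,
// and a numeric price that shows up in the upsert assertions.
case class Book(title: String, shouldSkip: Option[Boolean] = None, price: Int = 10)

implicit val bookFormat: JsonFormat[Book] = jsonFormat3(Book)
```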
"ElasticsearchFlow" should { + "store documents and pass failed documents to downstream" in { + val indexName = "sink3" + //#run-flow + val copy = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + query = """{"match_all": {}}""", + settings = baseSourceSettings + ) + .map { message: ReadResult[Book] => + WriteMessage.createIndexMessage(message.id, message.source) + } + .via( + ElasticsearchFlow.create[Book]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings + ) + ) + .runWith(Sink.seq) + //#run-flow + + // Assert no errors + copy.futureValue.filter(!_.success) shouldBe empty + flushAndRefresh(connectionSettings, indexName) + + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue.sorted shouldEqual Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ) + } + + "store properly formatted JSON from Strings" in { + val indexName = "sink3-0" + + // #string + val write: Future[immutable.Seq[WriteResult[String, NotUsed]]] = Source( + immutable.Seq( + WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.toString()), + WriteMessage.createIndexMessage("2", Book("Faust").toJson.toString()), + WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.toString()) + ) + ).via( + ElasticsearchFlow.create( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings, + StringMessageWriter + ) + ) + .runWith(Sink.seq) + // #string + + // Assert no errors + write.futureValue.filter(!_.success) shouldBe empty + flushAndRefresh(connectionSettings, indexName) + + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue.sorted shouldEqual Seq( + "Das Parfum", + "Die unendliche Geschichte", + "Faust" + ) + } + + "kafka-example - store documents and pass Responses with passThrough" in { + + //#kafka-example + // We're going to pretend we got messages from kafka. 
+ // After we've written them to Elastic, we want + // to commit the offset to Kafka + + case class KafkaOffset(offset: Int) + case class KafkaMessage(book: Book, offset: KafkaOffset) + + val messagesFromKafka = List( + KafkaMessage(Book("Book 1"), KafkaOffset(0)), + KafkaMessage(Book("Book 2"), KafkaOffset(1)), + KafkaMessage(Book("Book 3"), KafkaOffset(2)) + ) + + var committedOffsets = Vector[KafkaOffset]() + + def commitToKafka(offset: KafkaOffset): Unit = + committedOffsets = committedOffsets :+ offset + + val indexName = "sink6" + val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka + .map { kafkaMessage: KafkaMessage => + val book = kafkaMessage.book + val id = book.title + + // Transform message so that we can write to elastic + WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset) + } + .via( // write to elastic + ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings + ) + ) + .map { result => + if (!result.success) throw new Exception("Failed to write message to elastic") + // Commit to kafka + commitToKafka(result.message.passThrough) + } + .runWith(Sink.ignore) + + kafkaToOs.futureValue shouldBe Done + //#kafka-example + flushAndRefresh(connectionSettings, indexName) + + // Make sure all messages was committed to kafka + committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2) + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue.toList should contain allElementsOf messagesFromKafka + .map(_.book.title) + } + + "kafka-example - store documents and pass Responses with passThrough in bulk" in { + + // We're going to pretend we got messages from kafka. 
+ // After we've written them to Elastic, we want + // to commit the offset to Kafka + + case class KafkaOffset(offset: Int) + case class KafkaMessage(book: Book, offset: KafkaOffset) + + val messagesFromKafka = List( + KafkaMessage(Book("Book 1"), KafkaOffset(0)), + KafkaMessage(Book("Book 2"), KafkaOffset(1)), + KafkaMessage(Book("Book 3"), KafkaOffset(2)) + ) + + var committedOffsets = Vector[KafkaOffset]() + + def commitToKafka(offset: KafkaOffset): Unit = + committedOffsets = committedOffsets :+ offset + + val indexName = "sink6-bulk" + val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka + .map { kafkaMessage: KafkaMessage => + val book = kafkaMessage.book + val id = book.title + + // Transform message so that we can write to elastic + WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset) + } + .grouped(2) + .via( // write to elastic + ElasticsearchFlow.createBulk[Book, KafkaOffset]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings + ) + ) + .map(_.map { result => + if (!result.success) throw new Exception("Failed to write message to elastic") + // Commit to kafka + commitToKafka(result.message.passThrough) + }) + .runWith(Sink.ignore) + + kafkaToOs.futureValue shouldBe Done + + flushAndRefresh(connectionSettings, indexName) + + // Make sure all messages was committed to kafka + committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2) + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue.toList should contain allElementsOf messagesFromKafka + .map(_.book.title) + } + + "kafka-example - store documents and pass Responses with passThrough skipping some w/ NOP" in { + + // We're going to pretend we got messages from kafka. 
+ // After we've written them to Elastic, we want + // to commit the offset to Kafka + + case class KafkaOffset(offset: Int) + case class KafkaMessage(book: Book, offset: KafkaOffset) + + val messagesFromKafka = List( + KafkaMessage(Book("Book A", shouldSkip = Some(true)), KafkaOffset(0)), + KafkaMessage(Book("Book 1"), KafkaOffset(1)), + KafkaMessage(Book("Book 2"), KafkaOffset(2)), + KafkaMessage(Book("Book B", shouldSkip = Some(true)), KafkaOffset(3)), + KafkaMessage(Book("Book 3"), KafkaOffset(4)), + KafkaMessage(Book("Book C", shouldSkip = Some(true)), KafkaOffset(5)) + ) + + var committedOffsets = Vector[KafkaOffset]() + + def commitToKafka(offset: KafkaOffset): Unit = + committedOffsets = committedOffsets :+ offset + + val indexName = "sink6-nop" + val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka + .map { kafkaMessage: KafkaMessage => + val book = kafkaMessage.book + val id = book.title + + // Transform message so that we can write to elastic + if (book.shouldSkip.getOrElse(false)) + WriteMessage.createNopMessage[Book]().withPassThrough(kafkaMessage.offset) + else + WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset) + } + .via( // write to elastic + ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings + ) + ) + .map { result => + if (!result.success) throw new Exception("Failed to write message to elastic") + // Commit to kafka + commitToKafka(result.message.passThrough) + } + .runWith(Sink.ignore) + + kafkaToOs.futureValue shouldBe Done + + flushAndRefresh(connectionSettings, indexName) + + // Make sure all messages was committed to kafka + committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2, 3, 4, 5) + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue.toList should contain allElementsOf messagesFromKafka + .filterNot(_.book.shouldSkip.getOrElse(false)) + .map(_.book.title) + } + + "kafka-example - skip all NOP documents and pass Responses with passThrough" in { + + // We're going to pretend we got messages from kafka. 
+ // After we've written them to Elastic, we want + // to commit the offset to Kafka + + case class KafkaOffset(offset: Int) + case class KafkaMessage(book: Book, offset: KafkaOffset) + + val messagesFromKafka = List( + KafkaMessage(Book("Book 1", shouldSkip = Some(true)), KafkaOffset(0)), + KafkaMessage(Book("Book 2", shouldSkip = Some(true)), KafkaOffset(1)), + KafkaMessage(Book("Book 3", shouldSkip = Some(true)), KafkaOffset(2)) + ) + + var committedOffsets = Vector[KafkaOffset]() + + def commitToKafka(offset: KafkaOffset): Unit = + committedOffsets = committedOffsets :+ offset + + val indexName = "sink6-none" + register(connectionSettings, indexName, "dummy", 10) // need to create index else exception in reading below + + val kafkaToOs = Source(messagesFromKafka) // Assume we get this from Kafka + .map { kafkaMessage: KafkaMessage => + val book = kafkaMessage.book + val id = book.title + + // Transform message so that we can write to elastic + if (book.shouldSkip.getOrElse(false)) + WriteMessage.createNopMessage[Book]().withPassThrough(kafkaMessage.offset) + else + WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset) + } + .via( // write to elastic + ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings + ) + ) + .map { result => + if (!result.success) throw new Exception("Failed to write message to elastic") + // Commit to kafka + commitToKafka(result.message.passThrough) + } + .runWith(Sink.ignore) + + kafkaToOs.futureValue shouldBe Done + + flushAndRefresh(connectionSettings, indexName) + + // Make sure all messages was committed to kafka + committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2) + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, indexName).futureValue.toList shouldBe List("dummy") + } + + "handle multiple types of operations correctly" in { + val indexName = "sink8" + //#multiple-operations + val requests = List[WriteMessage[Book, NotUsed]]( + WriteMessage.createIndexMessage(id = "00001", source = Book("Book 1")), + WriteMessage.createUpsertMessage(id = "00002", source = Book("Book 2")), + WriteMessage.createUpsertMessage(id = "00003", source = Book("Book 3")), + WriteMessage.createUpdateMessage(id = "00004", source = Book("Book 4")), + WriteMessage.createCreateMessage(id = "00005", source = Book("Book 5")), + WriteMessage.createDeleteMessage(id = "00002") + ) + + val writeResults = Source(requests) + .via( + ElasticsearchFlow.create[Book]( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + baseWriteSettings + ) + ) + .runWith(Sink.seq) + //#multiple-operations + + val results = writeResults.futureValue + results should have size requests.size + // Assert no errors except a missing document for a update request + val errorMessages = results.flatMap(_.errorReason) + errorMessages should have size 1 + errorMessages.head shouldEqual "[_doc][00004]: document missing" + flushAndRefresh(connectionSettings, indexName) + + // Assert docs in sink8/_doc + val readBooks = ElasticsearchSource( + constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1), + """{"match_all": {}}""", + baseSourceSettings + ).map { message => + message.source + } + .runWith(Sink.seq) + + // Docs should contain both columns + readBooks.futureValue.sortBy(_.fields("title").compactPrint) shouldEqual Seq( + Book("Book 1").toJson, + Book("Book 3").toJson, + Book("Book 5").toJson + ) + } + + "use 
indexName supplied in message if present" in { + // Copy source/_doc to sink2/_doc through typed stream + + //#custom-index-name-example + val customIndexName = "custom-index" + + val writeCustomIndex = ElasticsearchSource + .typed[Book]( + constructElasticsearchParams("source", "_doc", OpensearchApiVersion.V1), + query = """{"match_all": {}}""", + settings = baseSourceSettings + ) + .map { message: ReadResult[Book] => + WriteMessage + .createIndexMessage(message.id, message.source) + .withIndexName(customIndexName) // Setting the index-name to use for this document + } + .runWith( + ElasticsearchSink.create[Book]( + constructElasticsearchParams("this-is-not-the-index-we-are-using", "_doc", OpensearchApiVersion.V1), + settings = baseWriteSettings + ) + ) + //#custom-index-name-example + + writeCustomIndex.futureValue shouldBe Done + flushAndRefresh(connectionSettings, customIndexName) + readTitlesFrom(OpensearchApiVersion.V1, baseSourceSettings, customIndexName).futureValue.sorted shouldEqual Seq( + "Akka Concurrency", + "Akka in Action", + "Effective Akka", + "Learning Scala", + "Programming in Scala", + "Scala Puzzlers", + "Scala for Spark in Production" + ) + } + } + + "ElasticsearchSource" should { + "be able to use custom searchParams" in { + import spray.json._ + import DefaultJsonProtocol._ + + case class TestDoc(id: String, a: String, b: Option[String], c: String) + + implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc) + + val indexName = "custom-search-params-test-scala" + val typeName = "_doc" + + val docs = List( + TestDoc("1", "a1", Some("b1"), "c1"), + TestDoc("2", "a2", Some("b2"), "c2"), + TestDoc("3", "a3", Some("b3"), "c3") + ) + + // insert new documents + val writes = Source(docs) + .map { doc => + WriteMessage.createIndexMessage(doc.id, doc) + } + .via( + ElasticsearchFlow.create[TestDoc]( + constructElasticsearchParams(indexName, typeName, OpensearchApiVersion.V1), + baseWriteSettings.withBufferSize(5) + ) + ) + .runWith(Sink.seq) + + writes.futureValue.filter(!_.success) shouldBe empty + flushAndRefresh(connectionSettings, indexName) + + //#custom-search-params + // Search for docs and ask elastic to only return some fields + + val readWithSearchParameters = ElasticsearchSource + .typed[TestDoc]( + constructElasticsearchParams(indexName, typeName, OpensearchApiVersion.V1), + searchParams = Map( + "query" -> """ {"match_all": {}} """, + "_source" -> """ ["id", "a", "c"] """ + ), + baseSourceSettings + ) + .map { message => + message.source + } + .runWith(Sink.seq) + //#custom-search-params + + assert(readWithSearchParameters.futureValue.toList.sortBy(_.id) == docs.map(_.copy(b = None))) + + } + } +}
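Taken together, the pieces in this PR make an end-to-end Opensearch pipeline a small program. A closing sketch, under assumptions only: the dockerized Opensearch from this PR listening on port 9203, and a spray-json format for `Book` in scope:

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.scaladsl.{ElasticsearchSink, ElasticsearchSource}

implicit val system: ActorSystem = ActorSystem("opensearch-copy")

val connection = OpensearchConnectionSettings("http://localhost:9203")

// Stream every document in "source" into "copy", re-indexing by id.
val done = ElasticsearchSource
  .typed[Book](
    OpensearchParams.V1("source"),
    query = """{"match_all": {}}""",
    settings = OpensearchSourceSettings(connection).withApiVersion(OpensearchApiVersion.V1)
  )
  .map(hit => WriteMessage.createIndexMessage(hit.id, hit.source))
  .runWith(
    ElasticsearchSink.create[Book](
      OpensearchParams.V1("copy"),
      settings = OpensearchWriteSettings(connection).withApiVersion(OpensearchApiVersion.V1)
    )
  )

done.foreach(_ => system.terminate())(system.dispatcher)
```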