ElasticSearch: Replace RestClient with Akka HTTP client #2347
Conversation
Thank you for giving this a try, it looks promising. Please note that even this approach is not compatible enough to go into the Alpakka 2.0 releases, but it is of course a great improvement towards being independent in the future.
Hi @ennru, thanks for the quick feedback. I'm aware that it won't be available in Alpakka 2, but it's the best long-term solution there is, so it's definitely worth it. I'll take your comments into account and continue working on the PR 👍
@ennru I finally found some time to wrap this up. The biggest issue I ran into was the Since ES is deprecating type mappings, and since the FlowStage was already using Let me know what you think about this approach
Hi @ennru, would you be able to take a look at the progress I made? It would be a shame if this missed the major release planned for the end of the year. Thank you!
This looks very good now. Actually simpler than with the REST client.
- As this breaks the API, are there other changes that are good to do now?
- For Alpakka 3.0 we should switch the API version default to V7, WDYT? (in a separate PR)
I've updated the PR to reflect the requested changes (except for the last outstanding one). After wrapping this up, I can definitely do another PR to set the default to ES 7.
@ennru the PR has been updated as discussed. I've set up There's some tests failing in a different subproject but that doesn't seem to be related to my changes.
if (apiVersion == ApiVersion.V5) {
  require(value != null && value.trim.nonEmpty, "You must define a type name for ElasticSearch API version V5")
}
To make this check useful an API version should always be selected prior to calling the other setters.
I'd remove the default variants (an index name must always be given) and just keep the V5 and V7 constructors which can use new directly.
That's more invasive in our tests, but normal users will not switch between different versions in their code.
Or do you see a way that we don't need to have the API version on both EsParams and the source settings?
I've changed EsParams to only have the 2 creation methods V5 and V7. The only place where EsParams.apiVersion was used was in the ElasticsearchSourceStage, and since that also has access to the ElasticsearchSourceSettings I've decided to remove EsParams.apiVersion altogether.
Since both Java and Scala tests run for both V5 and V7, I did have to add a helper function in the tests to construct the right EsParams. The biggest issue with this setup is that all the documentation examples (run-typed, run-jsobject, run-flow, string, kafka-example, custom-index-name-example and custom-search-params) are taken from tests that now use the helper method to enable having a single test that runs against both API versions.
I'd prefer to have all examples documented with only 1 of the 2 EsParams versions (V7 since that'll be the default for Alpakka 3?). Do you see a better alternative to setting this up besides duplicating all these tests so that they work with the explicit EsParams.V5 and EsParams.V7?
I did already add some generic documentation on EsParams.
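For readers following the thread, here is a minimal sketch of what "only the V5 and V7 creation methods" could look like. It is not the code from this PR: the field names, the extra index-name check, and the exact signatures are assumptions made for illustration. Only the V5 type-name message is taken from the snippet under review.

```scala
// Hypothetical sketch for illustration; not the Alpakka sources.
// The constructor is private, so V5 and V7 are the only ways to build an
// instance, and no apiVersion field has to be stored on the params.
final class EsParams private (val indexName: String, val typeName: Option[String])

object EsParams {
  // V5 still uses mapping types, so a non-blank type name is required.
  def V5(indexName: String, typeName: String): EsParams = {
    require(indexName != null && indexName.trim.nonEmpty, "You must define an index name")
    require(typeName != null && typeName.trim.nonEmpty,
            "You must define a type name for ElasticSearch API version V5")
    new EsParams(indexName, Some(typeName))
  }

  // V7 has no mapping types, so only the index name is needed.
  def V7(indexName: String): EsParams = {
    require(indexName != null && indexName.trim.nonEmpty, "You must define an index name")
    new EsParams(indexName, None)
  }
}
```

With this shape the type-name check runs at construction time, so it no longer depends on the order in which setters are called.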
I found a few more details.
We'll soon open for Alpakka 3 merges!
The test code showing in the docs: Could we just duplicate the ones that show in the docs? Just showing V7 is enough. As this breaks the API substantially, I wonder if V7 should become the default.
I've updated it once again. All comments have been resolved and I've split up the tests that were used in the documentation between V5 and V7. I've tried to reduce duplication where possible. V5 tests are being used in the docs.
I've rebased and fixed formatting.
Hi @ennru, thank you. I've updated the PR and fixed the test. Any test was running flaky due to the 600ms default future timeout. I've added IntegrationPatience to prevent this. Let me know if this is good to go. I wanted to suggest to align naming of
I agree, making it short is the wrong optimisation.
I've renamed all instances of EsParams. Only the mqtt tests seem to be failing but that seems unrelated. Let me know if there's anything else to pick up!
LGTM.
Thank you! Great to become independent of the Elasticsearch client library. If you're up for more, #2472 would be great.
Purpose
Perform all HTTP calls to ElasticSearch using an Akka HTTP client instead of the low-level elasticsearch-rest-client.
References
#2308
Changes
Context
As proposed by @ennru, this PR replaces the elasticsearch-rest-client with an Akka HTTP implementation. The ElasticsearchSimpleFlowStage and ElasticsearchSourceStage have been updated but I'd like to get some early feedback on the way I've tackled it.
Some outstanding thoughts:
- ElasticsearchSourceSettings and ElasticsearchWriteSettings. I'm not too happy about that. What about having 1 ElasticsearchSettings instance that looks like this:
- baseUrl property which includes the scheme, host and port for ES or should we allow configuring them separately?
- Source.setup and Flow.setup to gain access to the materializer, actor system and execution context. Is this the best way to do so or is there an alternative?
- ContentType for application/x-ndjson in akka-http which should be used for the bulk API
I'll hold on with updating all the tests until I've got some feedback to prevent rework.
Looking forward to hearing what you think @ennru
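As background for the application/x-ndjson point above, the sketch below shows the newline-delimited JSON shape that the Elasticsearch bulk API accepts. It is plain Scala with no Akka HTTP dependency. The BulkBody object and its members are hypothetical names made up for this illustration and are not part of the PR. It only covers actions that carry a source line (such as index); a delete action would have no second line.

```scala
// Hypothetical helper for illustration; not part of Alpakka or Akka HTTP.
object BulkBody {
  // Media type string mentioned in the PR description for bulk requests.
  val NdJsonMediaType: String = "application/x-ndjson"

  // Each operation is rendered as an action line followed by a source line.
  // Every line, including the last one, is terminated by a newline.
  def render(operations: Seq[(String, String)]): String =
    operations.map { case (action, source) => s"$action\n$source\n" }.mkString
}
```

Calling BulkBody.render with a sequence of (action JSON, document JSON) pairs yields the request body; sending it with the right content type is left to whichever HTTP client is in use.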