Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-5107] Support ES 6.x for ElasticsearchIO #6210

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -141,11 +141,17 @@ public void testSizesVolume() throws Exception {
*/
@Test
public void testWriteWithFullAddressingVolume() throws Exception {
// cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
int backendVersion = ElasticsearchIO.getBackendVersion(writeConnectionConfiguration);
if (backendVersion < 6) {
// This test uses multi types but Elasticsearch 6.x+ does not support multi types
// https://www.elastic.co/guide/en/elasticsearch/reference/6.x/breaking-changes-6.0.html
// so this test does not work
// cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}
}

/**
Expand Down
Expand Up @@ -201,8 +201,14 @@ public void testWriteWithIndexFn() throws Exception {

@Test
public void testWriteWithTypeFn() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithTypeFn();
int backendVersion = ElasticsearchIO.getBackendVersion(connectionConfiguration);
if (backendVersion < 6) {
// Elasticsearch 6.x+ does not support multi types
// https://www.elastic.co/guide/en/elasticsearch/reference/6.x/breaking-changes-6.0.html
// so this test does not work
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithTypeFn();
}
}

@Test
Expand Down
Expand Up @@ -133,8 +133,7 @@ void testReadWithQuery() throws Exception {
+ " \"query\": {\n"
+ " \"match\" : {\n"
+ " \"scientist\" : {\n"
+ " \"query\" : \"Einstein\",\n"
+ " \"type\" : \"boolean\"\n"
+ " \"query\" : \"Einstein\"\n"
+ " }\n"
+ " }\n"
+ " }\n"
Expand Down
Expand Up @@ -174,7 +174,7 @@ static void checkForErrors(Response response, int backendVersion) throws IOExcep
String errorRootName = "";
if (backendVersion == 2) {
errorRootName = "create";
} else if (backendVersion == 5) {
} else if (backendVersion >= 5) {
errorRootName = "index";
}
JsonNode errorRoot = item.path(errorRootName);
Expand Down Expand Up @@ -498,9 +498,9 @@ public List<? extends BoundedSource<String>> split(
List<BoundedElasticsearchSource> sources = new ArrayList<>();
if (backendVersion == 2) {
// 1. We split per shard :
// unfortunately, Elasticsearch 2. x doesn 't provide a way to do parallel reads on a single
// unfortunately, Elasticsearch 2.x doesn't provide a way to do parallel reads on a single
// shard.So we do not use desiredBundleSize because we cannot split shards.
// With the slice API in ES 5.0 we will be able to use desiredBundleSize.
// With the slice API in ES 5.x+ we will be able to use desiredBundleSize.
// Basically we will just ask the slice API to return data
// in nbBundles = estimatedSize / desiredBundleSize chuncks.
// So each beam source will read around desiredBundleSize volume of data.
Expand All @@ -516,11 +516,11 @@ public List<? extends BoundedSource<String>> split(
sources.add(new BoundedElasticsearchSource(spec, shardId, null, null, backendVersion));
}
checkArgument(!sources.isEmpty(), "No shard found");
} else if (backendVersion == 5) {
} else if (backendVersion >= 5) {
long indexSize = BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration);
float nbBundlesFloat = (float) indexSize / desiredBundleSizeBytes;
int nbBundles = (int) Math.ceil(nbBundlesFloat);
//ES slice api imposes that the number of slices is <= 1024 even if it can be overloaded
// ES slice api imposes that the number of slices is <= 1024 even if it can be overloaded
if (nbBundles > 1024) {
nbBundles = 1024;
}
Expand Down Expand Up @@ -549,7 +549,7 @@ static long estimateIndexSize(ConnectionConfiguration connectionConfiguration)
// as Elasticsearch 2.x doesn't not support any way to do parallel read inside a shard
// the estimated size bytes is not really used in the split into bundles.
// However, we implement this method anyway as the runners can use it.
// NB: Elasticsearch 5.x now provides the slice API.
// NB: Elasticsearch 5.x+ now provides the slice API.
// (https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html
// #sliced-scroll)
JsonNode statsJson = getStats(connectionConfiguration, false);
Expand Down Expand Up @@ -616,7 +616,7 @@ public boolean start() throws IOException {
if (query == null) {
query = "{\"query\": { \"match_all\": {} }}";
}
if (source.backendVersion == 5 && source.numSlices != null && source.numSlices > 1) {
if (source.backendVersion >= 5 && source.numSlices != null && source.numSlices > 1) {
//if there is more than one slice, add the slice to the user query
String sliceQuery =
String.format("\"slice\": {\"id\": %s,\"max\": %s}", source.sliceId, source.numSlices);
Expand Down Expand Up @@ -1026,10 +1026,10 @@ static int getBackendVersion(ConnectionConfiguration connectionConfiguration) {
int backendVersion =
Integer.parseInt(jsonNode.path("version").path("number").asText().substring(0, 1));
checkArgument(
(backendVersion == 2 || backendVersion == 5),
(backendVersion == 2 || backendVersion >= 5),
"The Elasticsearch version to connect to is %s.x. "
+ "This version of the ElasticsearchIO is only compatible with "
+ "Elasticsearch v5.x and v2.x",
+ "Elasticsearch v5.x+ and v2.x",
backendVersion);
return backendVersion;

Expand Down