diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index b38b1696141e3..2a7cf9c5d48d3 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -141,11 +141,17 @@ public void testSizesVolume() throws Exception {
    */
   @Test
   public void testWriteWithFullAddressingVolume() throws Exception {
-    // cannot share elasticsearchIOTestCommon because tests run in parallel.
-    ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
-        new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
-    elasticsearchIOTestCommonWrite.setPipeline(pipeline);
-    elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
+    int backendVersion = ElasticsearchIO.getBackendVersion(writeConnectionConfiguration);
+    if (backendVersion < 6) {
+      // This test uses multi types but Elasticsearch 6.x+ does not support multi types
+      // https://www.elastic.co/guide/en/elasticsearch/reference/6.x/breaking-changes-6.0.html
+      // so this test does not work
+      // cannot share elasticsearchIOTestCommon because tests run in parallel.
+      ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite =
+          new ElasticsearchIOTestCommon(writeConnectionConfiguration, restClient, true);
+      elasticsearchIOTestCommonWrite.setPipeline(pipeline);
+      elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
+    }
   }
 
   /**
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 77e9c48008b65..13dc343e06ad8 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -201,8 +201,14 @@ public void testWriteWithIndexFn() throws Exception {
 
   @Test
   public void testWriteWithTypeFn() throws Exception {
-    elasticsearchIOTestCommon.setPipeline(pipeline);
-    elasticsearchIOTestCommon.testWriteWithTypeFn();
+    int backendVersion = ElasticsearchIO.getBackendVersion(connectionConfiguration);
+    if (backendVersion < 6) {
+      // Elasticsearch 6.x+ does not support multi types
+      // https://www.elastic.co/guide/en/elasticsearch/reference/6.x/breaking-changes-6.0.html
+      // so this test does not work
+      elasticsearchIOTestCommon.setPipeline(pipeline);
+      elasticsearchIOTestCommon.testWriteWithTypeFn();
+    }
   }
 
   @Test
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 329c68e00244d..70af1baef283f 100644
--- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -133,8 +133,7 @@ void testReadWithQuery() throws Exception {
         + " \"query\": {\n"
         + " \"match\" : {\n"
         + " \"scientist\" : {\n"
-        + " \"query\" : \"Einstein\",\n"
-        + " \"type\" : \"boolean\"\n"
+        + " \"query\" : \"Einstein\"\n"
         + " }\n"
         + " }\n"
         + " }\n"
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 9f50dc199ebe8..2158c8fcb7ac8 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -174,7 +174,7 @@ static void checkForErrors(Response response, int backendVersion) throws IOExcep
       String errorRootName = "";
       if (backendVersion == 2) {
         errorRootName = "create";
-      } else if (backendVersion == 5) {
+      } else if (backendVersion >= 5) {
        errorRootName = "index";
       }
       JsonNode errorRoot = item.path(errorRootName);
@@ -498,9 +498,9 @@ public List<BoundedElasticsearchSource> split(
       List<BoundedElasticsearchSource> sources = new ArrayList<>();
       if (backendVersion == 2) {
         // 1. We split per shard :
-        // unfortunately, Elasticsearch 2. x doesn 't provide a way to do parallel reads on a single
+        // unfortunately, Elasticsearch 2.x doesn't provide a way to do parallel reads on a single
         // shard.So we do not use desiredBundleSize because we cannot split shards.
-        // With the slice API in ES 5.0 we will be able to use desiredBundleSize.
+        // With the slice API in ES 5.x+ we will be able to use desiredBundleSize.
         // Basically we will just ask the slice API to return data
         // in nbBundles = estimatedSize / desiredBundleSize chuncks.
         // So each beam source will read around desiredBundleSize volume of data.
@@ -516,11 +516,11 @@ public List<BoundedElasticsearchSource> split(
           sources.add(new BoundedElasticsearchSource(spec, shardId, null, null, backendVersion));
         }
         checkArgument(!sources.isEmpty(), "No shard found");
-      } else if (backendVersion == 5) {
+      } else if (backendVersion >= 5) {
         long indexSize = BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration);
         float nbBundlesFloat = (float) indexSize / desiredBundleSizeBytes;
         int nbBundles = (int) Math.ceil(nbBundlesFloat);
-        //ES slice api imposes that the number of slices is <= 1024 even if it can be overloaded
+        // ES slice api imposes that the number of slices is <= 1024 even if it can be overloaded
         if (nbBundles > 1024) {
           nbBundles = 1024;
         }
@@ -549,7 +549,7 @@ static long estimateIndexSize(ConnectionConfiguration connectionConfiguration)
       // as Elasticsearch 2.x doesn't not support any way to do parallel read inside a shard
       // the estimated size bytes is not really used in the split into bundles.
       // However, we implement this method anyway as the runners can use it.
-      // NB: Elasticsearch 5.x now provides the slice API.
+      // NB: Elasticsearch 5.x+ now provides the slice API.
       // (https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html
       // #sliced-scroll)
       JsonNode statsJson = getStats(connectionConfiguration, false);
@@ -616,7 +616,7 @@ public boolean start() throws IOException {
       if (query == null) {
         query = "{\"query\": { \"match_all\": {} }}";
       }
-      if (source.backendVersion == 5 && source.numSlices != null && source.numSlices > 1) {
+      if (source.backendVersion >= 5 && source.numSlices != null && source.numSlices > 1) {
         //if there is more than one slice, add the slice to the user query
         String sliceQuery =
             String.format("\"slice\": {\"id\": %s,\"max\": %s}", source.sliceId, source.numSlices);
@@ -1026,10 +1026,10 @@ static int getBackendVersion(ConnectionConfiguration connectionConfiguration) {
       int backendVersion =
           Integer.parseInt(jsonNode.path("version").path("number").asText().substring(0, 1));
       checkArgument(
-          (backendVersion == 2 || backendVersion == 5),
+          (backendVersion == 2 || backendVersion >= 5),
           "The Elasticsearch version to connect to is %s.x. "
               + "This version of the ElasticsearchIO is only compatible with "
-              + "Elasticsearch v5.x and v2.x",
+              + "Elasticsearch v5.x+ and v2.x",
           backendVersion);
       return backendVersion;
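
Note on the test gating in the hunks above: wrapping the test body in an if (backendVersion < 6) block makes the multi-type tests pass silently on Elasticsearch 6.x+ rather than being reported as skipped. As a hedged alternative sketch (not what the patch does), the same gate could be expressed with JUnit 4's org.junit.Assume, reusing the same fixture fields that appear in the diff (connectionConfiguration, pipeline, elasticsearchIOTestCommon):

  @Test
  public void testWriteWithTypeFn() throws Exception {
    // Hypothetical variant of the gating used in the patch: when the backend is
    // Elasticsearch 6.x+, Assume marks the test as skipped instead of passing trivially.
    int backendVersion = ElasticsearchIO.getBackendVersion(connectionConfiguration);
    org.junit.Assume.assumeTrue(
        "Elasticsearch 6.x+ does not support multi types", backendVersion < 6);
    elasticsearchIOTestCommon.setPipeline(pipeline);
    elasticsearchIOTestCommon.testWriteWithTypeFn();
  }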