From 6195cf46f5bd5d593f3598c533af9ed3694567e6 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Wed, 5 Jul 2017 11:35:59 +0200 Subject: [PATCH 1/4] [BEAM-2541] Check Elasticsearch backend version when the pipeline is run not when it is constructed --- .../sdk/io/elasticsearch/ElasticsearchIO.java | 44 +++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 4d7688772a00..3bba27515f2e 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -116,6 +116,18 @@ @Experimental(Experimental.Kind.SOURCE_SINK) public class ElasticsearchIO { + private static void checkVersion(ConnectionConfiguration connectionConfiguration) + throws IOException { + RestClient restClient = connectionConfiguration.createClient(); + Response response = restClient.performRequest("GET", "", new BasicHeader("", "")); + JsonNode jsonNode = parseResponse(response); + String version = jsonNode.path("version").path("number").asText(); + boolean version2x = version.startsWith("2."); + restClient.close(); + checkArgument(version2x, "The Elasticsearch version to connect to is different of 2.x. " + + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x"); + } + public static Read read() { // default scrollKeepalive = 5m as a majorant for un-predictable time between 2 start/read calls // default batchSize to 100 as recommended by ES dev team as a safe value when dealing @@ -206,25 +218,9 @@ public static ConnectionConfiguration create(String[] addresses, String index, S .setIndex(index) .setType(type) .build(); - checkVersion(connectionConfiguration); return connectionConfiguration; } - private static void checkVersion(ConnectionConfiguration connectionConfiguration) - throws IOException { - RestClient restClient = connectionConfiguration.createClient(); - Response response = restClient.performRequest("GET", "", new BasicHeader("", "")); - JsonNode jsonNode = parseResponse(response); - String version = jsonNode.path("version").path("number").asText(); - boolean version2x = version.startsWith("2."); - restClient.close(); - checkArgument( - version2x, - "ConnectionConfiguration.create(addresses, index, type): " - + "the Elasticsearch version to connect to is different of 2.x. " - + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x"); - } - /** * If Elasticsearch authentication is enabled, provide the username. * @@ -398,10 +394,16 @@ public PCollection expand(PBegin input) { @Override public void validate(PipelineOptions options) { + ConnectionConfiguration connectionConfiguration = getConnectionConfiguration(); checkState( - getConnectionConfiguration() != null, + connectionConfiguration != null, "ElasticsearchIO.read() requires a connection configuration" + " to be set via withConnectionConfiguration(configuration)"); + try { + checkVersion(connectionConfiguration); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } } @Override @@ -715,10 +717,16 @@ public Write withMaxBatchSizeBytes(long batchSizeBytes) { @Override public void validate(PipelineOptions options) { + ConnectionConfiguration connectionConfiguration = getConnectionConfiguration(); checkState( - getConnectionConfiguration() != null, + connectionConfiguration != null, "ElasticsearchIO.write() requires a connection configuration" + " to be set via withConnectionConfiguration(configuration)"); + try { + checkVersion(connectionConfiguration); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } } @Override From 06181195aac15a3df4ee2ad3248d215256fe8b53 Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Wed, 19 Jul 2017 15:26:57 +0200 Subject: [PATCH 2/4] Move checkVersion to the bottom of ElasticsearchIO class --- .../sdk/io/elasticsearch/ElasticsearchIO.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 3bba27515f2e..a3f9621c7fb5 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -116,18 +116,6 @@ @Experimental(Experimental.Kind.SOURCE_SINK) public class ElasticsearchIO { - private static void checkVersion(ConnectionConfiguration connectionConfiguration) - throws IOException { - RestClient restClient = connectionConfiguration.createClient(); - Response response = restClient.performRequest("GET", "", new BasicHeader("", "")); - JsonNode jsonNode = parseResponse(response); - String version = jsonNode.path("version").path("number").asText(); - boolean version2x = version.startsWith("2."); - restClient.close(); - checkArgument(version2x, "The Elasticsearch version to connect to is different of 2.x. " - + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x"); - } - public static Read read() { // default scrollKeepalive = 5m as a majorant for un-predictable time between 2 start/read calls // default batchSize to 100 as recommended by ES dev team as a safe value when dealing @@ -836,4 +824,15 @@ public void closeClient() throws Exception { } } } + private static void checkVersion(ConnectionConfiguration connectionConfiguration) + throws IOException { + RestClient restClient = connectionConfiguration.createClient(); + Response response = restClient.performRequest("GET", "", new BasicHeader("", "")); + JsonNode jsonNode = parseResponse(response); + String version = jsonNode.path("version").path("number").asText(); + boolean version2x = version.startsWith("2."); + restClient.close(); + checkArgument(version2x, "The Elasticsearch version to connect to is different of 2.x. " + + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x"); + } } From fa5e25cfe49dd5fb3794813b41168bf4ae273a5f Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Thu, 20 Jul 2017 11:03:59 +0200 Subject: [PATCH 3/4] Catch IOException in checkVersion not outside --- .../sdk/io/elasticsearch/ElasticsearchIO.java | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index a3f9621c7fb5..ab90b6cf81e0 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -387,11 +387,7 @@ public void validate(PipelineOptions options) { connectionConfiguration != null, "ElasticsearchIO.read() requires a connection configuration" + " to be set via withConnectionConfiguration(configuration)"); - try { - checkVersion(connectionConfiguration); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } + checkVersion(connectionConfiguration); } @Override @@ -710,11 +706,7 @@ public void validate(PipelineOptions options) { connectionConfiguration != null, "ElasticsearchIO.write() requires a connection configuration" + " to be set via withConnectionConfiguration(configuration)"); - try { - checkVersion(connectionConfiguration); - } catch (IOException e) { - throw new IllegalArgumentException(e); - } + checkVersion(connectionConfiguration); } @Override @@ -824,15 +816,16 @@ public void closeClient() throws Exception { } } } - private static void checkVersion(ConnectionConfiguration connectionConfiguration) - throws IOException { - RestClient restClient = connectionConfiguration.createClient(); - Response response = restClient.performRequest("GET", "", new BasicHeader("", "")); - JsonNode jsonNode = parseResponse(response); - String version = jsonNode.path("version").path("number").asText(); - boolean version2x = version.startsWith("2."); - restClient.close(); - checkArgument(version2x, "The Elasticsearch version to connect to is different of 2.x. " - + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x"); + private static void checkVersion(ConnectionConfiguration connectionConfiguration){ + try (RestClient restClient = connectionConfiguration.createClient()) { + Response response = restClient.performRequest("GET", "", new BasicHeader("", "")); + JsonNode jsonNode = parseResponse(response); + String version = jsonNode.path("version").path("number").asText(); + boolean version2x = version.startsWith("2."); + checkArgument(version2x, "The Elasticsearch version to connect to is different of 2.x. " + + "This version of the ElasticsearchIO is only compatible with Elasticsearch v2.x"); + } catch (IOException e) { + throw new IllegalArgumentException("Cannot check Elasticsearch version"); + } } } From 5ca988adc61d54e2164e2b3c509719445c61d8be Mon Sep 17 00:00:00 2001 From: Etienne Chauchot Date: Wed, 26 Jul 2017 12:27:42 +0200 Subject: [PATCH 4/4] Cleaning IOException is never thrown in ConnectionConfiguration.create() final field try with resources --- .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 4 +--- .../io/elasticsearch/ElasticsearchTestDataSet.java | 11 ++++------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index ab90b6cf81e0..8e6e25371c63 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -183,10 +183,8 @@ abstract static class Builder { * @param index the index toward which the requests will be issued * @param type the document type toward which the requests will be issued * @return the connection configuration object - * @throws IOException when it fails to connect to Elasticsearch */ - public static ConnectionConfiguration create(String[] addresses, String index, String type) - throws IOException { + public static ConnectionConfiguration create(String[] addresses, String index, String type){ checkArgument( addresses != null, "ConnectionConfiguration.create(addresses, index, type) called with null address"); diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java index e2c291bb6852..a6e1cc09a855 100644 --- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.elasticsearch; -import java.io.IOException; import org.apache.beam.sdk.io.common.IOTestPipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.elasticsearch.client.RestClient; @@ -37,7 +36,7 @@ public class ElasticsearchTestDataSet { public static final long NUM_DOCS = 60000; public static final int AVERAGE_DOC_SIZE = 25; public static final int MAX_DOC_SIZE = 35; - private static String writeIndex = ES_INDEX + System.currentTimeMillis(); + private static final String writeIndex = ES_INDEX + System.currentTimeMillis(); /** * Use this to create the index for reading before IT read tests. @@ -63,17 +62,15 @@ public static void main(String[] args) throws Exception { } private static void createAndPopulateReadIndex(IOTestPipelineOptions options) throws Exception { - RestClient restClient = getConnectionConfiguration(options, ReadOrWrite.READ).createClient(); // automatically creates the index and insert docs - try { + try (RestClient restClient = getConnectionConfiguration(options, ReadOrWrite.READ) + .createClient()) { ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, restClient); - } finally { - restClient.close(); } } static ElasticsearchIO.ConnectionConfiguration getConnectionConfiguration( - IOTestPipelineOptions options, ReadOrWrite rOw) throws IOException { + IOTestPipelineOptions options, ReadOrWrite rOw){ ElasticsearchIO.ConnectionConfiguration connectionConfiguration = ElasticsearchIO.ConnectionConfiguration.create( new String[] {