From f48bb4be1d3bb23f3cc978c4c25cf43842639296 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Mon, 24 Jul 2017 17:53:15 +0200 Subject: [PATCH 1/2] [BEAM-1274] Add SSL mutual authentication support --- .../sdk/io/elasticsearch/ElasticsearchIO.java | 69 ++++++++++++++++++- 1 file changed, 67 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 50468887120b..2cd3bcd0baed 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -25,10 +25,14 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; + +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.Serializable; -import java.net.MalformedURLException; import java.net.URL; +import java.security.KeyStore; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -39,6 +43,8 @@ import java.util.Map; import java.util.NoSuchElementException; import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; + import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -60,7 +66,9 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.message.BasicHeader; +import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.ssl.SSLContexts; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; @@ -155,6 +163,12 @@ public abstract static class ConnectionConfiguration implements Serializable { @Nullable abstract String getPassword(); + @Nullable + abstract String getKeystorePath(); + + @Nullable + abstract String getKeystorePassword(); + abstract String getIndex(); abstract String getType(); @@ -169,6 +183,10 @@ abstract static class Builder { abstract Builder setPassword(String password); + abstract Builder setKeystorePath(String keystorePath); + + abstract Builder setKeystorePassword(String password); + abstract Builder setIndex(String index); abstract Builder setType(String type); @@ -239,6 +257,32 @@ public ConnectionConfiguration withPassword(String password) { return builder().setPassword(password).build(); } + /** + * If Elasticsearch uses SSL with mutual authentication (via shield), + * provide the keystore containing the client key. + * + * @param keystorePath the location of the keystore containing the client key. + * @return the {@link ConnectionConfiguration} object with keystore path set. + */ + public ConnectionConfiguration withKeystorePath(String keystorePath) { + checkArgument(keystorePath != null, "ConnectionConfiguration.create()" + + ".withKeystorePath(keystorePath) called with null keystorePath"); + return builder().setKeystorePath(keystorePath).build(); + } + + /** + * If Elasticsearch uses SSL with mutual authentication (via shield), + * provide the password to open the client keystore. + * + * @param keystorePassword the password of the client keystore. + * @return the {@link ConnectionConfiguration} object with keystore password set. + */ + public ConnectionConfiguration withKeystorePassword(String keystorePassword) { + checkArgument(keystorePassword != null, "ConnectionConfiguration.create()" + + ".withKeystorePassword(keystorePassword) called with null keystorePassword"); + return builder().setKeystorePassword(keystorePassword).build(); + } + private void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("address", getAddresses().toString())); builder.add(DisplayData.item("index", getIndex())); @@ -246,7 +290,7 @@ private void populateDisplayData(DisplayData.Builder builder) { builder.addIfNotNull(DisplayData.item("username", getUsername())); } - RestClient createClient() throws MalformedURLException { + RestClient createClient() throws IOException { HttpHost[] hosts = new HttpHost[getAddresses().size()]; int i = 0; for (String address : getAddresses()) { @@ -267,6 +311,27 @@ public HttpAsyncClientBuilder customizeHttpClient( } }); } + if (getKeystorePath() != null) { + try { + KeyStore keyStore = KeyStore.getInstance("jks"); + try (InputStream is = new FileInputStream(new File(getKeystorePath()))) { + keyStore.load(is, getKeystorePassword().toCharArray()); + } + final SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(keyStore, null).build(); + final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext); + restClientBuilder.setHttpClientConfigCallback( + new RestClientBuilder.HttpClientConfigCallback() { + @Override + public HttpAsyncClientBuilder customizeHttpClient( + HttpAsyncClientBuilder httpClientBuilder) { + return httpClientBuilder.setSSLContext(sslContext).setSSLStrategy(sessionStrategy); + } + }); + } catch (Exception e) { + throw new IOException("Can't load the client certificate from the keystore", e); + } + } return restClientBuilder.build(); } } From 02f11d3db98f33475ff1152d33e36161d59fd400 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Mon, 7 Aug 2017 07:49:36 +0200 Subject: [PATCH 2/2] =?UTF-8?q?[BEAM-1274]=C2=A0Add=20SSL/TLS=20in=20the?= =?UTF-8?q?=20comments,=20add=20the=20self=20signed=20policy=20support=20f?= =?UTF-8?q?or=20the=20SSL=20context?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 2cd3bcd0baed..e6a6a9ff4d9b 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -62,6 +62,7 @@ import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; @@ -258,7 +259,7 @@ public ConnectionConfiguration withPassword(String password) { } /** - * If Elasticsearch uses SSL with mutual authentication (via shield), + * If Elasticsearch uses SSL/TLS with mutual authentication (via shield), * provide the keystore containing the client key. * * @param keystorePath the location of the keystore containing the client key. @@ -267,15 +268,17 @@ public ConnectionConfiguration withPassword(String password) { public ConnectionConfiguration withKeystorePath(String keystorePath) { checkArgument(keystorePath != null, "ConnectionConfiguration.create()" + ".withKeystorePath(keystorePath) called with null keystorePath"); + checkArgument(!keystorePath.isEmpty(), "ConnectionConfiguration.create()" + + ".withKeystorePath(keystorePath) called with empty keystorePath"); return builder().setKeystorePath(keystorePath).build(); } /** - * If Elasticsearch uses SSL with mutual authentication (via shield), + * If Elasticsearch uses SSL/TLS with mutual authentication (via shield), * provide the password to open the client keystore. * * @param keystorePassword the password of the client keystore. - * @return the {@link ConnectionConfiguration} object with keystore password set. + * @return the {@link ConnectionConfiguration} object with keystore passwo:rd set. */ public ConnectionConfiguration withKeystorePassword(String keystorePassword) { checkArgument(keystorePassword != null, "ConnectionConfiguration.create()" @@ -288,6 +291,7 @@ private void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("index", getIndex())); builder.add(DisplayData.item("type", getType())); builder.addIfNotNull(DisplayData.item("username", getUsername())); + builder.addIfNotNull(DisplayData.item("keystore.path", getKeystorePath())); } RestClient createClient() throws IOException { @@ -311,14 +315,14 @@ public HttpAsyncClientBuilder customizeHttpClient( } }); } - if (getKeystorePath() != null) { + if (getKeystorePath() != null && !getKeystorePath().isEmpty()) { try { KeyStore keyStore = KeyStore.getInstance("jks"); try (InputStream is = new FileInputStream(new File(getKeystorePath()))) { keyStore.load(is, getKeystorePassword().toCharArray()); } final SSLContext sslContext = SSLContexts.custom() - .loadTrustMaterial(keyStore, null).build(); + .loadTrustMaterial(keyStore, new TrustSelfSignedStrategy()).build(); final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext); restClientBuilder.setHttpClientConfigCallback( new RestClientBuilder.HttpClientConfigCallback() {