From 93512d9f7739ddc518ea77b262d65220a3573588 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Tue, 7 Apr 2020 14:10:51 -0400 Subject: [PATCH] 0004338: Use HTTP/2 for encrypted HTTPS synchronization --- symmetric-assemble/common.gradle | 4 +- .../symmetric/AbstractCommandLauncher.java | 5 +- symmetric-core/build.gradle | 3 + .../symmetric/common/ServerConstants.java | 2 + .../service/impl/AbstractService.java | 7 +- .../service/impl/BandwidthService.java | 35 +- .../service/impl/RegistrationService.java | 4 +- .../transport/TransportManagerFactory.java | 36 +- .../transport/file/FileTransportManager.java | 4 +- .../transport/http/ConscryptHelper.java | 44 ++ .../transport/http/Http2Connection.java | 448 ++++++++++++++++++ .../transport/http/HttpConnection.java | 128 +++++ .../transport/http/HttpIncomingTransport.java | 13 +- .../transport/http/HttpOutgoingTransport.java | 37 +- .../transport/http/HttpTransportManager.java | 75 +-- .../internal/InternalTransportManager.java | 5 +- .../symmetric/util/SymmetricUtils.java | 3 + .../transport/MockTransportManager.java | 5 +- symmetric-server/build.gradle | 5 + .../deploy/conf/symmetric-server.properties | 4 + .../symmetric/SymmetricWebServer.java | 73 ++- .../symmetric/web/PushUriHandler.java | 3 +- .../main/java/org/jumpmind/util/AppUtils.java | 27 +- 23 files changed, 828 insertions(+), 142 deletions(-) create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/ConscryptHelper.java create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/Http2Connection.java create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpConnection.java diff --git a/symmetric-assemble/common.gradle b/symmetric-assemble/common.gradle index 3645dd2eef..4ca268cb5e 100644 --- a/symmetric-assemble/common.gradle +++ b/symmetric-assemble/common.gradle @@ -254,7 +254,9 @@ subprojects { subproject -> bouncyCastleVersion = '1.64' animalSnifferVersion = '1.18' jnaVersion = '5.5.0' - jettyVersion = '9.4.26.v20200117' + jettyVersion = '9.4.27.v20200227' + alpnBootVersion = '8.1.13.v20181017' + alpnApiVersion = '1.1.3.v20160715' websocketVersion = '1.1' env = System.getenv() } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/AbstractCommandLauncher.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/AbstractCommandLauncher.java index 56f5377258..9b9cd4dd7c 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/AbstractCommandLauncher.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/AbstractCommandLauncher.java @@ -129,8 +129,9 @@ public AbstractCommandLauncher(String app, String argSyntax, String messageKeyPr this.messageKeyPrefix = messageKeyPrefix; TypedProperties serverProperties = new TypedProperties(System.getProperties()); boolean allowSelfSignedCerts = serverProperties.is(ServerConstants.HTTPS_ALLOW_SELF_SIGNED_CERTS, true); - String allowServerNames = serverProperties.get(ServerConstants.HTTPS_ALLOW_SELF_SIGNED_CERTS, "all"); - TransportManagerFactory.initHttps(allowServerNames, allowSelfSignedCerts); + String allowServerNames = serverProperties.get(ServerConstants.HTTPS_VERIFIED_SERVERS, "all"); + boolean https2Enabled = serverProperties.is(ServerConstants.HTTPS2_ENABLE, true); + TransportManagerFactory.initHttps(allowServerNames, allowSelfSignedCerts, https2Enabled); } protected static void initFromServerProperties() { diff --git a/symmetric-core/build.gradle b/symmetric-core/build.gradle index 5b87fe8381..5a21ab0b3f 100644 --- a/symmetric-core/build.gradle +++ b/symmetric-core/build.gradle @@ -9,6 +9,9 @@ apply from: symAssembleDir + '/common.gradle' compile "com.google.code.gson:gson:$gsonVersion" compile "org.springframework:spring-core:$springVersion" + compileOnly "org.eclipse.jetty:jetty-alpn-conscrypt-server:$jettyVersion" + compile "com.squareup.okhttp3:okhttp:4.4.1" + compileOnly ("nl.cad:tps-parse:1.0.15-SNAPSHOT") { exclude group: 'commons-lang', module: 'commons-lang' } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ServerConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ServerConstants.java index 7f4637a927..0461ab00e7 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ServerConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ServerConstants.java @@ -33,6 +33,8 @@ public class ServerConstants { public final static String HTTPS_ENABLE = "https.enable"; public final static String HTTPS_PORT = "https.port"; + public final static String HTTPS2_ENABLE = "https2.enable"; + public final static String HTTPS_VERIFIED_SERVERS = "https.verified.server.names"; public final static String HTTPS_ALLOW_SELF_SIGNED_CERTS = "https.allow.self.signed.certs"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java index 39c8eb2bda..5089350552 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java @@ -23,7 +23,6 @@ import java.io.BufferedReader; import java.io.EOFException; import java.io.IOException; -import java.net.HttpURLConnection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -51,6 +50,7 @@ import org.jumpmind.symmetric.service.IService; import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport; import org.jumpmind.symmetric.transport.ITransportManager; +import org.jumpmind.symmetric.web.WebConstants; import org.jumpmind.util.AppUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -253,15 +253,16 @@ protected void sendAck(Node remote, Node local, NodeSecurity localSecurity, int statusCode = -1; int numberOfStatusSendRetries = parameterService.getInt(ParameterConstants.DATA_LOADER_NUM_OF_ACK_RETRIES); - for (int i = 0; i < numberOfStatusSendRetries && statusCode != HttpURLConnection.HTTP_OK; i++) { + for (int i = 0; i < numberOfStatusSendRetries && statusCode != WebConstants.SC_OK; i++) { try { statusCode = transportManager.sendAcknowledgement(remote, list, local, localSecurity.getNodePassword(), parameterService.getRegistrationUrl()); exception = null; } catch (Exception e) { + e.printStackTrace(); exception = e; } - if (statusCode != HttpURLConnection.HTTP_OK) { + if (statusCode != WebConstants.SC_OK) { String message = String.format("Ack was not sent successfully on try number %s of %s.", i+1, numberOfStatusSendRetries); if (statusCode > 0) { message += String.format(" statusCode=%s", statusCode); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/BandwidthService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/BandwidthService.java index 65086b0a17..8d330a3806 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/BandwidthService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/BandwidthService.java @@ -23,7 +23,6 @@ import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStream; -import java.net.HttpURLConnection; import java.net.SocketTimeoutException; import java.net.URL; import java.util.ArrayList; @@ -39,6 +38,7 @@ import org.jumpmind.symmetric.service.IBandwidthService; import org.jumpmind.symmetric.transport.BandwidthTestResults; import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport; +import org.jumpmind.symmetric.transport.http.Http2Connection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,31 +78,20 @@ public double getDownloadKbpsFor(String syncUrl, long sampleSize, long maxTestDu protected BandwidthTestResults getDownloadResultsFor(String syncUrl, long sampleSize, long maxTestDuration) throws IOException { byte[] buffer = new byte[1024]; - InputStream is = null; - try { - BandwidthTestResults bw = new BandwidthTestResults(); - URL u = new URL(String.format("%s/bandwidth?direction=pull&sampleSize=%s", syncUrl, sampleSize)); - bw.start(); - HttpURLConnection conn = (HttpURLConnection) u.openConnection(); - - conn.connect(); - is = conn.getInputStream(); - int r; - while (-1 != (r = is.read(buffer)) && bw.getElapsed() <= maxTestDuration) { - bw.transmitted(r); - } - is.close(); - bw.stop(); - log.info("{} was calculated to have a download bandwidth of {} kbps", syncUrl, bw.getKbps()); - return bw; - } finally { - if (is != null) { - try { - is.close(); - } catch (IOException e) { + BandwidthTestResults bw = new BandwidthTestResults(); + URL u = new URL(String.format("%s/bandwidth?direction=pull&sampleSize=%s", syncUrl, sampleSize)); + bw.start(); + try (Http2Connection conn = new Http2Connection(u)) { + try (InputStream is = conn.getInputStream()) { + int r; + while (-1 != (r = is.read(buffer)) && bw.getElapsed() <= maxTestDuration) { + bw.transmitted(r); } } } + bw.stop(); + log.info("{} was calculated to have a download bandwidth of {} kbps", syncUrl, bw.getKbps()); + return bw; } public double getUploadKbpsFor(Node remoteNode, Node localNode, long sampleSize, long maxTestDuration) throws IOException { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java index 7691cb10bb..1cb7c9ce2a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.OutputStream; import java.net.ConnectException; -import java.net.HttpURLConnection; import java.net.UnknownHostException; import java.sql.Types; import java.util.Collection; @@ -68,6 +67,7 @@ import org.jumpmind.symmetric.transport.ConnectionRejectedException; import org.jumpmind.symmetric.transport.ITransportManager; import org.jumpmind.symmetric.transport.ServiceUnavailableException; +import org.jumpmind.symmetric.web.WebConstants; import org.jumpmind.util.AppUtils; import org.jumpmind.util.RandomTimeSlot; @@ -738,7 +738,7 @@ public void requestNodeCopy() { try { log.info("Detected that node '{}' should be copied to a new node id. Attempting to contact server to accomplish this", copyFrom.getNodeId()); - copied = transportManager.sendCopyRequest(copyFrom) == HttpURLConnection.HTTP_OK; + copied = transportManager.sendCopyRequest(copyFrom) == WebConstants.SC_OK; if (copied) { nodeService.deleteIdentity(); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/TransportManagerFactory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/TransportManagerFactory.java index 137a9531fa..2071635f78 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/TransportManagerFactory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/TransportManagerFactory.java @@ -33,6 +33,7 @@ import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; import org.apache.commons.lang.ClassUtils; import org.apache.commons.lang.StringUtils; @@ -43,6 +44,8 @@ import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.ServerConstants; import org.jumpmind.symmetric.transport.file.FileTransportManager; +import org.jumpmind.symmetric.transport.http.ConscryptHelper; +import org.jumpmind.symmetric.transport.http.Http2Connection; import org.jumpmind.symmetric.transport.http.HttpTransportManager; import org.jumpmind.symmetric.transport.http.SelfSignedX509TrustManager; import org.jumpmind.symmetric.transport.internal.InternalTransportManager; @@ -56,10 +59,10 @@ public TransportManagerFactory(ISymmetricEngine symmetricEngine) { } public static void initHttps(final String httpSslVerifiedServerNames, - boolean allowSelfSignedCerts) { + boolean allowSelfSignedCerts, boolean enableHttps2) { try { if (!StringUtils.isBlank(httpSslVerifiedServerNames)) { - HttpsURLConnection.setDefaultHostnameVerifier(new HostnameVerifier() { + HostnameVerifier hostnameVerifier = new HostnameVerifier() { public boolean verify(String s, SSLSession sslsession) { boolean verified = false; if (!StringUtils.isBlank(httpSslVerifiedServerNames)) { @@ -78,11 +81,15 @@ public boolean verify(String s, SSLSession sslsession) { } return verified; } - }); + }; + HttpsURLConnection.setDefaultHostnameVerifier(hostnameVerifier); + if (enableHttps2) { + Http2Connection.setHostnameVerifier(hostnameVerifier); + } } if (allowSelfSignedCerts) { - HttpsURLConnection.setDefaultSSLSocketFactory(createSelfSignedSocketFactory()); + initSelfSignedSocketFactory(enableHttps2); } } catch (GeneralSecurityException ex) { @@ -103,7 +110,8 @@ public ITransportManager create(String transport) { // Allow self signed certs based on the parameter value. boolean allowSelfSignedCerts = symmetricEngine.getParameterService().is( ServerConstants.HTTPS_ALLOW_SELF_SIGNED_CERTS, false); - initHttps(httpSslVerifiedServerNames, allowSelfSignedCerts); + boolean https2Enabled = symmetricEngine.getParameterService().is(ServerConstants.HTTPS2_ENABLE, true); + initHttps(httpSslVerifiedServerNames, allowSelfSignedCerts, https2Enabled); return createHttpTransportManager(symmetricEngine); } else if (Constants.PROTOCOL_FILE.equalsIgnoreCase(transport)) { return new FileTransportManager(symmetricEngine); @@ -148,16 +156,20 @@ protected HttpTransportManager createHttpTransportManager(ISymmetricEngine symme * @throws KeyManagementException * @throws KeyStoreException */ - private static SSLSocketFactory createSelfSignedSocketFactory() + private static void initSelfSignedSocketFactory(boolean enableHttps2) throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException { - SSLSocketFactory factory = null; + if (enableHttps2) { + new ConscryptHelper().checkProviderInstalled(); + } + X509TrustManager trustManager = new SelfSignedX509TrustManager(null); SSLContext context = SSLContext.getInstance("TLS"); - context.init(null, new TrustManager[] { new SelfSignedX509TrustManager(null) }, - new SecureRandom()); - factory = context.getSocketFactory(); - - return factory; + context.init(null, new TrustManager[] { trustManager }, new SecureRandom()); + SSLSocketFactory sslSocketFactory = context.getSocketFactory(); + + HttpsURLConnection.setDefaultSSLSocketFactory(sslSocketFactory); + Http2Connection.setSslSocketFactory(sslSocketFactory); + Http2Connection.setTrustManager(trustManager); } } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileTransportManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileTransportManager.java index 72b98d2841..f860fc0460 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileTransportManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileTransportManager.java @@ -23,7 +23,6 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; -import java.net.HttpURLConnection; import java.util.List; import java.util.Map; @@ -37,6 +36,7 @@ import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport; import org.jumpmind.symmetric.transport.ITransportManager; import org.jumpmind.symmetric.transport.http.HttpTransportManager; +import org.jumpmind.symmetric.web.WebConstants; import org.jumpmind.util.FormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +55,7 @@ public FileTransportManager(ISymmetricEngine engine) { @Override public int sendAcknowledgement(Node remote, List list, Node local, String securityToken, String registrationUrl) throws IOException { - return HttpURLConnection.HTTP_OK; + return WebConstants.SC_OK; } @Override diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/ConscryptHelper.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/ConscryptHelper.java new file mode 100644 index 0000000000..e5784049e0 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/ConscryptHelper.java @@ -0,0 +1,44 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.transport.http; + +import java.security.Provider; +import java.security.Security; + +import org.conscrypt.Conscrypt; + +public class ConscryptHelper { + + protected final static String PROVIDER_NAME = "Conscrypt"; + + public void checkProviderInstalled() { + if (Security.getProvider(PROVIDER_NAME) == null) { + Security.insertProviderAt(Conscrypt.newProvider(), 1); + } else { + Provider[] providers = Security.getProviders(); + if (providers.length > 0 && !providers[0].getName().equals(PROVIDER_NAME)) { + Security.removeProvider(PROVIDER_NAME); + Security.insertProviderAt(Conscrypt.newProvider(), 1); + } + } + } + +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/Http2Connection.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/Http2Connection.java new file mode 100644 index 0000000000..71d0320c9a --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/Http2Connection.java @@ -0,0 +1,448 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.transport.http; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ProtocolException; +import java.net.URL; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.X509TrustManager; + +import org.jumpmind.symmetric.web.WebConstants; +import org.jumpmind.util.CustomizableThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kotlin.Pair; +import okhttp3.CacheControl; +import okhttp3.Headers; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okio.BufferedSink; + +public class Http2Connection extends HttpConnection { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + protected static X509TrustManager trustManager; + + protected static SSLSocketFactory sslSocketFactory; + + protected static HostnameVerifier hostnameVerifier; + + protected ExecutorService executor; + + protected OkHttpClient.Builder clientBuilder; + + protected Request.Builder requestBuilder; + + protected Response response; + + protected String requestMethod; + + protected MediaType mediaType; + + protected boolean dooutput; + + protected OutputStream internalOut; + + protected OutputStream externalOut; + + protected IOException exception; + + public Http2Connection(URL url) throws IOException { + super(url); + reset(); + } + + protected void reset() { + executor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(Thread.currentThread().getName())); + clientBuilder = new OkHttpClient.Builder().sslSocketFactory(sslSocketFactory, trustManager). + hostnameVerifier(hostnameVerifier).retryOnConnectionFailure(true); + requestBuilder = new Request.Builder().url(url); + dooutput = false; + requestMethod = "GET"; + internalOut = null; + externalOut = null; + exception = null; + } + + @Override + public synchronized void disconnect() { + close(); + } + + @Override + public synchronized void close() { + closeOutput(); + if (response != null) { + response.close(); + response = null; + } + if (!executor.isTerminated()) { + executor.shutdownNow(); + } + reset(); + } + + protected void closeOutput() { + if (externalOut != null) { + try { + externalOut.close(); + } catch (IOException e) { + e.printStackTrace(); + } + externalOut = null; + internalOut = null; + } + } + + protected synchronized Response getResponse() throws IOException { + if (response == null) { + RequestBody requestBody = null; + if (dooutput) { + requestBody = new BlockingRequestBody(this); + } + + Request request = requestBuilder.method(requestMethod, requestBody).build(); + OkHttpClient client = clientBuilder.build(); + + if (log.isDebugEnabled()) { + logHeaders(request); + } + + try { + response = client.newCall(request).execute(); + } catch (IOException e) { + exception = e; + throw e; + } + if (log.isDebugEnabled()) { + log.debug("HTTP response: {}", response.code()); + } + } + return response; + } + + protected void logHeaders(Request request) { + Headers headers = request.headers(); + StringBuilder sb = new StringBuilder("{"); + Iterator> iter = headers.iterator(); + while (iter.hasNext()) { + Pair header = iter.next(); + if (!header.getFirst().equalsIgnoreCase(WebConstants.HEADER_SESSION_ID) && + !header.getFirst().equalsIgnoreCase(WebConstants.HEADER_SECURITY_TOKEN)) { + if (sb.length() > 1) { + sb.append(", "); + } + sb.append(header.getFirst()).append("=").append(header.getSecond()); + } + } + sb.append("}"); + log.debug("Request headers: {}", sb.toString()); + } + + @Override + public int getResponseCode() throws IOException { + waitForResponse(); + return getResponse().code(); + } + + @Override + public InputStream getInputStream() throws IOException { + closeOutput(); + waitForResponse(); + return getResponse().body().byteStream(); + } + + public String getResponseBody() throws IOException { + waitForResponse(); + return response.body().string(); + } + + protected void waitForResponse() throws IOException { + if (dooutput) { + if (!executor.isTerminated()) { + try { + log.debug("Waiting for HTTP thread"); + long hours = 1; + while (!executor.awaitTermination(1, TimeUnit.HOURS)) { + log.info("Waiting for HTTP thread for {} hours", hours); + hours++; + } + log.debug("Done waiting for HTTP thread"); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + if (exception != null) { + throw exception; + } + } + } + + @Override + public synchronized OutputStream getOutputStream() throws IOException { + if (externalOut == null) { + externalOut = new BlockingOutputStream(this); + Callable callable = new CallableResponse(this); + Future future = executor.submit(callable); + executor.shutdown(); + try { + log.debug("Waiting for output stream"); + // wait until internal thread on BlockingRequestBody.writeTo() has the internal output stream + wait(); + } catch (InterruptedException e) { + executor.shutdownNow(); + throw new IOException(e); + } + + if (future != null && internalOut == null) { + try { + future.get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + if (e.getCause() != null) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + throw new IOException(e.getCause()); + } + throw new IOException(e); + } + } + } + return externalOut; + } + + @Override + public synchronized String getContentEncoding() { + return response == null ? null : response.header("Content-Encoding"); + } + + @Override + public synchronized String getHeaderField(String name) { + return response == null ? null : response.header(name); + } + + @Override + public synchronized Map> getHeaderFields() { + return response == null ? new HashMap>() : response.headers().toMultimap(); + } + + @Override + public void setConnectTimeout(int timeout) { + clientBuilder = clientBuilder.connectTimeout(timeout, TimeUnit.MILLISECONDS); + } + + @Override + public void setReadTimeout(int timeout) { + clientBuilder = clientBuilder.readTimeout(timeout, TimeUnit.MILLISECONDS); + } + + @Override + public void setInstanceFollowRedirects(boolean followRedirects) { + clientBuilder = clientBuilder.followRedirects(true).followSslRedirects(true); + } + + @Override + public void setRequestMethod(String method) throws ProtocolException { + requestMethod = method; + } + + @Override + public void setDoInput(boolean doinput) { + } + + @Override + public void setDoOutput(boolean dooutput) { + this.dooutput = dooutput; + } + + @Override + public void setAllowUserInteraction(boolean allowuserinteraction) { + } + + @Override + public void setChunkedStreamingMode(int chunklen) { + } + + @Override + public void setUseCaches(boolean usecaches) { + requestBuilder = requestBuilder.cacheControl(new CacheControl.Builder().noCache().build()); + } + + public void setRequestProperty(String key, String value) { + detectMediaType(key, value); + requestBuilder = requestBuilder.header(key, value); + } + + @Override + public void addRequestProperty(String key, String value) { + detectMediaType(key, value); + requestBuilder = requestBuilder.addHeader(key, value); + } + + protected void detectMediaType(String key, String value) { + if (key.equalsIgnoreCase("Content-Type")) { + if (value.equals("gzip")) { + value = "application/gzip"; + } + mediaType = MediaType.parse(value); + } + } + + public static X509TrustManager getTrustManager() { + return trustManager; + } + + public static void setTrustManager(X509TrustManager trustManager) { + Http2Connection.trustManager = trustManager; + } + + public static SSLSocketFactory getSslSocketFactory() { + return sslSocketFactory; + } + + public static void setSslSocketFactory(SSLSocketFactory sslSocketFactory) { + Http2Connection.sslSocketFactory = sslSocketFactory; + } + + public static HostnameVerifier getHostnameVerifier() { + return hostnameVerifier; + } + + public static void setHostnameVerifier(HostnameVerifier hostnameVerifier) { + Http2Connection.hostnameVerifier = hostnameVerifier; + } + + protected class CallableResponse implements Callable { + + protected Http2Connection connection; + + protected CallableResponse(Http2Connection connection) { + this.connection = connection; + } + + @Override + public Response call() throws Exception { + try { + exception = null; + return getResponse(); + } catch (Throwable t) { + log.debug("No output stream, caught exception instead", t); + synchronized (connection) { + internalOut = null; + // awake calling thread on getOutputStream() so it can throw this exception + connection.notifyAll(); + throw t; + } + } + } + } + + protected class BlockingRequestBody extends RequestBody { + + protected Http2Connection connection; + + protected BlockingRequestBody(Http2Connection connection) { + this.connection = connection; + } + + @Override + public MediaType contentType() { + if (mediaType != null) { + return mediaType; + } + return requestMethod.equalsIgnoreCase("POST") ? MediaType.parse("application/x-www-form-urlencoded"): + MediaType.parse("text/plain"); + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + internalOut = sink.outputStream(); + synchronized (connection) { + log.debug("Ready with output stream"); + // awake calling thread on getOutputStream() to return external output stream + connection.notifyAll(); + try { + log.debug("Waiting for output stream to close"); + // wait for calling thread to close the external output stream + connection.wait(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + } + } + + protected class BlockingOutputStream extends OutputStream { + + protected Http2Connection connection; + + protected boolean closed; + + protected BlockingOutputStream(Http2Connection connection) { + this.connection = connection; + } + + @Override + public void write(int b) throws IOException { + internalOut.write(b); + } + + @Override + public void flush() throws IOException { + internalOut.flush(); + } + + @Override + public void close() throws IOException { + if (!closed) { + synchronized (connection) { + log.debug("Closing output stream"); + // awake internal thread on BlockingRequestBody.writeTo() so HTTP request can finish + connection.notifyAll(); + } + closed = true; + } + } + } + +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpConnection.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpConnection.java new file mode 100644 index 0000000000..324bc70f37 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpConnection.java @@ -0,0 +1,128 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.transport.http; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.ProtocolException; +import java.net.URL; +import java.util.List; +import java.util.Map; + +public class HttpConnection implements Closeable { + + public static final int HTTP_OK = HttpURLConnection.HTTP_OK; + + public static final int HTTP_NOT_MODIFIED = HttpURLConnection.HTTP_NOT_MODIFIED; + + protected URL url; + + protected HttpURLConnection conn; + + public HttpConnection(URL url) throws IOException { + this.url = url; + conn = (HttpURLConnection) url.openConnection(); + } + + public void disconnect() { + conn.disconnect(); + } + + @Override + public void close() { + } + + public URL getURL() { + return url; + } + + public String getContentEncoding() { + return conn.getContentEncoding(); + } + + public InputStream getInputStream() throws IOException { + return conn.getInputStream(); + } + + public OutputStream getOutputStream() throws IOException { + return conn.getOutputStream(); + } + + public void setConnectTimeout(int timeout) { + conn.setConnectTimeout(timeout); + } + + public void setReadTimeout(int timeout) { + conn.setReadTimeout(timeout); + } + + public void setDoInput(boolean doinput) { + conn.setDoInput(doinput); + } + + public void setDoOutput(boolean dooutput) { + conn.setDoOutput(dooutput); + } + + public void setAllowUserInteraction(boolean allowuserinteraction) { + conn.setAllowUserInteraction(allowuserinteraction); + } + + public void setUseCaches(boolean usecaches) { + conn.setUseCaches(usecaches); + } + + public void setRequestProperty(String key, String value) { + conn.setRequestProperty(key, value); + } + + public void addRequestProperty(String key, String value) { + conn.addRequestProperty(key, value); + } + + public void setChunkedStreamingMode(int chunklen) { + conn.setChunkedStreamingMode(chunklen); + } + + public String getHeaderField(String name) { + return conn.getHeaderField(name); + } + + public Map> getHeaderFields() { + return conn.getHeaderFields(); + } + + public void setInstanceFollowRedirects(boolean followRedirects) { + conn.setInstanceFollowRedirects(followRedirects); + } + + public void setRequestMethod(String method) throws ProtocolException { + conn.setRequestMethod(method); + } + + public int getResponseCode() throws IOException { + return conn.getResponseCode(); + } + +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpIncomingTransport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpIncomingTransport.java index aa25527be2..49191a556a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpIncomingTransport.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpIncomingTransport.java @@ -24,7 +24,6 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; -import java.net.HttpURLConnection; import java.net.URL; import java.util.LinkedHashMap; import java.util.Map; @@ -48,7 +47,7 @@ public class HttpIncomingTransport implements IIncomingTransport { private HttpTransportManager httpTransportManager; - private HttpURLConnection connection; + private HttpConnection connection; private BufferedReader reader; @@ -64,14 +63,14 @@ public class HttpIncomingTransport implements IIncomingTransport { private String securityToken; - public HttpIncomingTransport(HttpTransportManager httpTransportManager, HttpURLConnection connection, IParameterService parameterService) { + public HttpIncomingTransport(HttpTransportManager httpTransportManager, HttpConnection connection, IParameterService parameterService) { this.httpTransportManager = httpTransportManager; this.connection = connection; this.parameterService = parameterService; this.httpTimeout = parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_TIMEOUT); } - public HttpIncomingTransport(HttpTransportManager httpTransportManager, HttpURLConnection connection, IParameterService parameterService, + public HttpIncomingTransport(HttpTransportManager httpTransportManager, HttpConnection connection, IParameterService parameterService, String nodeId, String securityToken) { this(httpTransportManager, connection, parameterService); this.nodeId = nodeId; @@ -179,7 +178,7 @@ public Map getHeaders() { * @return * @throws IOException */ - private HttpURLConnection openConnectionCheckRedirects(HttpURLConnection connection) throws IOException + private HttpConnection openConnectionCheckRedirects(HttpConnection connection) throws IOException { boolean redir; int redirects = 0; @@ -187,7 +186,7 @@ private HttpURLConnection openConnectionCheckRedirects(HttpURLConnection connect connection.setInstanceFollowRedirects(false); redir = false; int stat = connection.getResponseCode(); - if (stat >= 300 && stat <= 307 && stat != 306 && stat != HttpURLConnection.HTTP_NOT_MODIFIED) { + if (stat >= 300 && stat <= 307 && stat != 306 && stat != HttpConnection.HTTP_NOT_MODIFIED) { URL base = connection.getURL(); redirectionUrl = connection.getHeaderField("Location"); @@ -215,7 +214,7 @@ private HttpURLConnection openConnectionCheckRedirects(HttpURLConnection connect return connection; } - public HttpURLConnection getConnection() { + public HttpConnection getConnection() { return connection; } } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java index 6838505a0a..acf837556f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; -import java.net.HttpURLConnection; import java.net.URL; import java.net.URLConnection; import java.nio.charset.Charset; @@ -65,7 +64,7 @@ public class HttpOutgoingTransport implements IOutgoingWithResponseTransport { private BufferedReader reader; - private HttpURLConnection connection; + private HttpConnection connection; private int httpTimeout; @@ -197,7 +196,7 @@ private void closeWriter(boolean closeQuietly) { * @throws {@link ConnectionRejectedException} * @throws {@link AuthenticationException} */ - private HttpURLConnection requestReservation(String queue) { + private HttpConnection requestReservation(String queue) { try { connection = httpTransportManager.openConnection(url, nodeId, securityToken); connection.setUseCaches(false); @@ -239,6 +238,7 @@ public OutputStream openStream() { connection.addRequestProperty("Content-Type", "gzip"); // application/x-gzip? } } else { + connection.setRequestMethod("POST"); boundary = Long.toHexString(System.currentTimeMillis()); connection.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary); } @@ -328,28 +328,27 @@ public boolean isOpen() { } public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurationService, String queue, Node targetNode) { - - HttpURLConnection connection = requestReservation(queue); - - // Connection contains remote suspend/ignore channels list if - // reservation was successful. - ChannelMap suspendIgnoreChannelsList = new ChannelMap(); - String suspends = connection.getHeaderField(WebConstants.SUSPENDED_CHANNELS); - String ignores = connection.getHeaderField(WebConstants.IGNORED_CHANNELS); - - suspendIgnoreChannelsList.addSuspendChannels(suspends); - suspendIgnoreChannelsList.addIgnoreChannels(ignores); - - ChannelMap localSuspendIgnoreChannelsList = configurationService.getSuspendIgnoreChannelLists(targetNode.getNodeId()); - suspendIgnoreChannelsList.addSuspendChannels(localSuspendIgnoreChannelsList.getSuspendChannels()); - suspendIgnoreChannelsList.addIgnoreChannels(localSuspendIgnoreChannelsList.getIgnoreChannels()); + try (HttpConnection connection = requestReservation(queue)) { + // Connection contains remote suspend/ignore channels list if + // reservation was successful. + + String suspends = connection.getHeaderField(WebConstants.SUSPENDED_CHANNELS); + String ignores = connection.getHeaderField(WebConstants.IGNORED_CHANNELS); + + suspendIgnoreChannelsList.addSuspendChannels(suspends); + suspendIgnoreChannelsList.addIgnoreChannels(ignores); + + ChannelMap localSuspendIgnoreChannelsList = configurationService.getSuspendIgnoreChannelLists(targetNode.getNodeId()); + suspendIgnoreChannelsList.addSuspendChannels(localSuspendIgnoreChannelsList.getSuspendChannels()); + suspendIgnoreChannelsList.addIgnoreChannels(localSuspendIgnoreChannelsList.getIgnoreChannels()); + } return suspendIgnoreChannelsList; } - public HttpURLConnection getConnection() { + public HttpConnection getConnection() { return connection; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java index 6dc0247272..a391cc1414 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java @@ -27,7 +27,6 @@ import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; -import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; import java.util.HashMap; @@ -42,6 +41,7 @@ import org.jumpmind.symmetric.Version; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.common.ServerConstants; import org.jumpmind.symmetric.io.IoConstants; import org.jumpmind.symmetric.model.BatchId; import org.jumpmind.symmetric.model.IncomingBatch; @@ -71,6 +71,8 @@ public class HttpTransportManager extends AbstractTransportManager implements IT protected boolean useHeaderSecurityToken; protected boolean useSessionAuth; + + protected boolean isHttp2Enabled; public HttpTransportManager() { } @@ -80,6 +82,7 @@ public HttpTransportManager(ISymmetricEngine engine) { this.engine = engine; useHeaderSecurityToken = engine.getParameterService().is(ParameterConstants.TRANSPORT_HTTP_USE_HEADER_SECURITY_TOKEN); useSessionAuth = engine.getParameterService().is(ParameterConstants.TRANSPORT_HTTP_USE_SESSION_AUTH); + isHttp2Enabled = engine.getParameterService().is(ServerConstants.HTTPS2_ENABLE); } public int sendCopyRequest(Node local) throws IOException { @@ -123,7 +126,7 @@ public int sendAcknowledgement(Node remote, List list, Node local log.debug("Sending ack: {}", data); return sendMessage("ack", remote, local, data, securityToken, registrationUrl); } - return HttpURLConnection.HTTP_OK; + return HttpConnection.HTTP_OK; } public void writeAcknowledgement(OutputStream out, Node remote, List list, Node local, @@ -139,32 +142,38 @@ protected int sendMessage(String action, Node remote, Node local, String data, } protected int sendMessage(URL url, String nodeId, String securityToken, String data) throws IOException { - HttpURLConnection conn = openConnection(url, nodeId, securityToken); - conn.setRequestMethod("POST"); - conn.setAllowUserInteraction(false); - conn.setDoOutput(true); - conn.setConnectTimeout(getHttpTimeOutInMs()); - conn.setReadTimeout(getHttpTimeOutInMs()); - try(OutputStream os = conn.getOutputStream()) { - writeMessage(os, data); - checkForConnectionUpgrade(conn); - - try (InputStream is = conn.getInputStream()) { - byte[] bytes = new byte[32]; - while (is.read(bytes) != -1) { - log.debug("Read keep-alive"); + try (HttpConnection conn = openConnection(url, nodeId, securityToken)) { + conn.setRequestMethod("POST"); + conn.setAllowUserInteraction(false); + conn.setDoOutput(true); + conn.setConnectTimeout(getHttpTimeOutInMs()); + conn.setReadTimeout(getHttpTimeOutInMs()); + try (OutputStream os = conn.getOutputStream()) { + writeMessage(os, data); + checkForConnectionUpgrade(conn); + + try (InputStream is = conn.getInputStream()) { + byte[] bytes = new byte[32]; + while (is.read(bytes) != -1) { + log.debug("Read keep-alive"); + } } + return conn.getResponseCode(); } - return conn.getResponseCode(); } } - protected void checkForConnectionUpgrade(HttpURLConnection conn) { + protected void checkForConnectionUpgrade(HttpConnection conn) { } - public HttpURLConnection openConnection(URL url, String nodeId, String securityToken) + public HttpConnection openConnection(URL url, String nodeId, String securityToken) throws IOException { - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + HttpConnection conn = null; + if (isHttp2Enabled) { + conn = new Http2Connection(url); + } else { + conn = new HttpConnection(url); + } conn.setRequestProperty(WebConstants.HEADER_ACCEPT_CHARSET, IoConstants.ENCODING); boolean hasSession = false; @@ -177,12 +186,12 @@ public HttpURLConnection openConnection(URL url, String nodeId, String securityT } if (securityToken != null && useHeaderSecurityToken && !hasSession) { - conn.addRequestProperty(WebConstants.HEADER_SECURITY_TOKEN, securityToken); + conn.setRequestProperty(WebConstants.HEADER_SECURITY_TOKEN, securityToken); } return conn; } - public void updateSession(HttpURLConnection conn) { + public void updateSession(HttpConnection conn) { if (useSessionAuth) { String sessionId = conn.getHeaderField(WebConstants.HEADER_SET_SESSION_ID); if (sessionId != null) { @@ -191,13 +200,13 @@ public void updateSession(HttpURLConnection conn) { } } - public void clearSession(HttpURLConnection conn) { + public void clearSession(HttpConnection conn) { if (useSessionAuth) { sessionIdByUri.remove(getUri(conn)); } } - protected String getUri(HttpURLConnection conn) { + protected String getUri(HttpConnection conn) { String uri = conn.getURL().toString(); uri = uri.substring(0, uri.lastIndexOf("/")); return uri; @@ -237,7 +246,7 @@ public void writeMessage(OutputStream out, String data) throws IOException { public IIncomingTransport getFilePullTransport(Node remote, Node local, String securityToken, Map requestProperties, String registrationUrl) throws IOException { - HttpURLConnection conn = createGetConnectionFor(new URL(buildURL("filesync/pull", remote, local, securityToken, registrationUrl)), + HttpConnection conn = createGetConnectionFor(new URL(buildURL("filesync/pull", remote, local, securityToken, registrationUrl)), local.getNodeId(), securityToken); if (requestProperties != null) { for (String key : requestProperties.keySet()) { @@ -249,7 +258,7 @@ public IIncomingTransport getFilePullTransport(Node remote, Node local, String s public IIncomingTransport getPullTransport(Node remote, Node local, String securityToken, Map requestProperties, String registrationUrl) throws IOException { - HttpURLConnection conn = createGetConnectionFor(new URL(buildURL("pull", remote, local, securityToken, registrationUrl)), + HttpConnection conn = createGetConnectionFor(new URL(buildURL("pull", remote, local, securityToken, registrationUrl)), local.getNodeId(), securityToken); if (requestProperties != null) { for (String key : requestProperties.keySet()) { @@ -260,7 +269,7 @@ public IIncomingTransport getPullTransport(Node remote, Node local, String secur } public IIncomingTransport getPingTransport(Node remote, Node local, String registrationUrl) throws IOException { - HttpURLConnection conn = createGetConnectionFor(new URL(resolveURL(remote.getSyncUrl(), registrationUrl) + "/ping")); + HttpConnection conn = createGetConnectionFor(new URL(resolveURL(remote.getSyncUrl(), registrationUrl) + "/ping")); return new HttpIncomingTransport(this, conn, engine.getParameterService()); } @@ -294,7 +303,7 @@ public IIncomingTransport getConfigTransport(Node remote, Node local, String sec StringBuilder builder = new StringBuilder(buildURL("config", remote, local, securityToken, registrationUrl)); append(builder, WebConstants.SYMMETRIC_VERSION, symmetricVersion); append(builder, WebConstants.CONFIG_VERSION, configVersion); - HttpURLConnection conn = createGetConnectionFor(new URL(builder.toString()), local.getNodeId(), securityToken); + HttpConnection conn = createGetConnectionFor(new URL(builder.toString()), local.getNodeId(), securityToken); return new HttpIncomingTransport(this, conn, engine.getParameterService()); } @@ -332,8 +341,8 @@ public static String buildRegistrationUrl(String baseUrl, Node node) throws IOEx return builder.toString(); } - protected HttpURLConnection createGetConnectionFor(URL url, String nodeId, String securityToken) throws IOException { - HttpURLConnection conn = openConnection(url, nodeId, securityToken); + protected HttpConnection createGetConnectionFor(URL url, String nodeId, String securityToken) throws IOException { + HttpConnection conn = openConnection(url, nodeId, securityToken); conn.setRequestProperty("accept-encoding", "gzip"); conn.setConnectTimeout(getHttpTimeOutInMs()); conn.setReadTimeout(getHttpTimeOutInMs()); @@ -341,11 +350,11 @@ protected HttpURLConnection createGetConnectionFor(URL url, String nodeId, Strin return conn; } - protected HttpURLConnection createGetConnectionFor(URL url) throws IOException { + protected HttpConnection createGetConnectionFor(URL url) throws IOException { return createGetConnectionFor(url, null, null); } - protected static InputStream getInputStreamFrom(HttpURLConnection connection) throws IOException { + protected static InputStream getInputStreamFrom(HttpConnection connection) throws IOException { String type = connection.getContentEncoding(); InputStream in = connection.getInputStream(); if (!StringUtils.isBlank(type) && type.equals("gzip")) { @@ -357,7 +366,7 @@ protected static InputStream getInputStreamFrom(HttpURLConnection connection) th /** * If the content is gzip'd, then uncompress. */ - protected static BufferedReader getReaderFrom(HttpURLConnection connection) throws IOException { + protected static BufferedReader getReaderFrom(HttpConnection connection) throws IOException { String type = connection.getContentEncoding(); InputStream in = connection.getInputStream(); if (!StringUtils.isBlank(type) && type.equals("gzip")) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java index b49439af66..dc83d9695c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java @@ -27,7 +27,6 @@ import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.io.PrintWriter; -import java.net.HttpURLConnection; import java.util.List; import java.util.Map; @@ -47,7 +46,7 @@ import org.jumpmind.symmetric.transport.IOutgoingTransport; import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport; import org.jumpmind.symmetric.transport.ITransportManager; -import org.jumpmind.symmetric.transport.http.HttpIncomingTransport; +import org.jumpmind.symmetric.web.WebConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -205,7 +204,7 @@ public int sendAcknowledgement(Node remote, List list, Node local remoteEngine.getAcknowledgeService().ack(batchInfo); } } - return HttpURLConnection.HTTP_OK; + return WebConstants.SC_OK; } catch (Exception ex) { log.error("", ex); return -1; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java index 3f8739ae54..fa8db13a5c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/util/SymmetricUtils.java @@ -116,6 +116,9 @@ public static final void replaceSystemAndEnvironmentVariables(Properties propert if (value.contains("portNumber")) { value = FormatUtils.replace("portNumber", AppUtils.getPortNumber(), value); } + if (value.contains("protocol")) { + value = FormatUtils.replace("protocol", AppUtils.getProtocol(), value); + } if (value.contains("ipAddress")) { value = FormatUtils.replace("ipAddress", AppUtils.getIpAddress(), value); } diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/transport/MockTransportManager.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/transport/MockTransportManager.java index b50cc396b5..8328337c0e 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/transport/MockTransportManager.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/transport/MockTransportManager.java @@ -23,14 +23,13 @@ import java.io.IOException; import java.io.OutputStream; -import java.net.HttpURLConnection; import java.util.List; import java.util.Map; import org.jumpmind.symmetric.model.BatchAck; import org.jumpmind.symmetric.model.IncomingBatch; import org.jumpmind.symmetric.model.Node; -import org.jumpmind.symmetric.transport.http.HttpIncomingTransport; +import org.jumpmind.symmetric.web.WebConstants; public class MockTransportManager implements ITransportManager { @@ -67,7 +66,7 @@ public IOutgoingWithResponseTransport getPushTransport(Node remote, public int sendAcknowledgement(Node remote, List list, Node local, String securityToken, String registrationUrl) throws IOException { - return HttpURLConnection.HTTP_OK; + return WebConstants.SC_OK; } @Override diff --git a/symmetric-server/build.gradle b/symmetric-server/build.gradle index 1d7ad20dc3..ed85a67d9d 100644 --- a/symmetric-server/build.gradle +++ b/symmetric-server/build.gradle @@ -42,6 +42,11 @@ apply from: symAssembleDir + '/asciidoc.gradle' exclude group: "org.ow2.asm" } + provided "org.eclipse.jetty.http2:http2-server:$jettyVersion" + provided "org.eclipse.jetty:jetty-alpn-conscrypt-server:$jettyVersion" + provided "org.mortbay.jetty.alpn:alpn-boot:$alpnBootVersion" + compileOnly "org.eclipse.jetty.alpn:alpn-api:$alpnApiVersion" + testCompile project(path: ':symmetric-util', configuration: 'testArtifacts') testCompile project(path: ':symmetric-io', configuration: 'testArtifacts') integrationTestCompile project(':symmetric-jdbc').sourceSets.integrationTest.output diff --git a/symmetric-server/src/main/deploy/conf/symmetric-server.properties b/symmetric-server/src/main/deploy/conf/symmetric-server.properties index 17a183b45f..d5abcf99f9 100644 --- a/symmetric-server/src/main/deploy/conf/symmetric-server.properties +++ b/symmetric-server/src/main/deploy/conf/symmetric-server.properties @@ -35,6 +35,10 @@ http.port=31415 # https.enable=false +# Enable HTTPS/2 for multiplexing and resistance to protocol attacks. +# +https2.enable=false + # Port number for synchronization over HTTPS (HTTP over SSL). # https.port=31417 diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/SymmetricWebServer.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/SymmetricWebServer.java index f526b0fd56..6b8ea0e92c 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/SymmetricWebServer.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/SymmetricWebServer.java @@ -31,7 +31,10 @@ import javax.websocket.server.ServerEndpointConfig; import org.apache.commons.lang.ClassUtils; +import org.eclipse.jetty.alpn.server.ALPNServerConnectionFactory; import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http2.HTTP2Cipher; +import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; @@ -103,6 +106,8 @@ public enum Mode { protected boolean httpsEnabled = false; + protected boolean https2Enabled = false; + protected int httpsPort = -1; protected String basicAuthUsername = null; @@ -170,6 +175,8 @@ protected void initFromProperties() { Boolean.parseBoolean(System.getProperty(ServerConstants.HTTP_ENABLE, "true"))); httpsEnabled = serverProperties.is(ServerConstants.HTTPS_ENABLE, Boolean.parseBoolean(System.getProperty(ServerConstants.HTTPS_ENABLE, "true"))); + https2Enabled = serverProperties.is(ServerConstants.HTTPS2_ENABLE, + Boolean.parseBoolean(System.getProperty(ServerConstants.HTTPS2_ENABLE, "true"))); httpPort = serverProperties.getInt(ServerConstants.HTTP_PORT, Integer.parseInt(System.getProperty(ServerConstants.HTTP_PORT, "" + httpPort))); httpsPort = serverProperties.getInt(ServerConstants.HTTPS_PORT, @@ -218,7 +225,7 @@ public SymmetricWebServer start(int httpPort, int securePort, Mode mode) throws SymmetricUtils.logNotices(); - TransportManagerFactory.initHttps(httpSslVerifiedServerNames, allowSelfSignedCerts); + TransportManagerFactory.initHttps(httpSslVerifiedServerNames, allowSelfSignedCerts, https2Enabled); // indicate to the app that we are in stand alone mode System.setProperty(SystemConstants.SYSPROP_STANDALONE_WEB, "true"); @@ -266,7 +273,7 @@ public SymmetricWebServer start(int httpPort, int securePort, Mode mode) throws Class remoteStatusEndpoint = loadRemoteStatusEndpoint(); if (remoteStatusEndpoint != null) { - ServerContainer container = WebSocketServerContainerInitializer.configureContext(webapp); + ServerContainer container = WebSocketServerContainerInitializer.initialize(webapp); container.setDefaultMaxBinaryMessageBufferSize(Integer.MAX_VALUE); container.setDefaultMaxTextMessageBufferSize(Integer.MAX_VALUE); ServerEndpointConfig websocketConfig = ServerEndpointConfig.Builder.create(remoteStatusEndpoint, "/control").build(); @@ -348,6 +355,7 @@ protected Connector[] getConnectors(Server server, int port, int securePort, Mod httpConfig.setSecurePort(securePort); } + httpConfig.setSendServerVersion(false); httpConfig.setOutputBufferSize(32768); if (mode.equals(Mode.HTTP) || mode.equals(Mode.MIXED)) { @@ -356,7 +364,7 @@ protected Connector[] getConnectors(Server server, int port, int securePort, Mod http.setHost(host); http.setIdleTimeout(maxIdleTime); connectors.add(http); - log.info(String.format("About to start %s web server on host:port %s:%s", name, host == null ? "default" : host, port)); + log.info(String.format("About to start %s web server on %s:%s:%s", name, host == null ? "default" : host, port, "HTTP/1.1")); } if (mode.equals(Mode.HTTPS) || mode.equals(Mode.MIXED)) { ISecurityService securityService = SecurityServiceFactory.create(SecurityServiceType.SERVER, @@ -364,41 +372,54 @@ protected Connector[] getConnectors(Server server, int port, int securePort, Mod securityService.installDefaultSslCert(host); String keyStorePassword = System.getProperty(SecurityConstants.SYSPROP_KEYSTORE_PASSWORD); keyStorePassword = (keyStorePassword != null) ? keyStorePassword : SecurityConstants.KEYSTORE_PASSWORD; - SslContextFactory sslConnectorFactory = new SslContextFactory(); - sslConnectorFactory.setKeyManagerPassword(keyStorePassword); - /* Prevent POODLE attack */ + + SslContextFactory.Server sslContextFactory = new SslContextFactory.Server(); + sslContextFactory.setKeyStore(securityService.getKeyStore()); + sslContextFactory.setTrustStore(securityService.getTrustStore()); + sslContextFactory.setKeyManagerPassword(keyStorePassword); + sslContextFactory.setCertAlias(System.getProperty(SecurityConstants.SYSPROP_KEYSTORE_CERT_ALIAS, + SecurityConstants.ALIAS_SYM_PRIVATE_KEY)); + String ignoredProtocols = System.getProperty(SecurityConstants.SYSPROP_SSL_IGNORE_PROTOCOLS); if (ignoredProtocols != null && ignoredProtocols.length() > 0) { String[] protocols = ignoredProtocols.split(","); - sslConnectorFactory.addExcludeProtocols(protocols); - } - else { - sslConnectorFactory.addExcludeProtocols("SSLv3"); + sslContextFactory.addExcludeProtocols(protocols); + } else { + sslContextFactory.addExcludeProtocols("SSLv3"); } String ignoredCiphers = System.getProperty(SecurityConstants.SYSPROP_SSL_IGNORE_CIPHERS); if (ignoredCiphers != null && ignoredCiphers.length() > 0) { String[] ciphers = ignoredCiphers.split(","); - sslConnectorFactory.addExcludeCipherSuites(ciphers); + sslContextFactory.addExcludeCipherSuites(ciphers); } - - sslConnectorFactory.setCertAlias(System.getProperty(SecurityConstants.SYSPROP_KEYSTORE_CERT_ALIAS, - SecurityConstants.ALIAS_SYM_PRIVATE_KEY)); - sslConnectorFactory.setKeyStore(securityService.getKeyStore()); - sslConnectorFactory.setTrustStore(securityService.getTrustStore()); - HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig); httpsConfig.addCustomizer(new SecureRequestCustomizer()); + String protocolName = null; + ServerConnector https = null; + + if (https2Enabled) { + sslContextFactory.setCipherComparator(HTTP2Cipher.COMPARATOR); + sslContextFactory.setProvider("Conscrypt"); + HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(httpsConfig); + ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory(); + alpn.setDefaultProtocol("h2"); + SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol()); + https = new ServerConnector(server, ssl, alpn, h2, new HttpConnectionFactory(httpsConfig)); + protocolName = "HTTPS/2"; + } else { + SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.asString()); + https = new ServerConnector(server, ssl, new HttpConnectionFactory(httpsConfig)); + protocolName = "HTTPS/1.1"; + } - ServerConnector https = new ServerConnector(server, - new SslConnectionFactory(sslConnectorFactory, HttpVersion.HTTP_1_1.asString()), new HttpConnectionFactory(httpsConfig)); https.setPort(securePort); https.setIdleTimeout(maxIdleTime); https.setHost(host); connectors.add(https); - log.info(String.format("About to start %s web server on secure host:port %s:%s", name, host == null ? "default" : host, - securePort)); + log.info(String.format("About to start %s web server on %s:%s:%s", name, host == null ? "default" : host, + securePort, protocolName)); } return connectors.toArray(new Connector[connectors.size()]); } @@ -502,7 +523,15 @@ public void setHttpsEnabled(boolean httpsEnabled) { public boolean isHttpsEnabled() { return httpsEnabled; } - + + public void setHttps2Enabled(boolean https2Enabled) { + this.https2Enabled = https2Enabled; + } + + public boolean isHttps2Enabled() { + return https2Enabled; + } + protected Class loadRemoteStatusEndpoint() { try { Class clazz = ClassUtils.getClass("com.jumpmind.symmetric.console.remote.ServerEndpoint"); diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PushUriHandler.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PushUriHandler.java index 08c2eb7671..8aa912b962 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PushUriHandler.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/PushUriHandler.java @@ -102,7 +102,8 @@ protected int push(String sourceNodeId, String channelId, InputStream inputStrea protected InputStream createInputStream(HttpServletRequest req) throws IOException { InputStream is = null; String contentType = req.getHeader("Content-Type"); - boolean useCompression = contentType != null && contentType.equalsIgnoreCase("gzip"); + boolean useCompression = contentType != null && (contentType.equalsIgnoreCase("gzip") + || contentType.equalsIgnoreCase("application/gzip")); is = req.getInputStream(); if (useCompression) { is = new GZIPInputStream(is); diff --git a/symmetric-util/src/main/java/org/jumpmind/util/AppUtils.java b/symmetric-util/src/main/java/org/jumpmind/util/AppUtils.java index 1f82945d8a..e3e669ed62 100644 --- a/symmetric-util/src/main/java/org/jumpmind/util/AppUtils.java +++ b/symmetric-util/src/main/java/org/jumpmind/util/AppUtils.java @@ -111,16 +111,25 @@ public static String getHostName() { } public static String getPortNumber() { - String portNumber = System.getProperty(SYSPROP_PORT_NUMBER, - System.getProperty("http.port", System.getProperty("https.port", UNKNOWN))); - if (UNKNOWN.equals(portNumber)) { - try { - portNumber = "31415"; - } catch (Exception ex) { - log.warn("", ex); - } + String httpPort = System.getProperty(SYSPROP_PORT_NUMBER, System.getProperty("http.port")); + String port = httpPort == null ? "31415" : httpPort; + String httpsEnable = System.getProperty("https.enable"); + + if (httpsEnable != null && httpsEnable.equalsIgnoreCase("true")) { + String httpsPort = System.getProperty("https.port"); + port = httpsPort == null ? "31417" : httpsPort; + } + return port; + } + + public static String getProtocol() { + String protocol = "http"; + String httpsEnable = System.getProperty("https.enable"); + + if (httpsEnable != null && httpsEnable.equalsIgnoreCase("true")) { + protocol = "https"; } - return portNumber; + return protocol; } public static String getIpAddress() {