Skip to content

Commit

Permalink
Extract HttpProxyConfig from Gateway and reuse it for Http-push conne…
Browse files Browse the repository at this point in the history
…ctions.

Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Oct 9, 2019
1 parent 94aaf3b commit 1752d81
Show file tree
Hide file tree
Showing 16 changed files with 197 additions and 47 deletions.
Expand Up @@ -10,24 +10,30 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.gateway.endpoints.config;
package org.eclipse.ditto.services.base.config.http;

import java.net.InetSocketAddress;
import java.util.Objects;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.services.utils.config.ConfigWithFallback;
import org.eclipse.ditto.services.utils.config.DittoConfigError;

import com.typesafe.config.Config;

import akka.http.javadsl.ClientTransport;
import akka.http.javadsl.model.headers.HttpCredentials;

/**
* This class is the default implementation of the HTTP proxy config.
*/
@Immutable
public final class DefaultHttpProxyConfig implements HttpProxyConfig {

private static final String CONFIG_PATH = "http.proxy";
private static final String HTTP_PROXY_PATH = "http.proxy";
private static final String PROXY_PATH = "proxy";

private final boolean enabled;
private final String hostName;
Expand All @@ -46,13 +52,21 @@ private DefaultHttpProxyConfig(final ConfigWithFallback configWithFallback) {
/**
* Returns an instance of {@code DefaultHttpProxyConfig} based on the settings of the specified Config.
*
* @param config is supposed to provide the settings of the HTTP proxy config at {@value #CONFIG_PATH}.
* @param config is supposed to provide the settings of the HTTP proxy config at {@value #HTTP_PROXY_PATH}.
* @return the instance.
* @throws org.eclipse.ditto.services.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static DefaultHttpProxyConfig of(final Config config) {
public static DefaultHttpProxyConfig ofHttpProxy(final Config config) {
return ofConfigPath(config, HTTP_PROXY_PATH);
}

public static DefaultHttpProxyConfig ofProxy(final Config config) {
return ofConfigPath(config, PROXY_PATH);
}

private static DefaultHttpProxyConfig ofConfigPath(final Config config, final String relativePath) {
return new DefaultHttpProxyConfig(
ConfigWithFallback.newInstance(config, CONFIG_PATH, HttpProxyConfigValue.values()));
ConfigWithFallback.newInstance(config, relativePath, HttpProxyConfigValue.values()));
}

@Override
Expand Down Expand Up @@ -80,6 +94,24 @@ public String getPassword() {
return password;
}

@Override
public ClientTransport toClientTransport() {
final String hostname = this.getHostname();
final int port = this.getPort();
if (hostname.isEmpty() || 0 == port) {
throw new DittoConfigError("When HTTP proxy is enabled via config, at least proxy hostname and port must " +
"be configured as well!");
}
final InetSocketAddress inetSocketAddress = InetSocketAddress.createUnresolved(hostname, port);

final String username = this.getUsername();
final String password = this.getPassword();
if (!username.isEmpty() && !password.isEmpty()) {
return ClientTransport.httpsProxy(inetSocketAddress, HttpCredentials.create(username, password));
}
return ClientTransport.httpsProxy(inetSocketAddress);
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
Expand Down
Expand Up @@ -10,12 +10,14 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.gateway.endpoints.config;
package org.eclipse.ditto.services.base.config.http;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.services.utils.config.KnownConfigValue;

import akka.http.javadsl.ClientTransport;

/**
* Provides configuration settings for the HTTP proxy.
*/
Expand Down Expand Up @@ -57,6 +59,14 @@ public interface HttpProxyConfig {
*/
String getPassword();

/**
* Converts the proxy settings to an Akka HTTP client transport object.
* Does not check whether the proxy is enabled.
*
* @return an Akka HTTP client transport object matching this config.
*/
ClientTransport toClientTransport();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code HttpProxyConfig}.
Expand Down
Expand Up @@ -10,13 +10,13 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.gateway.endpoints.config;
package org.eclipse.ditto.services.base.config.http;

import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf;
import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable;

import org.assertj.core.api.JUnitSoftAssertions;
import org.eclipse.ditto.services.gateway.endpoints.config.HttpProxyConfig.HttpProxyConfigValue;
import org.eclipse.ditto.services.base.config.http.HttpProxyConfig.HttpProxyConfigValue;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -27,7 +27,7 @@
import nl.jqno.equalsverifier.EqualsVerifier;

/**
* Unit test for {@link org.eclipse.ditto.services.gateway.endpoints.config.DefaultHttpProxyConfig}.
* Unit test for {@link org.eclipse.ditto.services.base.config.http.DefaultHttpProxyConfig}.
*/
public final class DefaultHttpProxyConfigTest {

Expand Down Expand Up @@ -55,7 +55,7 @@ public void testHashCodeAndEquals() {

@Test
public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
final DefaultHttpProxyConfig underTest = DefaultHttpProxyConfig.of(ConfigFactory.empty());
final DefaultHttpProxyConfig underTest = DefaultHttpProxyConfig.ofHttpProxy(ConfigFactory.empty());

softly.assertThat(underTest.isEnabled())
.as(HttpProxyConfigValue.ENABLED.getConfigPath())
Expand All @@ -76,7 +76,7 @@ public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {

@Test
public void underTestReturnsValuesOfConfigFile() {
final DefaultHttpProxyConfig underTest = DefaultHttpProxyConfig.of(httpProxyConfig);
final DefaultHttpProxyConfig underTest = DefaultHttpProxyConfig.ofHttpProxy(httpProxyConfig);

softly.assertThat(underTest.isEnabled())
.as(HttpProxyConfigValue.ENABLED.getConfigPath())
Expand Down
9 changes: 9 additions & 0 deletions services/base/src/test/resources/http-proxy-test.conf
@@ -0,0 +1,9 @@
http {
proxy {
enabled = true
hostname = "example.com"
port = 4711
username = "john.frume"
password = "verySecretPW!"
}
}
Expand Up @@ -17,6 +17,8 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.services.base.config.http.DefaultHttpProxyConfig;
import org.eclipse.ditto.services.base.config.http.HttpProxyConfig;
import org.eclipse.ditto.services.utils.config.ConfigWithFallback;
import org.eclipse.ditto.services.utils.config.ScopedConfig;

Expand All @@ -32,10 +34,12 @@ final class DefaultHttpPushConfig implements HttpPushConfig {

private final int maxParallelism;
private final int maxQueueSize;
private final HttpProxyConfig httpProxyConfig;

private DefaultHttpPushConfig(final ScopedConfig config) {
maxParallelism = config.getInt(ConfigValue.MAX_PARALLELISM.getConfigPath());
maxQueueSize = config.getInt(ConfigValue.MAX_QUEUE_SIZE.getConfigPath());
httpProxyConfig = DefaultHttpProxyConfig.ofProxy(config);
}

static DefaultHttpPushConfig of(final Config config) {
Expand All @@ -52,6 +56,11 @@ public int getMaxQueueSize() {
return maxQueueSize;
}

@Override
public HttpProxyConfig getHttpProxyConfig() {
return httpProxyConfig;
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
Expand Down
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.config;

import org.eclipse.ditto.services.base.config.http.HttpProxyConfig;
import org.eclipse.ditto.services.utils.config.KnownConfigValue;

/**
Expand All @@ -29,6 +30,11 @@ public interface HttpPushConfig {
*/
int getMaxQueueSize();

/**
* @return configuration of the proxy for all outgoing HTTP requests.
*/
HttpProxyConfig getHttpProxyConfig();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code HttpPushConfig}.
Expand Down
Expand Up @@ -16,13 +16,17 @@
import java.util.Map;
import java.util.Optional;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.services.base.config.http.HttpProxyConfig;
import org.eclipse.ditto.services.connectivity.messaging.internal.ssl.SSLContextCreator;

import akka.actor.ActorSystem;
import akka.event.LoggingAdapter;
import akka.http.javadsl.ClientTransport;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.ConnectionContext;
import akka.http.javadsl.Http;
Expand Down Expand Up @@ -51,20 +55,28 @@ final class DefaultHttpPushFactory implements HttpPushFactory {
private final int parallelism;
private final SSLContextCreator sslContextCreator;

@Nullable
private final ClientTransport clientTransport;

private DefaultHttpPushFactory(final ConnectionId connectionId, final Uri baseUri, final int parallelism,
final SSLContextCreator sslContextCreator) {
final SSLContextCreator sslContextCreator, @Nullable final HttpProxyConfig httpProxyConfig) {
this.connectionId = connectionId;
this.baseUri = baseUri;
this.parallelism = parallelism;
this.sslContextCreator = sslContextCreator;
if (httpProxyConfig == null) {
clientTransport = null;
} else {
clientTransport = httpProxyConfig.toClientTransport();
}
}

static HttpPushFactory of(final Connection connection) {
static HttpPushFactory of(final Connection connection, @Nullable final HttpProxyConfig httpProxyConfig) {
final ConnectionId connectionId = connection.getId();
final Uri baseUri = Uri.create(connection.getUri());
final int parallelism = parseParallelism(connection.getSpecificConfig());
final SSLContextCreator sslContextCreator = SSLContextCreator.fromConnection(connection, DittoHeaders.empty());
return new DefaultHttpPushFactory(connectionId, baseUri, parallelism, sslContextCreator);
return new DefaultHttpPushFactory(connectionId, baseUri, parallelism, sslContextCreator, httpProxyConfig);
}

@Override
Expand Down Expand Up @@ -128,13 +140,25 @@ private static String determineHttpPath(final HttpPublishTarget httpPublishTarge
}

private ConnectionPoolSettings getConnectionPoolSettings(final ActorSystem system) {
return disambiguateByConnectionId(system, connectionId).withMaxConnections(parallelism);
final ConnectionPoolSettings settings =
disambiguateByConnectionId(system, connectionId).withMaxConnections(parallelism);
return clientTransport == null
? settings
: settings.withTransport(clientTransport);
}

private HttpsConnectionContext getHttpsConnectionContext() {
return ConnectionContext.https(sslContextCreator.withoutClientCertificate());
}

/**
* Create connection pool settings unique for the connection ID but functionally identical for
* identically configured connections.
*
* @param system the actor system that runs this HTTP-push factory.
* @param id the connection ID.
* @return artificially unique connection pool settings.
*/
private static ConnectionPoolSettings disambiguateByConnectionId(final ActorSystem system, final ConnectionId id) {

final ParserSettings parserSettings = ParserSettings.create(system);
Expand Down
Expand Up @@ -46,7 +46,7 @@ public final class HttpPushClientActor extends BaseClientActor {
@SuppressWarnings("unused")
private HttpPushClientActor(final Connection connection, final ConnectivityStatus desiredConnectionStatus) {
super(connection, desiredConnectionStatus, ActorRef.noSender());
factory = HttpPushFactory.of(connection);
factory = HttpPushFactory.of(connection, null);
}

/**
Expand Down
Expand Up @@ -12,7 +12,10 @@
*/
package org.eclipse.ditto.services.connectivity.messaging.httppush;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.services.base.config.http.HttpProxyConfig;

import akka.actor.ActorSystem;
import akka.event.LoggingAdapter;
Expand Down Expand Up @@ -56,9 +59,10 @@ public interface HttpPushFactory {
* with undefined behavior if the connection is not valid.
*
* @param connection the connection.
* @param httpProxyConfig proxy configuration, or null if no proxy is needed.
* @return the HTTP-push-factory.
*/
static HttpPushFactory of(final Connection connection) {
return DefaultHttpPushFactory.of(connection);
static HttpPushFactory of(final Connection connection, @Nullable final HttpProxyConfig httpProxyConfig) {
return DefaultHttpPushFactory.of(connection, httpProxyConfig);
}
}

0 comments on commit 1752d81

Please sign in to comment.