Skip to content

Commit

Permalink
feat: add support for connect specific https configs (#8553)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Jan 7, 2022
1 parent 7e05e41 commit 37507ab
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public class KsqlConfig extends AbstractConfig {
public static final String BASIC_AUTH_CREDENTIALS_USERNAME = "username";
public static final String BASIC_AUTH_CREDENTIALS_PASSWORD = "password";

public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG =
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_HTTPS = "https";
public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_NONE = "none";

public static final String CONNECT_BASIC_AUTH_CREDENTIALS_FILE_PROPERTY =
KSQL_CONNECT_PREFIX + "basic.auth.credentials.file";
public static final String CONNECT_BASIC_AUTH_CREDENTIALS_RELOAD_PROPERTY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,20 @@
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.hc.client5.http.fluent.Request;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
Expand Down Expand Up @@ -75,23 +81,27 @@ public class DefaultConnectClient implements ConnectClient {

private final URI connectUri;
private final Header[] requestHeaders;
private final CloseableHttpClient httpClient;

public DefaultConnectClient(
final String connectUri,
final Optional<String> authHeader,
final Map<String, String> additionalRequestHeaders
final Map<String, String> additionalRequestHeaders,
final Optional<SSLContext> sslContext,
final boolean verifySslHostname
) {
Objects.requireNonNull(connectUri, "connectUri");
Objects.requireNonNull(authHeader, "authHeader");
Objects.requireNonNull(additionalRequestHeaders, "additionalRequestHeaders");
this.requestHeaders = buildHeaders(authHeader, additionalRequestHeaders);

Objects.requireNonNull(sslContext, "sslContext");
try {
this.connectUri = new URI(connectUri);
} catch (final URISyntaxException e) {
throw new KsqlException(
"Could not initialize connect client due to invalid URI: " + connectUri, e);
}
this.requestHeaders = buildHeaders(authHeader, additionalRequestHeaders);
this.httpClient = buildHttpClient(sslContext, verifySslHostname);
}

@Override
Expand All @@ -117,7 +127,7 @@ public ConnectResponse<ConnectorInfo> create(
"config", config)),
ContentType.APPLICATION_JSON
)
.execute()
.execute(httpClient)
.handleResponse(
createHandler(HttpStatus.SC_CREATED, new TypeReference<ConnectorInfo>() {},
Function.identity())));
Expand Down Expand Up @@ -147,7 +157,7 @@ public ConnectResponse<ConfigInfos> validate(
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.bodyString(MAPPER.writeValueAsString(config), ContentType.APPLICATION_JSON)
.execute()
.execute(httpClient)
.handleResponse(
createHandler(HttpStatus.SC_OK, new TypeReference<ConfigInfos>() {},
Function.identity())));
Expand Down Expand Up @@ -175,7 +185,7 @@ public ConnectResponse<List<String>> connectors() {
.setHeaders(requestHeaders)
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.execute()
.execute(httpClient)
.handleResponse(
createHandler(HttpStatus.SC_OK, new TypeReference<List<String>>() {},
Function.identity())));
Expand All @@ -200,7 +210,7 @@ public ConnectResponse<List<ConnectorPluginInfo>> connectorPlugins() {
.setHeaders(requestHeaders)
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.execute()
.execute(httpClient)
.handleResponse(
createHandler(HttpStatus.SC_OK, new TypeReference<List<ConnectorPluginInfo>>() {},
Function.identity())));
Expand All @@ -226,7 +236,7 @@ public ConnectResponse<ConnectorStateInfo> status(final String connector) {
.setHeaders(requestHeaders)
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.execute()
.execute(httpClient)
.handleResponse(
createHandler(HttpStatus.SC_OK, new TypeReference<ConnectorStateInfo>() {},
Function.identity())));
Expand All @@ -252,7 +262,7 @@ public ConnectResponse<ConnectorInfo> describe(final String connector) {
.setHeaders(requestHeaders)
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.execute()
.execute(httpClient)
.handleResponse(
createHandler(HttpStatus.SC_OK, new TypeReference<ConnectorInfo>() {},
Function.identity())));
Expand All @@ -278,7 +288,7 @@ public ConnectResponse<String> delete(final String connector) {
.setHeaders(requestHeaders)
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.execute()
.execute(httpClient)
.handleResponse(
createHandler(HttpStatus.SC_NO_CONTENT, new TypeReference<Object>() {},
foo -> connector)));
Expand All @@ -304,7 +314,7 @@ public ConnectResponse<Map<String, Map<String, List<String>>>> topics(final Stri
.setHeaders(requestHeaders)
.responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS))
.execute()
.execute(httpClient)
.handleResponse(
createHandler(HttpStatus.SC_OK,
new TypeReference<Map<String, Map<String, List<String>>>>() {},
Expand Down Expand Up @@ -362,6 +372,36 @@ private static Header[] buildHeaders(
return headers.toArray(new Header[0]);
}

/**
* Uses defaults from Request.execute(), except with an explicit SSLSocketFactory to pass
* custom SSL configs. Link to default below:
* https://github.com/apache/httpcomponents-client/blob/3734aaa038a58c17af638e9fa29019cacb22e82c/httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Executor.java#L62-L72
*/
private static CloseableHttpClient buildHttpClient(
final Optional<SSLContext> sslContext,
final boolean verifySslHostname
) {
final PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder =
PoolingHttpClientConnectionManagerBuilder.create();
sslContext.ifPresent(ctx -> {
final SSLConnectionSocketFactory socketFactory = verifySslHostname
? new SSLConnectionSocketFactory(ctx)
: new SSLConnectionSocketFactory(ctx, (hostname, session) -> true);
connectionManagerBuilder.setSSLSocketFactory(socketFactory);
});

return HttpClientBuilder.create()
.setConnectionManager(connectionManagerBuilder
.setMaxConnPerRoute(100)
.setMaxConnTotal(200)
.setValidateAfterInactivity(TimeValue.ofSeconds(10L))
.build())
.useSystemProperties()
.evictExpiredConnections()
.evictIdleConnections(TimeValue.ofMinutes(1L))
.build();
}

@SuppressWarnings("unchecked")
private static <T> ConnectResponse<T> withRetries(final Callable<ConnectResponse<T>> action) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.services;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.connect.ConnectRequestHeadersExtension;
import io.confluent.ksql.security.KsqlPrincipal;
import io.confluent.ksql.util.FileWatcher;
Expand All @@ -25,10 +26,13 @@
import java.nio.file.Paths;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import javax.net.ssl.SSLContext;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,13 +83,18 @@ public synchronized DefaultConnectClient get(
connectAuthHeader = buildAuthHeader();
}

final Map<String, Object> configWithPrefixOverrides =
ksqlConfig.valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX);

return new DefaultConnectClient(
ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
// if no custom auth is configured, then forward incoming request header
isCustomBasicAuthConfigured() ? connectAuthHeader : ksqlAuthHeader,
requestHeadersExtension
.map(extension -> extension.getHeaders(userPrincipal))
.orElse(Collections.emptyMap())
.orElse(Collections.emptyMap()),
Optional.ofNullable(newSslContext(configWithPrefixOverrides)),
shouldVerifySslHostname(configWithPrefixOverrides)
);
}

Expand Down Expand Up @@ -155,4 +164,30 @@ private static Optional<String> buildBasicAuthHeader(final String credentialsPat
return Optional.empty();
}
}

private static SSLContext newSslContext(final Map<String, Object> config) {
final DefaultSslEngineFactory sslFactory = new DefaultSslEngineFactory();
sslFactory.configure(config);
return sslFactory.sslContext();
}

@VisibleForTesting
static boolean shouldVerifySslHostname(final Map<String, Object> config) {
final Object endpointIdentificationAlgoConfig =
config.get(KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
if (endpointIdentificationAlgoConfig == null) {
return false;
}
final String endpointIdentificationAlgo = endpointIdentificationAlgoConfig.toString();
if (endpointIdentificationAlgo.isEmpty() || endpointIdentificationAlgo
.equalsIgnoreCase(KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_NONE)) {
return false;
} else if (endpointIdentificationAlgo
.equalsIgnoreCase(KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_HTTPS)) {
return true;
} else {
throw new ConfigException("Endpoint identification algorithm not supported: "
+ endpointIdentificationAlgo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.nio.charset.Charset;
import java.nio.file.attribute.FileTime;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.hc.core5.http.Header;
Expand Down Expand Up @@ -64,6 +66,24 @@ public class DefaultConnectClientFactoryTest {
private static final String AUTH_HEADER_NAME = HttpHeaders.AUTHORIZATION.toString();
private static final Header[] EMPTY_HEADERS = new Header[]{};

private static final Map<String, Object> DEFAULT_CONFIGS_WITH_PREFIX_OVERRIDE =
new KsqlConfig(ImmutableMap.of()).valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX);
private static final Map<String, Object> CONFIGS_WITH_HOSTNAME_VERIFICATION_ENABLED =
new KsqlConfig(ImmutableMap.of(
KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_HTTPS))
.valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX);
private static final Map<String, Object> CONFIGS_WITH_HOSTNAME_VERIFICATION_DISABLED =
new KsqlConfig(ImmutableMap.of(
KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_NONE))
.valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX);
private static final Map<String, Object> CONFIGS_WITH_HOSTNAME_VERIFICATION_EMPTY =
new KsqlConfig(ImmutableMap.of(
KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
""))
.valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX);

@Rule
public TemporaryFolder folder = KsqlTestFolder.temporaryFolder();

Expand All @@ -84,6 +104,7 @@ public void setUp() {

when(config.getString(KsqlConfig.CONNECT_URL_PROPERTY)).thenReturn("http://localhost:8034");
when(config.getString(KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY)).thenReturn("NONE");
when(config.valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX)).thenReturn(DEFAULT_CONFIGS_WITH_PREFIX_OVERRIDE);

connectClientFactory = new DefaultConnectClientFactory(config);
}
Expand Down Expand Up @@ -259,6 +280,27 @@ public void shouldPassCustomRequestHeadersInAdditionToAuthHeader() throws Except
arrayContaining(header(AUTH_HEADER_NAME, EXPECTED_HEADER), header("header", "value")));
}

@Test
public void shouldEnableHostnameVerification() {
// When / Then:
assertThat(DefaultConnectClientFactory.shouldVerifySslHostname(CONFIGS_WITH_HOSTNAME_VERIFICATION_ENABLED),
is(true));
}

@Test
public void shouldDisableHostnameVerification() {
// When / Then:
assertThat(DefaultConnectClientFactory.shouldVerifySslHostname(CONFIGS_WITH_HOSTNAME_VERIFICATION_DISABLED),
is(false));
}

@Test
public void shouldDisableHostnameVerificationOnEmptyConfig() {
// When / Then:
assertThat(DefaultConnectClientFactory.shouldVerifySslHostname(CONFIGS_WITH_HOSTNAME_VERIFICATION_EMPTY),
is(false));
}

private void givenCustomBasicAuthHeader() {
when(config.getString(KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY)).thenReturn("FILE");
when(config.getString(KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_FILE_PROPERTY)).thenReturn(credentialsPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.vertx.core.http.HttpHeaders.AUTHORIZATION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -35,6 +36,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import org.apache.http.HttpStatus;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
Expand All @@ -53,6 +56,9 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

@RunWith(Parameterized.class)
@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
Expand Down Expand Up @@ -96,11 +102,19 @@ public class DefaultConnectClientTest {
.dynamicPort()
);

@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();

@Parameterized.Parameters(name = "{1}")
public static Collection<String[]> pathPrefixes() {
return ImmutableList.of(new String[]{"", "no path prefix"}, new String[]{"/some/path/prefix", "with path prefix"});
}

@Mock
private SSLContext sslContext;
@Mock
private SSLSocketFactory sslSocketFactory;

private final String pathPrefix;

private ConnectClient client;
Expand All @@ -111,10 +125,14 @@ public DefaultConnectClientTest(final String pathPrefix, final String testName)

@Before
public void setup() {
when(sslContext.getSocketFactory()).thenReturn(sslSocketFactory);

client = new DefaultConnectClient(
"http://localhost:" + wireMockRule.port() + pathPrefix,
Optional.of(AUTH_HEADER),
ImmutableMap.of(CUSTOM_HEADER_NAME, CUSTOM_HEADER_VALUE));
ImmutableMap.of(CUSTOM_HEADER_NAME, CUSTOM_HEADER_VALUE),
Optional.of(sslContext),
false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;

Expand Down Expand Up @@ -93,7 +94,9 @@ public static ServiceContext create(
new DefaultConnectClient(
"http://localhost:8083",
Optional.empty(),
Collections.emptyMap()),
Collections.emptyMap(),
Optional.empty(),
false),
consumerGroupClient
);
}
Expand Down

0 comments on commit 37507ab

Please sign in to comment.