From 37507ab86542de3352324e3cf341641ef4708b96 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Fri, 7 Jan 2022 14:21:16 -0800 Subject: [PATCH] feat: add support for connect specific https configs (#8553) --- .../io/confluent/ksql/util/KsqlConfig.java | 5 ++ .../ksql/services/DefaultConnectClient.java | 62 +++++++++++++++---- .../services/DefaultConnectClientFactory.java | 37 ++++++++++- .../DefaultConnectClientFactoryTest.java | 42 +++++++++++++ .../services/DefaultConnectClientTest.java | 20 +++++- .../ksql/services/TestServiceContext.java | 5 +- .../ksql/test/tools/TestExecutor.java | 4 +- .../TestRestServiceContextFactory.java | 9 ++- 8 files changed, 167 insertions(+), 17 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 831dbc29f7e2..0089b5ac2675 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -119,6 +119,11 @@ public class KsqlConfig extends AbstractConfig { public static final String BASIC_AUTH_CREDENTIALS_USERNAME = "username"; public static final String BASIC_AUTH_CREDENTIALS_PASSWORD = "password"; + public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = + SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG; + public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_HTTPS = "https"; + public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_NONE = "none"; + public static final String CONNECT_BASIC_AUTH_CREDENTIALS_FILE_PROPERTY = KSQL_CONNECT_PREFIX + "basic.auth.credentials.file"; public static final String CONNECT_BASIC_AUTH_CREDENTIALS_RELOAD_PROPERTY = diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java index 9b407abb2c60..18df90b964d3 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClient.java @@ -38,7 +38,12 @@ import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; +import javax.net.ssl.SSLContext; import org.apache.hc.client5.http.fluent.Request; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpEntity; @@ -46,6 +51,7 @@ import org.apache.hc.core5.http.io.HttpClientResponseHandler; import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.Timeout; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; @@ -75,23 +81,27 @@ public class DefaultConnectClient implements ConnectClient { private final URI connectUri; private final Header[] requestHeaders; + private final CloseableHttpClient httpClient; public DefaultConnectClient( final String connectUri, final Optional authHeader, - final Map additionalRequestHeaders + final Map additionalRequestHeaders, + final Optional sslContext, + final boolean verifySslHostname ) { Objects.requireNonNull(connectUri, "connectUri"); Objects.requireNonNull(authHeader, "authHeader"); Objects.requireNonNull(additionalRequestHeaders, "additionalRequestHeaders"); - this.requestHeaders = buildHeaders(authHeader, additionalRequestHeaders); - + Objects.requireNonNull(sslContext, "sslContext"); try { this.connectUri = new URI(connectUri); } catch (final URISyntaxException e) { throw new KsqlException( "Could not initialize connect client due to invalid URI: " + connectUri, e); } + this.requestHeaders = buildHeaders(authHeader, additionalRequestHeaders); + this.httpClient = buildHttpClient(sslContext, verifySslHostname); } @Override @@ -117,7 +127,7 @@ public ConnectResponse create( "config", config)), ContentType.APPLICATION_JSON ) - .execute() + .execute(httpClient) .handleResponse( createHandler(HttpStatus.SC_CREATED, new TypeReference() {}, Function.identity()))); @@ -147,7 +157,7 @@ public ConnectResponse validate( .responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) .connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) .bodyString(MAPPER.writeValueAsString(config), ContentType.APPLICATION_JSON) - .execute() + .execute(httpClient) .handleResponse( createHandler(HttpStatus.SC_OK, new TypeReference() {}, Function.identity()))); @@ -175,7 +185,7 @@ public ConnectResponse> connectors() { .setHeaders(requestHeaders) .responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) .connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) - .execute() + .execute(httpClient) .handleResponse( createHandler(HttpStatus.SC_OK, new TypeReference>() {}, Function.identity()))); @@ -200,7 +210,7 @@ public ConnectResponse> connectorPlugins() { .setHeaders(requestHeaders) .responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) .connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) - .execute() + .execute(httpClient) .handleResponse( createHandler(HttpStatus.SC_OK, new TypeReference>() {}, Function.identity()))); @@ -226,7 +236,7 @@ public ConnectResponse status(final String connector) { .setHeaders(requestHeaders) .responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) .connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) - .execute() + .execute(httpClient) .handleResponse( createHandler(HttpStatus.SC_OK, new TypeReference() {}, Function.identity()))); @@ -252,7 +262,7 @@ public ConnectResponse describe(final String connector) { .setHeaders(requestHeaders) .responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) .connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) - .execute() + .execute(httpClient) .handleResponse( createHandler(HttpStatus.SC_OK, new TypeReference() {}, Function.identity()))); @@ -278,7 +288,7 @@ public ConnectResponse delete(final String connector) { .setHeaders(requestHeaders) .responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) .connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) - .execute() + .execute(httpClient) .handleResponse( createHandler(HttpStatus.SC_NO_CONTENT, new TypeReference() {}, foo -> connector))); @@ -304,7 +314,7 @@ public ConnectResponse>>> topics(final Stri .setHeaders(requestHeaders) .responseTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) .connectTimeout(Timeout.ofMilliseconds(DEFAULT_TIMEOUT_MS)) - .execute() + .execute(httpClient) .handleResponse( createHandler(HttpStatus.SC_OK, new TypeReference>>>() {}, @@ -362,6 +372,36 @@ private static Header[] buildHeaders( return headers.toArray(new Header[0]); } + /** + * Uses defaults from Request.execute(), except with an explicit SSLSocketFactory to pass + * custom SSL configs. Link to default below: + * https://github.com/apache/httpcomponents-client/blob/3734aaa038a58c17af638e9fa29019cacb22e82c/httpclient5-fluent/src/main/java/org/apache/hc/client5/http/fluent/Executor.java#L62-L72 + */ + private static CloseableHttpClient buildHttpClient( + final Optional sslContext, + final boolean verifySslHostname + ) { + final PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = + PoolingHttpClientConnectionManagerBuilder.create(); + sslContext.ifPresent(ctx -> { + final SSLConnectionSocketFactory socketFactory = verifySslHostname + ? new SSLConnectionSocketFactory(ctx) + : new SSLConnectionSocketFactory(ctx, (hostname, session) -> true); + connectionManagerBuilder.setSSLSocketFactory(socketFactory); + }); + + return HttpClientBuilder.create() + .setConnectionManager(connectionManagerBuilder + .setMaxConnPerRoute(100) + .setMaxConnTotal(200) + .setValidateAfterInactivity(TimeValue.ofSeconds(10L)) + .build()) + .useSystemProperties() + .evictExpiredConnections() + .evictIdleConnections(TimeValue.ofMinutes(1L)) + .build(); + } + @SuppressWarnings("unchecked") private static ConnectResponse withRetries(final Callable> action) { try { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClientFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClientFactory.java index 49ab9eebbbda..1bb2688659f2 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClientFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/DefaultConnectClientFactory.java @@ -15,6 +15,7 @@ package io.confluent.ksql.services; +import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.connect.ConnectRequestHeadersExtension; import io.confluent.ksql.security.KsqlPrincipal; import io.confluent.ksql.util.FileWatcher; @@ -25,10 +26,13 @@ import java.nio.file.Paths; import java.util.Base64; import java.util.Collections; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; +import javax.net.ssl.SSLContext; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,13 +83,18 @@ public synchronized DefaultConnectClient get( connectAuthHeader = buildAuthHeader(); } + final Map configWithPrefixOverrides = + ksqlConfig.valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX); + return new DefaultConnectClient( ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY), // if no custom auth is configured, then forward incoming request header isCustomBasicAuthConfigured() ? connectAuthHeader : ksqlAuthHeader, requestHeadersExtension .map(extension -> extension.getHeaders(userPrincipal)) - .orElse(Collections.emptyMap()) + .orElse(Collections.emptyMap()), + Optional.ofNullable(newSslContext(configWithPrefixOverrides)), + shouldVerifySslHostname(configWithPrefixOverrides) ); } @@ -155,4 +164,30 @@ private static Optional buildBasicAuthHeader(final String credentialsPat return Optional.empty(); } } + + private static SSLContext newSslContext(final Map config) { + final DefaultSslEngineFactory sslFactory = new DefaultSslEngineFactory(); + sslFactory.configure(config); + return sslFactory.sslContext(); + } + + @VisibleForTesting + static boolean shouldVerifySslHostname(final Map config) { + final Object endpointIdentificationAlgoConfig = + config.get(KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); + if (endpointIdentificationAlgoConfig == null) { + return false; + } + final String endpointIdentificationAlgo = endpointIdentificationAlgoConfig.toString(); + if (endpointIdentificationAlgo.isEmpty() || endpointIdentificationAlgo + .equalsIgnoreCase(KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_NONE)) { + return false; + } else if (endpointIdentificationAlgo + .equalsIgnoreCase(KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_HTTPS)) { + return true; + } else { + throw new ConfigException("Endpoint identification algorithm not supported: " + + endpointIdentificationAlgo); + } + } } diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientFactoryTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientFactoryTest.java index 7689083e5fc5..dd268052c272 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientFactoryTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientFactoryTest.java @@ -35,6 +35,8 @@ import java.nio.charset.Charset; import java.nio.file.attribute.FileTime; import java.util.Base64; +import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.hc.core5.http.Header; @@ -64,6 +66,24 @@ public class DefaultConnectClientFactoryTest { private static final String AUTH_HEADER_NAME = HttpHeaders.AUTHORIZATION.toString(); private static final Header[] EMPTY_HEADERS = new Header[]{}; + private static final Map DEFAULT_CONFIGS_WITH_PREFIX_OVERRIDE = + new KsqlConfig(ImmutableMap.of()).valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX); + private static final Map CONFIGS_WITH_HOSTNAME_VERIFICATION_ENABLED = + new KsqlConfig(ImmutableMap.of( + KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, + KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_HTTPS)) + .valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX); + private static final Map CONFIGS_WITH_HOSTNAME_VERIFICATION_DISABLED = + new KsqlConfig(ImmutableMap.of( + KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, + KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_NONE)) + .valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX); + private static final Map CONFIGS_WITH_HOSTNAME_VERIFICATION_EMPTY = + new KsqlConfig(ImmutableMap.of( + KsqlConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, + "")) + .valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX); + @Rule public TemporaryFolder folder = KsqlTestFolder.temporaryFolder(); @@ -84,6 +104,7 @@ public void setUp() { when(config.getString(KsqlConfig.CONNECT_URL_PROPERTY)).thenReturn("http://localhost:8034"); when(config.getString(KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY)).thenReturn("NONE"); + when(config.valuesWithPrefixOverride(KsqlConfig.KSQL_CONNECT_PREFIX)).thenReturn(DEFAULT_CONFIGS_WITH_PREFIX_OVERRIDE); connectClientFactory = new DefaultConnectClientFactory(config); } @@ -259,6 +280,27 @@ public void shouldPassCustomRequestHeadersInAdditionToAuthHeader() throws Except arrayContaining(header(AUTH_HEADER_NAME, EXPECTED_HEADER), header("header", "value"))); } + @Test + public void shouldEnableHostnameVerification() { + // When / Then: + assertThat(DefaultConnectClientFactory.shouldVerifySslHostname(CONFIGS_WITH_HOSTNAME_VERIFICATION_ENABLED), + is(true)); + } + + @Test + public void shouldDisableHostnameVerification() { + // When / Then: + assertThat(DefaultConnectClientFactory.shouldVerifySslHostname(CONFIGS_WITH_HOSTNAME_VERIFICATION_DISABLED), + is(false)); + } + + @Test + public void shouldDisableHostnameVerificationOnEmptyConfig() { + // When / Then: + assertThat(DefaultConnectClientFactory.shouldVerifySslHostname(CONFIGS_WITH_HOSTNAME_VERIFICATION_EMPTY), + is(false)); + } + private void givenCustomBasicAuthHeader() { when(config.getString(KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_SOURCE_PROPERTY)).thenReturn("FILE"); when(config.getString(KsqlConfig.CONNECT_BASIC_AUTH_CREDENTIALS_FILE_PROPERTY)).thenReturn(credentialsPath); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java index fc7e3b508da9..485d94a29a20 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/DefaultConnectClientTest.java @@ -18,6 +18,7 @@ import static io.vertx.core.http.HttpHeaders.AUTHORIZATION; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,6 +36,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; import org.apache.http.HttpStatus; import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; @@ -53,6 +56,9 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; @RunWith(Parameterized.class) @SuppressWarnings("checkstyle:ClassDataAbstractionCoupling") @@ -96,11 +102,19 @@ public class DefaultConnectClientTest { .dynamicPort() ); + @Rule + public final MockitoRule mockitoRule = MockitoJUnit.rule(); + @Parameterized.Parameters(name = "{1}") public static Collection pathPrefixes() { return ImmutableList.of(new String[]{"", "no path prefix"}, new String[]{"/some/path/prefix", "with path prefix"}); } + @Mock + private SSLContext sslContext; + @Mock + private SSLSocketFactory sslSocketFactory; + private final String pathPrefix; private ConnectClient client; @@ -111,10 +125,14 @@ public DefaultConnectClientTest(final String pathPrefix, final String testName) @Before public void setup() { + when(sslContext.getSocketFactory()).thenReturn(sslSocketFactory); + client = new DefaultConnectClient( "http://localhost:" + wireMockRule.port() + pathPrefix, Optional.of(AUTH_HEADER), - ImmutableMap.of(CUSTOM_HEADER_NAME, CUSTOM_HEADER_VALUE)); + ImmutableMap.of(CUSTOM_HEADER_NAME, CUSTOM_HEADER_VALUE), + Optional.of(sslContext), + false); } @Test diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java index 720cfd44d504..a1231e5a610b 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/TestServiceContext.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; @@ -93,7 +94,9 @@ public static ServiceContext create( new DefaultConnectClient( "http://localhost:8083", Optional.empty(), - Collections.emptyMap()), + Collections.emptyMap(), + Optional.empty(), + false), consumerGroupClient ); } diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java index 4d2e8055f030..3c39cb8dc710 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/TestExecutor.java @@ -574,7 +574,9 @@ private static ServiceContext getServiceContext( () -> new DefaultConnectClient( "http://localhost:8083", Optional.empty(), - Collections.emptyMap()), + Collections.emptyMap(), + Optional.empty(), + false), DisabledKsqlClient::instance, new StubKafkaConsumerGroupClient() ); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/TestRestServiceContextFactory.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/TestRestServiceContextFactory.java index 0127ae1dea65..a1b9d43912f9 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/TestRestServiceContextFactory.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/services/TestRestServiceContextFactory.java @@ -10,6 +10,7 @@ import java.util.Collections; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; public class TestRestServiceContextFactory { @@ -44,8 +45,12 @@ public static UserServiceContextFactory createUser( ksqlConfig, kafkaClientSupplier, srClientFactory, - () -> new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY), - authHeader, Collections.emptyMap()), + () -> new DefaultConnectClient( + ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY), + authHeader, + Collections.emptyMap(), + Optional.empty(), + false), () -> ksqlClientFactory.create(authHeader, sharedClient) ); };