Skip to content

Commit

Permalink
fix: Set timeout for pull query forwarded requsts to 20s (#9823)
Browse files Browse the repository at this point in the history
* fix: Set timeout for pull query forwarded requsts to 15s

* Ups it to 20s
  • Loading branch information
AlanConfluent committed Mar 17, 2023
1 parent 833e652 commit 7ac28eb
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 26 deletions.
14 changes: 14 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Expand Up @@ -360,6 +360,13 @@ public class KsqlConfig extends AbstractConfig {
= "Enables the use of LIMIT clause in pull queries";
public static final boolean KSQL_QUERY_PULL_LIMIT_CLAUSE_ENABLED_DEFAULT = true;

public static final String KSQL_QUERY_PULL_FORWARDING_TIMEOUT_MS_CONFIG
= "ksql.query.pull.forwarding.timeout.ms";
public static final String KSQL_QUERY_PULL_FORWARDING_TIMEOUT_MS_DOC
= "Pull query forwarding timeout in milliseconds";
public static final long KSQL_QUERY_PULL_FORWARDING_TIMEOUT_MS_DEFAULT =
20000L;

public static final String KSQL_QUERY_PUSH_V2_ENABLED
= "ksql.query.push.v2.enabled";
public static final String KSQL_QUERY_PUSH_V2_ENABLED_DOC =
Expand Down Expand Up @@ -1211,6 +1218,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_LIMIT_CLAUSE_ENABLED_DOC
)
.define(
KSQL_QUERY_PULL_FORWARDING_TIMEOUT_MS_CONFIG,
Type.LONG,
KSQL_QUERY_PULL_FORWARDING_TIMEOUT_MS_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_FORWARDING_TIMEOUT_MS_DOC
)
.define(
KSQL_QUERY_PUSH_V2_ENABLED,
Type.BOOLEAN,
Expand Down
Expand Up @@ -30,6 +30,7 @@
import io.confluent.ksql.rest.entity.LagReportingMessage;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlHostInfo;
import io.vertx.core.Vertx;
import io.vertx.core.net.SocketAddress;
Expand All @@ -52,6 +53,7 @@ final class DefaultKsqlClient implements SimpleKsqlClient {
private final Optional<String> authHeader;
private final KsqlClient sharedClient;
private final boolean ownSharedClient;
private final KsqlConfig ksqlConfig;

@VisibleForTesting
DefaultKsqlClient(final Optional<String> authHeader,
Expand All @@ -60,25 +62,29 @@ final class DefaultKsqlClient implements SimpleKsqlClient {
this(
authHeader,
createInternalClient(toClientProps(clientProps), socketAddressFactory, Vertx.vertx()),
true
true,
new KsqlConfig(clientProps)
);
}

DefaultKsqlClient(
final Optional<String> authHeader,
final KsqlClient sharedClient
final KsqlClient sharedClient,
final KsqlConfig ksqlConfig
) {
this(authHeader, sharedClient, false);
this(authHeader, sharedClient, false, ksqlConfig);
}

DefaultKsqlClient(
final Optional<String> authHeader,
final KsqlClient sharedClient,
final boolean ownSharedClient
final boolean ownSharedClient,
final KsqlConfig ksqlConfig
) {
this.authHeader = requireNonNull(authHeader, "authHeader");
this.sharedClient = requireNonNull(sharedClient, "sharedClient");
this.ownSharedClient = ownSharedClient;
this.ksqlConfig = ksqlConfig;
}

@Override
Expand All @@ -103,7 +109,8 @@ public RestResponse<List<StreamedRow>> makeQueryRequest(

final KsqlTarget target = sharedClient
.target(serverEndPoint)
.properties(configOverrides);
.properties(configOverrides)
.timeout(getQueryTimeout(configOverrides));

final RestResponse<List<StreamedRow>> resp = getTarget(target)
.postQueryRequest(sql, requestProperties, Optional.empty());
Expand All @@ -127,7 +134,8 @@ public RestResponse<Integer> makeQueryRequest(
) {
final KsqlTarget target = sharedClient
.target(serverEndPoint)
.properties(configOverrides);
.properties(configOverrides)
.timeout(getQueryTimeout(configOverrides));

final RestResponse<Integer> resp = getTarget(target)
.postQueryRequest(sql, requestProperties, Optional.empty(), rowConsumer,
Expand Down Expand Up @@ -225,4 +233,11 @@ private static Map<String, String> toClientProps(final Map<String, Object> confi
}
return clientProps;
}

private long getQueryTimeout(final Map<String, ?> configOverrides) {
if (configOverrides.containsKey(KsqlConfig.KSQL_QUERY_PULL_FORWARDING_TIMEOUT_MS_CONFIG)) {
return (Long) configOverrides.get(KsqlConfig.KSQL_QUERY_PULL_FORWARDING_TIMEOUT_MS_CONFIG);
}
return ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_PULL_FORWARDING_TIMEOUT_MS_CONFIG);
}
}
Expand Up @@ -97,7 +97,7 @@ public static ServiceContext create(
kafkaClientSupplier,
srClientFactory,
() -> connectClientFactory.get(authHeader, requestHeaders, userPrincipal),
() -> new DefaultKsqlClient(authHeader, sharedClient)
() -> new DefaultKsqlClient(authHeader, sharedClient, ksqlConfig)
);
}

Expand Down
Expand Up @@ -680,9 +680,10 @@ public Builder withEnabledKsqlClient() {
}

public Builder withNetworkDisruptorInternalKsqlClient(NetworkState networkState) {
internalSimpleKsqlClientFactory = (authHeader, ksqlClient) ->
internalSimpleKsqlClientFactory = (authHeader, additionalProps, ksqlClient) ->
new NetworkDisruptorClient(
TestDefaultKsqlClientFactory.instance(authHeader, ksqlClient), networkState);
TestDefaultKsqlClientFactory.instance(authHeader, additionalProps, ksqlClient),
networkState);
return this;
}

Expand Down
Expand Up @@ -15,9 +15,12 @@

package io.confluent.ksql.rest.server.services;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand All @@ -29,7 +32,10 @@
import io.confluent.ksql.rest.client.KsqlTarget;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.KsqlConfig;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -50,15 +56,23 @@ public class DefaultKsqlClientTest {
private KsqlTarget target;
@Mock
private RestResponse<KsqlEntityList> response;
@Mock
private RestResponse<List<StreamedRow>> queryResponse;
@Mock
private KsqlConfig ksqlConfig;
private DefaultKsqlClient client;

@Before
public void setUp() {
client = new DefaultKsqlClient(Optional.of(AUTH_HEADER), sharedClient);
client = new DefaultKsqlClient(Optional.of(AUTH_HEADER), sharedClient, ksqlConfig);

when(sharedClient.target(any())).thenReturn(target);
when(target.authorizationHeader(any())).thenReturn(target);
when(target.properties(any())).thenReturn(target);
when(target.timeout(anyLong())).thenReturn(target);
when(target.postKsqlRequest(any(), any(), any())).thenReturn(response);
when(target.postQueryRequest(any(), any(), any())).thenReturn(queryResponse);
when(queryResponse.getStatusCode()).thenReturn(OK.code());
}

@Test
Expand All @@ -82,7 +96,7 @@ public void shouldSetAuthHeaderOnTarget() {
@Test
public void shouldHandleNoAuthHeader() {
// Given:
client = new DefaultKsqlClient(Optional.empty(), sharedClient);
client = new DefaultKsqlClient(Optional.empty(), sharedClient, ksqlConfig);

// When:
final RestResponse<KsqlEntityList> result = client.makeKsqlRequest(SERVER_ENDPOINT, "Sql", ImmutableMap.of());
Expand All @@ -101,4 +115,20 @@ public void shouldPostRequest() {
verify(target).postKsqlRequest("Sql", ImmutableMap.of(), Optional.empty());
assertThat(result, is(response));
}

@Test
public void shouldSetQueryTimeout() {
// Given:
when(ksqlConfig.getLong(KsqlConfig.KSQL_QUERY_PULL_FORWARDING_TIMEOUT_MS_CONFIG))
.thenReturn(300L);

// When:
final RestResponse<List<StreamedRow>> result = client.makeQueryRequest(SERVER_ENDPOINT, "Sql",
ImmutableMap.of(), ImmutableMap.of());

// Then:
verify(target).postQueryRequest("Sql", ImmutableMap.of(), Optional.empty());
verify(target).timeout(300L);
assertThat(result.getStatusCode(), is(queryResponse.getStatusCode()));
}
}
Expand Up @@ -2,6 +2,7 @@

import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.util.KsqlConfig;
import io.vertx.core.net.SocketAddress;
import java.util.Map;
import java.util.Optional;
Expand All @@ -27,9 +28,10 @@ public static SimpleKsqlClient instance(Map<String, Object> clientProps,
// With auth and a shared client
public static SimpleKsqlClient instance(
final Optional<String> authHeader,
final Map<String, Object> clientProps,
final KsqlClient sharedClient
) {
return new DefaultKsqlClient(authHeader, sharedClient);
return new DefaultKsqlClient(authHeader, sharedClient, new KsqlConfig(clientProps));
}

}
Expand Up @@ -10,15 +10,18 @@
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;

public class TestRestServiceContextFactory {

public interface InternalSimpleKsqlClientFactory {
SimpleKsqlClient create(Optional<String> authHeader, KsqlClient ksqlClient);
SimpleKsqlClient create(Optional<String> authHeader, Map<String, Object> clientProps,
KsqlClient ksqlClient);
}

public static DefaultServiceContextFactory createDefault(
Expand Down Expand Up @@ -55,7 +58,7 @@ public static UserServiceContextFactory createUser(
Optional.empty(),
false,
CONNECT_REQUEST_TIMEOUT_DEFAULT),
() -> ksqlClientFactory.create(authHeader, sharedClient)
() -> ksqlClientFactory.create(authHeader, ksqlConfig.originals(), sharedClient)
);
};

Expand Down
Expand Up @@ -28,6 +28,7 @@
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.SocketAddress;
import java.net.URI;
Expand Down Expand Up @@ -132,7 +133,7 @@ public KsqlTarget target(final URI server, final Map<String, String> additionalH
final HttpClient client = isUriTls ? httpTlsClient : httpNonTlsClient;
return new KsqlTarget(client,
socketAddressFactory.apply(server.getPort(), server.getHost()), localProperties,
basicAuthHeader, server.getHost(), additionalHeaders);
basicAuthHeader, server.getHost(), additionalHeaders, RequestOptions.DEFAULT_TIMEOUT);
}

public KsqlTarget targetHttp2(final URI server) {
Expand All @@ -141,7 +142,7 @@ public KsqlTarget targetHttp2(final URI server) {
() -> new IllegalStateException("Must provide http2 options to use targetHttp2"));
return new KsqlTarget(client,
socketAddressFactory.apply(server.getPort(), server.getHost()), localProperties,
basicAuthHeader, server.getHost(), Collections.emptyMap());
basicAuthHeader, server.getHost(), Collections.emptyMap(), RequestOptions.DEFAULT_TIMEOUT);
}

@VisibleForTesting
Expand Down
Expand Up @@ -88,6 +88,7 @@ public final class KsqlTarget {
private final Optional<String> authHeader;
private final String host;
private final Map<String, String> additionalHeaders;
private final long timeout;

/**
* Create a KsqlTarget containing all of the connection information required to make a request
Expand All @@ -103,25 +104,33 @@ public final class KsqlTarget {
final LocalProperties localProperties,
final Optional<String> authHeader,
final String host,
final Map<String, String> additionalHeaders
final Map<String, String> additionalHeaders,
final long timeout
) {
this.httpClient = requireNonNull(httpClient, "httpClient");
this.socketAddress = requireNonNull(socketAddress, "socketAddress");
this.localProperties = requireNonNull(localProperties, "localProperties");
this.authHeader = requireNonNull(authHeader, "authHeader");
this.host = host;
this.additionalHeaders = requireNonNull(additionalHeaders, "additionalHeaders");
this.timeout = timeout;
}

public KsqlTarget authorizationHeader(final String authHeader) {
return new KsqlTarget(httpClient, socketAddress, localProperties,
Optional.of(authHeader), host, additionalHeaders);
Optional.of(authHeader), host, additionalHeaders, timeout);
}

public KsqlTarget properties(final Map<String, ?> properties) {
return new KsqlTarget(httpClient, socketAddress,
new LocalProperties(properties),
authHeader, host, additionalHeaders);
authHeader, host, additionalHeaders, timeout);
}

public KsqlTarget timeout(final long timeout) {
return new KsqlTarget(httpClient, socketAddress,
localProperties,
authHeader, host, additionalHeaders, timeout);
}

public RestResponse<ServerInfo> getServerInfo() {
Expand Down Expand Up @@ -475,6 +484,7 @@ private CompletableFuture<ResponseWithBody> execute(
options.setPort(socketAddress.port());
options.setHost(host);
options.setURI(path);
options.setTimeout(timeout);

httpClient.request(options, ar -> {
if (ar.failed()) {
Expand Down

0 comments on commit 7ac28eb

Please sign in to comment.