Skip to content

Commit

Permalink
fix: Fixes errors and increased latency for pull queries from closing…
Browse files Browse the repository at this point in the history
… connection (#8248)
  • Loading branch information
AlanConfluent committed Oct 12, 2021
1 parent e164b18 commit d98f50a
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ PullQueryResult executeTablePullQuery(
final LogicalPlanNode logicalPlan = buildAndValidateLogicalPlan(
statement, analysis, ksqlConfig, queryPlannerOptions, false);

// This is a cancel signal that is used to stop both local operations and requests
// This is a cancel signal that is used to stop both local operations
final CompletableFuture<Void> shouldCancelRequests = new CompletableFuture<>();

plan = buildPullPhysicalPlan(
Expand All @@ -262,7 +262,7 @@ PullQueryResult executeTablePullQuery(
final PullQueryQueuePopulator populator = () -> routing.handlePullQuery(
serviceContext,
physicalPlan, statement, routingOptions, physicalPlan.getOutputSchema(),
physicalPlan.getQueryId(), pullQueryQueue, shouldCancelRequests);
physicalPlan.getQueryId(), pullQueryQueue);
final PullQueryResult result = new PullQueryResult(physicalPlan.getOutputSchema(), populator,
physicalPlan.getQueryId(), pullQueryQueue, pullQueryMetrics, physicalPlan.getSourceType(),
physicalPlan.getPlanType(), routingNodeType, physicalPlan::getRowsReadFromDataSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ public CompletableFuture<Void> handlePullQuery(
final RoutingOptions routingOptions,
final LogicalSchema outputSchema,
final QueryId queryId,
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
final PullQueryQueue pullQueryQueue
) {
final List<KsqlPartitionLocation> allLocations = pullPhysicalPlan.getMaterialization().locator()
.locate(
Expand Down Expand Up @@ -152,7 +151,7 @@ public CompletableFuture<Void> handlePullQuery(
coordinatorExecutorService.submit(() -> {
try {
executeRounds(serviceContext, pullPhysicalPlan, statement, routingOptions, outputSchema,
queryId, locations, pullQueryQueue, shouldCancelRequests);
queryId, locations, pullQueryQueue);
completableFuture.complete(null);
} catch (Throwable t) {
completableFuture.completeExceptionally(t);
Expand All @@ -170,8 +169,7 @@ private void executeRounds(
final LogicalSchema outputSchema,
final QueryId queryId,
final List<KsqlPartitionLocation> locations,
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
final PullQueryQueue pullQueryQueue
) throws InterruptedException {
// The remaining partition locations to retrieve without error
List<KsqlPartitionLocation> remainingLocations = ImmutableList.copyOf(locations);
Expand Down Expand Up @@ -199,8 +197,7 @@ private void executeRounds(
futures.put(node, routerExecutorService.submit(
() -> routeQuery.routeQuery(
node, entry.getValue(), statement, serviceContext, routingOptions,
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue,
shouldCancelRequests)
pullQueryMetrics, pullPhysicalPlan, outputSchema, queryId, pullQueryQueue)
));
}

Expand Down Expand Up @@ -271,8 +268,7 @@ RoutingResult routeQuery(
PullPhysicalPlan pullPhysicalPlan,
LogicalSchema outputSchema,
QueryId queryId,
PullQueryQueue pullQueryQueue,
CompletableFuture<Void> shouldCancelRequests
PullQueryQueue pullQueryQueue
);
}

Expand All @@ -288,8 +284,7 @@ static RoutingResult executeOrRouteQuery(
final PullPhysicalPlan pullPhysicalPlan,
final LogicalSchema outputSchema,
final QueryId queryId,
final PullQueryQueue pullQueryQueue,
final CompletableFuture<Void> shouldCancelRequests
final PullQueryQueue pullQueryQueue
) {
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory = (rawRow, schema) ->
new PullQueryRow(rawRow, schema, Optional.ofNullable(
Expand Down Expand Up @@ -320,7 +315,7 @@ static RoutingResult executeOrRouteQuery(
pullQueryMetrics
.ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordRemoteRequests(1));
forwardTo(node, locations, statement, serviceContext, pullQueryQueue, rowFactory,
outputSchema, shouldCancelRequests);
outputSchema);
return RoutingResult.SUCCESS;
} catch (StandbyFallbackException e) {
LOG.warn("Error forwarding query to node {}. Falling back to standby state which may "
Expand All @@ -342,8 +337,7 @@ private static void forwardTo(
final ServiceContext serviceContext,
final PullQueryQueue pullQueryQueue,
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory,
final LogicalSchema outputSchema,
final CompletableFuture<Void> shouldCancelRequests
final LogicalSchema outputSchema
) {

// Specify the partitions we specifically want to read. This will prevent reading unintended
Expand All @@ -366,8 +360,7 @@ private static void forwardTo(
statement.getStatementText(),
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema),
shouldCancelRequests
streamedRowsHandler(owner, pullQueryQueue, rowFactory, outputSchema)
);
} catch (Exception e) {
// If we threw some explicit exception, then let it bubble up. All of the row handling is
Expand All @@ -376,12 +369,6 @@ private static void forwardTo(
if (ksqlException != null) {
throw ksqlException;
}
// If the exception was caused by closing the connection, we consider this intentional and
// just return.
if (shouldCancelRequests.isDone()) {
LOG.warn("Connection canceled, so returning");
return;
}
// If we get some kind of unknown error, we assume it's network or other error from the
// KsqlClient and try standbys
throw new StandbyFallbackException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,10 @@
import io.confluent.ksql.util.KsqlHostInfo;
import io.confluent.ksql.util.KsqlRequestConfig;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -116,8 +113,6 @@ public class HARoutingTest {
private KsqlConfig ksqlConfig;
@Mock
private SimpleKsqlClient ksqlClient;
@Mock
private CompletableFuture<Void> disconnect;

private KsqlPartitionLocation location1;
private KsqlPartitionLocation location2;
Expand Down Expand Up @@ -175,7 +170,7 @@ public void shouldCallRouteQuery_success() throws InterruptedException, Executio
queue.acceptRow(PQ_ROW1);
return null;
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand All @@ -192,7 +187,7 @@ public void shouldCallRouteQuery_success() throws InterruptedException, Executio
// When:
CompletableFuture<Void> future = haRouting.handlePullQuery(
serviceContext, pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId,
pullQueryQueue, disconnect);
pullQueryQueue);
future.get();

// Then:
Expand All @@ -210,7 +205,7 @@ public void shouldCallRouteQuery_twoRound() throws InterruptedException, Executi
doAnswer(i -> {
throw new StandbyFallbackException("Error!");
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any()))
.thenAnswer(new Answer() {
private int count = 0;

Expand Down Expand Up @@ -240,13 +235,12 @@ public Object answer(InvocationOnMock i) {

// When:
CompletableFuture<Void> future = haRouting.handlePullQuery(serviceContext, pullPhysicalPlan,
statement, routingOptions, logicalSchema, queryId, pullQueryQueue, disconnect);
statement, routingOptions, logicalSchema, queryId, pullQueryQueue);
future.get();

// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any());
verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(),
any());
verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any());

assertThat(pullQueryQueue.size(), is(2));
assertThat(pullQueryQueue.pollRow(1, TimeUnit.SECONDS).getRow(), is(ROW2));
Expand All @@ -258,7 +252,7 @@ public void shouldCallRouteQuery_twoRound_networkError()
throws InterruptedException, ExecutionException {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any()))
.thenAnswer(i -> {
throw new RuntimeException("Network error!");
}
Expand All @@ -271,12 +265,11 @@ public void shouldCallRouteQuery_twoRound_networkError()

// When:
CompletableFuture<Void> future = haRouting.handlePullQuery(serviceContext, pullPhysicalPlan,
statement, routingOptions, logicalSchema, queryId, pullQueryQueue, disconnect);
statement, routingOptions, logicalSchema, queryId, pullQueryQueue);
future.get();

// Then:
verify(ksqlClient, times(1)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(),
any());
verify(ksqlClient, times(1)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any());
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location2)), any(), any());

assertThat(pullQueryQueue.size(), is(1));
Expand All @@ -290,7 +283,7 @@ public void shouldCallRouteQuery_allStandbysFail() {
doAnswer(i -> {
throw new StandbyFallbackException("Error1!");
}).when(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any());
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any()))
.thenAnswer(new Answer() {
private int count = 0;

Expand Down Expand Up @@ -320,16 +313,14 @@ public Object answer(InvocationOnMock i) {
ExecutionException.class,
() -> {
CompletableFuture<Void> future = haRouting.handlePullQuery(serviceContext,
pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId, pullQueryQueue,
disconnect);
pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId, pullQueryQueue);
future.get();
}
);

// Then:
verify(pullPhysicalPlan).execute(eq(ImmutableList.of(location1, location3)), any(), any());
verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any(),
any());
verify(ksqlClient, times(2)).makeQueryRequest(eq(node2.location()), any(), any(), any(), any());

assertThat(e.getCause().getMessage(), containsString("Exhausted standby hosts to try."));
}
Expand All @@ -344,7 +335,7 @@ public void shouldCallRouteQuery_couldNotFindHost() {
final Exception e = assertThrows(
MaterializationException.class,
() -> haRouting.handlePullQuery(serviceContext, pullPhysicalPlan, statement, routingOptions,
logicalSchema, queryId, pullQueryQueue, disconnect)
logicalSchema, queryId, pullQueryQueue)
);

// Then:
Expand All @@ -363,7 +354,7 @@ public void shouldCallRouteQuery_allFilteredWithCause() {
final Exception e = assertThrows(
MaterializationException.class,
() -> haRouting.handlePullQuery(serviceContext, pullPhysicalPlan, statement, routingOptions,
logicalSchema, queryId, pullQueryQueue, disconnect)
logicalSchema, queryId, pullQueryQueue)
);

// Then:
Expand All @@ -375,7 +366,7 @@ public void shouldCallRouteQuery_allFilteredWithCause() {
public void shouldNotRouteToFilteredHost() throws InterruptedException, ExecutionException {
// Given:
location1 = new PartitionLocation(Optional.empty(), 1, ImmutableList.of(badNode, node1));
when(ksqlClient.makeQueryRequest(any(), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(any(), any(), any(), any(), any()))
.then(invocationOnMock -> RestResponse.successful(200, 2));
locate(location1, location2, location3, location4);

Expand All @@ -387,20 +378,19 @@ public void shouldNotRouteToFilteredHost() throws InterruptedException, Executio
routingOptions,
logicalSchema,
queryId,
pullQueryQueue,
disconnect);
pullQueryQueue);
fut.get();

// Then:
verify(ksqlClient, never())
.makeQueryRequest(eq(badNode.location()), any(), any(), any(), any(), any());
.makeQueryRequest(eq(badNode.location()), any(), any(), any(), any());
}

@Test
public void forwardingError_errorRow() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand All @@ -417,7 +407,7 @@ public void forwardingError_errorRow() {
// When:
CompletableFuture<Void> future = haRouting.handlePullQuery(
serviceContext, pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId,
pullQueryQueue, disconnect);
pullQueryQueue);
final Exception e = assertThrows(
ExecutionException.class,
future::get
Expand All @@ -432,7 +422,7 @@ public void forwardingError_errorRow() {
public void forwardingError_authError() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand All @@ -446,7 +436,7 @@ public void forwardingError_authError() {
// When:
CompletableFuture<Void> future = haRouting.handlePullQuery(
serviceContext, pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId,
pullQueryQueue, disconnect);
pullQueryQueue);
final Exception e = assertThrows(
ExecutionException.class,
future::get
Expand All @@ -461,7 +451,7 @@ public void forwardingError_authError() {
public void forwardingError_noRows() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand All @@ -475,7 +465,7 @@ public void forwardingError_noRows() {
// When:
CompletableFuture<Void> future = haRouting.handlePullQuery(
serviceContext, pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId,
pullQueryQueue, disconnect);
pullQueryQueue);
final Exception e = assertThrows(
ExecutionException.class,
future::get
Expand All @@ -491,7 +481,7 @@ public void forwardingError_noRows() {
public void forwardingError_invalidSchema() {
// Given:
locate(location2);
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any(), any()))
when(ksqlClient.makeQueryRequest(eq(node2.location()), any(), any(), any(), any()))
.thenAnswer(i -> {
Map<String, ?> requestProperties = i.getArgument(3);
Consumer<List<StreamedRow>> rowConsumer = i.getArgument(4);
Expand All @@ -508,7 +498,7 @@ public void forwardingError_invalidSchema() {
// When:
CompletableFuture<Void> future = haRouting.handlePullQuery(
serviceContext, pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId,
pullQueryQueue, disconnect);
pullQueryQueue);
final Exception e = assertThrows(
ExecutionException.class,
future::get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ public RestResponse<Integer> makeQueryRequest(
final String sql,
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection
final Consumer<List<StreamedRow>> rowConsumer
) {
throw new UnsupportedOperationException("KSQL client is disabled");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ RestResponse<Integer> makeQueryRequest(
String sql,
Map<String, ?> configOverrides,
Map<String, ?> requestProperties,
Consumer<List<StreamedRow>> rowConsumer,
CompletableFuture<Void> shouldCloseConnection
Consumer<List<StreamedRow>> rowConsumer
);

/**
* Send query request to remote Ksql server. This method is similar to
* {@link #makeQueryRequest(URI, String, Map, Map, Consumer, CompletableFuture)}, but gives a
* {@link #makeQueryRequest(URI, String, Map, Map, Consumer)}, but gives a
* different API.
* First, this is run asynchronously and second, when a response is received, a publisher is
* returned which publishes results asynchronously as they become available.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,14 @@ public RestResponse<Integer> makeQueryRequest(
final String sql,
final Map<String, ?> configOverrides,
final Map<String, ?> requestProperties,
final Consumer<List<StreamedRow>> rowConsumer,
final CompletableFuture<Void> shouldCloseConnection
final Consumer<List<StreamedRow>> rowConsumer
) {
final KsqlTarget target = sharedClient
.target(serverEndPoint)
.properties(configOverrides);

final RestResponse<Integer> resp = getTarget(target, authHeader)
.postQueryRequest(sql, requestProperties, Optional.empty(), rowConsumer,
shouldCloseConnection);
.postQueryRequest(sql, requestProperties, Optional.empty(), rowConsumer);

if (resp.isErroneous()) {
return RestResponse.erroneous(resp.getStatusCode(), resp.getErrorMessage());
Expand Down
Loading

0 comments on commit d98f50a

Please sign in to comment.