From 1a1297ab59db0af416a924d4135b93dcc5003561 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg <57688982+AlanConfluent@users.noreply.github.com> Date: Wed, 15 Sep 2021 08:44:27 -0700 Subject: [PATCH] fix: Ensures background timer completes for scalable push queries (#8132) * fix: Ensures background timer completes for scalable push queries --- .../physical/scalablepush/PushRouting.java | 25 ++++++++++-- .../scalablepush/PushRoutingTest.java | 40 ++++++++++++++++--- 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java index 0dc575029a2e..21a47f5f2768 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/scalablepush/PushRouting.java @@ -197,14 +197,23 @@ private CompletableFuture connectToHosts( .map(Entry::getKey) .findFirst() .orElse(null); + for (KsqlNode n : hosts) { + final CompletableFuture future = futureMap.get(n); + // For simplicity, just close all of the connections that may have succeeded + if (future.isDone() && !future.isCompletedExceptionally()) { + final RoutingResult routingResult = future.join(); + routingResult.close(); + } + // Mark these all as failed + pushConnectionsHandle.get(n) + .ifPresent(result -> result.updateStatus(RoutingResultStatus.FAILED)); + } LOG.warn("Error routing query {} id {} to host {} at timestamp {} with exception {}", statement.getStatementText(), pushPhysicalPlan.getQueryId(), node, System.currentTimeMillis(), t.getCause()); // We only fail the whole thing if this is not a new dynamically added node. We allow // retries in that case and don't fail the original request. - pushConnectionsHandle.get(node) - .ifPresent(result -> result.updateStatus(RoutingResultStatus.FAILED)); if (!dynamicallyAddedNode) { pushConnectionsHandle.completeExceptionally( new KsqlException(String.format( @@ -212,6 +221,10 @@ private CompletableFuture connectToHosts( statement.getStatementText(), t.getCause().getMessage()))); } return pushConnectionsHandle; + }) + .exceptionally(t -> { + LOG.error("Unexpected error handing exception", t); + return pushConnectionsHandle; }); } @@ -594,6 +607,7 @@ synchronized void close() { public static class PushConnectionsHandle { private final Map results = new ConcurrentHashMap<>(); private final CompletableFuture errorCallback; + private volatile boolean closed = false; @SuppressFBWarnings(value = "EI_EXPOSE_REP") public PushConnectionsHandle() { @@ -608,6 +622,10 @@ public PushConnectionsHandle() { public void add(final KsqlNode ksqlNode, final RoutingResult result) { results.put(ksqlNode, result); + // Make sure that we close the result if the handle has been closed. + if (isClosed()) { + result.close(); + } } public RoutingResult remove(final KsqlNode ksqlNode) { @@ -619,6 +637,7 @@ public Optional get(final KsqlNode ksqlNode) { } public void close() { + closed = true; for (RoutingResult result : results.values()) { result.close(); } @@ -636,7 +655,7 @@ public Set getActiveHosts() { } public boolean isClosed() { - return errorCallback.isDone(); + return closed || errorCallback.isDone(); } public void onException(final Consumer consumer) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java index c55a23cd106c..72409c34597c 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/scalablepush/PushRoutingTest.java @@ -4,6 +4,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -80,6 +81,8 @@ public class PushRoutingTest { @Mock private KsqlNode ksqlNodeRemote; @Mock + private KsqlNode ksqlNodeRemote2; + @Mock private TransientQueryQueue transientQueryQueueMock; private Vertx vertx; @@ -103,6 +106,8 @@ public void setUp() { when(ksqlNodeLocal.isLocal()).thenReturn(true); when(ksqlNodeRemote.location()).thenReturn(URI.create("http://remote:8088")); when(ksqlNodeRemote.isLocal()).thenReturn(false); + when(ksqlNodeRemote2.location()).thenReturn(URI.create("http://remote2:8088")); + when(ksqlNodeRemote2.isLocal()).thenReturn(false); when(pushRoutingOptions.getHasBeenForwarded()).thenReturn(false); transientQueryQueue = new TransientQueryQueue(OptionalInt.empty()); @@ -139,7 +144,7 @@ public void shouldSucceed_forward() throws ExecutionException, InterruptedExcept CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, outputSchema, transientQueryQueue); - future.get(); + final PushConnectionsHandle handle = future.get(); context.runOnContext(v -> { localPublisher.accept(LOCAL_ROW1); localPublisher.accept(LOCAL_ROW2); @@ -157,6 +162,7 @@ public void shouldSucceed_forward() throws ExecutionException, InterruptedExcept } rows.add(kv.value().values()); } + handle.close(); assertThat(rows.contains(LOCAL_ROW1), is(true)); assertThat(rows.contains(LOCAL_ROW2), is(true)); assertThat(rows.contains(REMOTE_ROW1.getRow().get().getColumns()), is(true)); @@ -179,7 +185,7 @@ public void shouldSucceed_addRemoteNode() throws ExecutionException, Interrupted CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, outputSchema, transientQueryQueue); - future.get(); + final PushConnectionsHandle handle = future.get(); context.runOnContext(v -> { localPublisher.accept(LOCAL_ROW1); localPublisher.accept(LOCAL_ROW2); @@ -210,6 +216,7 @@ public void shouldSucceed_addRemoteNode() throws ExecutionException, Interrupted } rows.add(kv.value().values()); } + handle.close(); assertThat(rows.contains(LOCAL_ROW1), is(true)); assertThat(rows.contains(LOCAL_ROW2), is(true)); assertThat(rows.contains(REMOTE_ROW1.getRow().get().getColumns()), is(true)); @@ -256,6 +263,7 @@ public void shouldSucceed_removeRemoteNode() throws ExecutionException, Interrup Thread.sleep(100); continue; } + handle.close(); // Then: assertThat(rows.contains(LOCAL_ROW1), is(true)); @@ -304,6 +312,7 @@ public void shouldSucceed_remoteNodeComplete() throws ExecutionException, Interr Thread.sleep(100); continue; } + handle.close(); // Then: assertThat(rows.contains(LOCAL_ROW1), is(true)); @@ -342,6 +351,7 @@ public void shouldFail_remoteNodeException() throws ExecutionException, Interrup Thread.sleep(100); continue; } + handle.close(); // Then: assertThat(exception.get().getMessage(), containsString("Random error")); @@ -360,7 +370,7 @@ public void shouldSucceed_justForwarded() throws ExecutionException, Interrupted CompletableFuture future = routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions, outputSchema, transientQueryQueue); - future.get(); + final PushConnectionsHandle handle = future.get(); context.runOnContext(v -> { localPublisher.accept(LOCAL_ROW1); localPublisher.accept(LOCAL_ROW2); @@ -377,6 +387,7 @@ public void shouldSucceed_justForwarded() throws ExecutionException, Interrupted } rows.add(kv.value().values()); } + handle.close(); assertThat(rows.contains(LOCAL_ROW1), is(true)); assertThat(rows.contains(LOCAL_ROW2), is(true)); } @@ -419,10 +430,15 @@ public void shouldFail_non200RemoteCall() throws ExecutionException, Interrupted @Test public void shouldFail_errorRemoteCall() throws ExecutionException, InterruptedException { // Given: - when(locator.locate()).thenReturn(ImmutableList.of(ksqlNodeRemote)); + when(locator.locate()).thenReturn(ImmutableList.of(ksqlNodeRemote, ksqlNodeRemote2)); final PushRouting routing = new PushRouting(); - when(simpleKsqlClient.makeQueryRequestStreamed(any(), any(), any(), any())) + TestRemotePublisher remotePublisher = new TestRemotePublisher(context); + when(simpleKsqlClient.makeQueryRequestStreamed( + eq(ksqlNodeRemote.location()), any(), any(), any())) .thenReturn(createErrorFuture(new RuntimeException("Error remote!"))); + when(simpleKsqlClient.makeQueryRequestStreamed( + eq(ksqlNodeRemote2.location()), any(), any(), any())) + .thenReturn(createFuture(RestResponse.successful(200, remotePublisher))); // When: CompletableFuture future = @@ -432,6 +448,7 @@ public void shouldFail_errorRemoteCall() throws ExecutionException, InterruptedE // Then: assertThat(handle.getError().getMessage(), containsString("Error remote!")); + assertThat(remotePublisher.isClosed(), is(true)); } @Test @@ -463,6 +480,7 @@ public void shouldFail_hitRequestLimitLocal() throws ExecutionException, Interru } rows.add(kv.value().values()); } + handle.close(); assertThat(rows.get(0), is(LOCAL_ROW1)); assertThat(handle.getError().getMessage(), containsString("Hit limit of request queue")); } @@ -497,12 +515,15 @@ public void shouldFail_hitRequestLimitRemote() throws ExecutionException, Interr } rows.add(kv.value().values()); } + handle.close(); assertThat(rows.contains(REMOTE_ROW1.getRow().get().getColumns()), is(true)); assertThat(handle.getError().getMessage(), containsString("Hit limit of request queue")); } private static class TestRemotePublisher extends BufferedPublisher { + private volatile boolean closed = false; + public TestRemotePublisher(Context ctx) { super(ctx); } @@ -510,5 +531,14 @@ public TestRemotePublisher(Context ctx) { public void error(final Throwable e) { sendError(e); } + + public void close() { + closed = true; + super.close(); + } + + public boolean isClosed() { + return closed; + } } }