Skip to content

Commit

Permalink
fix: Ensures background timer completes for scalable push queries (#8132
Browse files Browse the repository at this point in the history
)

* fix: Ensures background timer completes for scalable push queries
  • Loading branch information
AlanConfluent committed Sep 15, 2021
1 parent 64cc5ce commit 1a1297a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,21 +197,34 @@ private CompletableFuture<PushConnectionsHandle> connectToHosts(
.map(Entry::getKey)
.findFirst()
.orElse(null);
for (KsqlNode n : hosts) {
final CompletableFuture<RoutingResult> future = futureMap.get(n);
// For simplicity, just close all of the connections that may have succeeded
if (future.isDone() && !future.isCompletedExceptionally()) {
final RoutingResult routingResult = future.join();
routingResult.close();
}
// Mark these all as failed
pushConnectionsHandle.get(n)
.ifPresent(result -> result.updateStatus(RoutingResultStatus.FAILED));
}
LOG.warn("Error routing query {} id {} to host {} at timestamp {} with exception {}",
statement.getStatementText(), pushPhysicalPlan.getQueryId(), node,
System.currentTimeMillis(), t.getCause());

// We only fail the whole thing if this is not a new dynamically added node. We allow
// retries in that case and don't fail the original request.
pushConnectionsHandle.get(node)
.ifPresent(result -> result.updateStatus(RoutingResultStatus.FAILED));
if (!dynamicallyAddedNode) {
pushConnectionsHandle.completeExceptionally(
new KsqlException(String.format(
"Unable to execute push query \"%s\". %s",
statement.getStatementText(), t.getCause().getMessage())));
}
return pushConnectionsHandle;
})
.exceptionally(t -> {
LOG.error("Unexpected error handing exception", t);
return pushConnectionsHandle;
});
}

Expand Down Expand Up @@ -594,6 +607,7 @@ synchronized void close() {
public static class PushConnectionsHandle {
private final Map<KsqlNode, RoutingResult> results = new ConcurrentHashMap<>();
private final CompletableFuture<Void> errorCallback;
private volatile boolean closed = false;

@SuppressFBWarnings(value = "EI_EXPOSE_REP")
public PushConnectionsHandle() {
Expand All @@ -608,6 +622,10 @@ public PushConnectionsHandle() {

public void add(final KsqlNode ksqlNode, final RoutingResult result) {
results.put(ksqlNode, result);
// Make sure that we close the result if the handle has been closed.
if (isClosed()) {
result.close();
}
}

public RoutingResult remove(final KsqlNode ksqlNode) {
Expand All @@ -619,6 +637,7 @@ public Optional<RoutingResult> get(final KsqlNode ksqlNode) {
}

public void close() {
closed = true;
for (RoutingResult result : results.values()) {
result.close();
}
Expand All @@ -636,7 +655,7 @@ public Set<KsqlNode> getActiveHosts() {
}

public boolean isClosed() {
return errorCallback.isDone();
return closed || errorCallback.isDone();
}

public void onException(final Consumer<Throwable> consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class PushRoutingTest {
@Mock
private KsqlNode ksqlNodeRemote;
@Mock
private KsqlNode ksqlNodeRemote2;
@Mock
private TransientQueryQueue transientQueryQueueMock;

private Vertx vertx;
Expand All @@ -103,6 +106,8 @@ public void setUp() {
when(ksqlNodeLocal.isLocal()).thenReturn(true);
when(ksqlNodeRemote.location()).thenReturn(URI.create("http://remote:8088"));
when(ksqlNodeRemote.isLocal()).thenReturn(false);
when(ksqlNodeRemote2.location()).thenReturn(URI.create("http://remote2:8088"));
when(ksqlNodeRemote2.isLocal()).thenReturn(false);
when(pushRoutingOptions.getHasBeenForwarded()).thenReturn(false);

transientQueryQueue = new TransientQueryQueue(OptionalInt.empty());
Expand Down Expand Up @@ -139,7 +144,7 @@ public void shouldSucceed_forward() throws ExecutionException, InterruptedExcept
CompletableFuture<PushConnectionsHandle> future =
routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions,
outputSchema, transientQueryQueue);
future.get();
final PushConnectionsHandle handle = future.get();
context.runOnContext(v -> {
localPublisher.accept(LOCAL_ROW1);
localPublisher.accept(LOCAL_ROW2);
Expand All @@ -157,6 +162,7 @@ public void shouldSucceed_forward() throws ExecutionException, InterruptedExcept
}
rows.add(kv.value().values());
}
handle.close();
assertThat(rows.contains(LOCAL_ROW1), is(true));
assertThat(rows.contains(LOCAL_ROW2), is(true));
assertThat(rows.contains(REMOTE_ROW1.getRow().get().getColumns()), is(true));
Expand All @@ -179,7 +185,7 @@ public void shouldSucceed_addRemoteNode() throws ExecutionException, Interrupted
CompletableFuture<PushConnectionsHandle> future =
routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions,
outputSchema, transientQueryQueue);
future.get();
final PushConnectionsHandle handle = future.get();
context.runOnContext(v -> {
localPublisher.accept(LOCAL_ROW1);
localPublisher.accept(LOCAL_ROW2);
Expand Down Expand Up @@ -210,6 +216,7 @@ public void shouldSucceed_addRemoteNode() throws ExecutionException, Interrupted
}
rows.add(kv.value().values());
}
handle.close();
assertThat(rows.contains(LOCAL_ROW1), is(true));
assertThat(rows.contains(LOCAL_ROW2), is(true));
assertThat(rows.contains(REMOTE_ROW1.getRow().get().getColumns()), is(true));
Expand Down Expand Up @@ -256,6 +263,7 @@ public void shouldSucceed_removeRemoteNode() throws ExecutionException, Interrup
Thread.sleep(100);
continue;
}
handle.close();

// Then:
assertThat(rows.contains(LOCAL_ROW1), is(true));
Expand Down Expand Up @@ -304,6 +312,7 @@ public void shouldSucceed_remoteNodeComplete() throws ExecutionException, Interr
Thread.sleep(100);
continue;
}
handle.close();

// Then:
assertThat(rows.contains(LOCAL_ROW1), is(true));
Expand Down Expand Up @@ -342,6 +351,7 @@ public void shouldFail_remoteNodeException() throws ExecutionException, Interrup
Thread.sleep(100);
continue;
}
handle.close();

// Then:
assertThat(exception.get().getMessage(), containsString("Random error"));
Expand All @@ -360,7 +370,7 @@ public void shouldSucceed_justForwarded() throws ExecutionException, Interrupted
CompletableFuture<PushConnectionsHandle> future =
routing.handlePushQuery(serviceContext, pushPhysicalPlan, statement, pushRoutingOptions,
outputSchema, transientQueryQueue);
future.get();
final PushConnectionsHandle handle = future.get();
context.runOnContext(v -> {
localPublisher.accept(LOCAL_ROW1);
localPublisher.accept(LOCAL_ROW2);
Expand All @@ -377,6 +387,7 @@ public void shouldSucceed_justForwarded() throws ExecutionException, Interrupted
}
rows.add(kv.value().values());
}
handle.close();
assertThat(rows.contains(LOCAL_ROW1), is(true));
assertThat(rows.contains(LOCAL_ROW2), is(true));
}
Expand Down Expand Up @@ -419,10 +430,15 @@ public void shouldFail_non200RemoteCall() throws ExecutionException, Interrupted
@Test
public void shouldFail_errorRemoteCall() throws ExecutionException, InterruptedException {
// Given:
when(locator.locate()).thenReturn(ImmutableList.of(ksqlNodeRemote));
when(locator.locate()).thenReturn(ImmutableList.of(ksqlNodeRemote, ksqlNodeRemote2));
final PushRouting routing = new PushRouting();
when(simpleKsqlClient.makeQueryRequestStreamed(any(), any(), any(), any()))
TestRemotePublisher remotePublisher = new TestRemotePublisher(context);
when(simpleKsqlClient.makeQueryRequestStreamed(
eq(ksqlNodeRemote.location()), any(), any(), any()))
.thenReturn(createErrorFuture(new RuntimeException("Error remote!")));
when(simpleKsqlClient.makeQueryRequestStreamed(
eq(ksqlNodeRemote2.location()), any(), any(), any()))
.thenReturn(createFuture(RestResponse.successful(200, remotePublisher)));

// When:
CompletableFuture<PushConnectionsHandle> future =
Expand All @@ -432,6 +448,7 @@ public void shouldFail_errorRemoteCall() throws ExecutionException, InterruptedE

// Then:
assertThat(handle.getError().getMessage(), containsString("Error remote!"));
assertThat(remotePublisher.isClosed(), is(true));
}

@Test
Expand Down Expand Up @@ -463,6 +480,7 @@ public void shouldFail_hitRequestLimitLocal() throws ExecutionException, Interru
}
rows.add(kv.value().values());
}
handle.close();
assertThat(rows.get(0), is(LOCAL_ROW1));
assertThat(handle.getError().getMessage(), containsString("Hit limit of request queue"));
}
Expand Down Expand Up @@ -497,18 +515,30 @@ public void shouldFail_hitRequestLimitRemote() throws ExecutionException, Interr
}
rows.add(kv.value().values());
}
handle.close();
assertThat(rows.contains(REMOTE_ROW1.getRow().get().getColumns()), is(true));
assertThat(handle.getError().getMessage(), containsString("Hit limit of request queue"));
}

private static class TestRemotePublisher extends BufferedPublisher<StreamedRow> {

private volatile boolean closed = false;

public TestRemotePublisher(Context ctx) {
super(ctx);
}

public void error(final Throwable e) {
sendError(e);
}

public void close() {
closed = true;
super.close();
}

public boolean isClosed() {
return closed;
}
}
}

0 comments on commit 1a1297a

Please sign in to comment.