Skip to content

Commit

Permalink
Harden remote exchange flow (ESQL-1339)
Browse files Browse the repository at this point in the history
Today, we initiate the computation on a remote node by concurrently
sending the data-node request and fetch-page requests. This aims to
minimize latency of ESQL requests by avoiding round trips. However, this
approach proves to be overly complex and heroic. It requires us to
handle pending fetch-page requests that arrive before the data-node
request. Additionally, we need to leave the already-completed sink
on the remote node for some time to serve on-the-fly fetch-page
requests; otherwise, the requests won't be returned until they time out.

To address these issues, this pull request proposes a simpler flow. For
each computation on a remote node, we will first send a request to open
an exchange. Upon receiving the response, we will concurrently send both
the data-node request and the fetch-page requests. This approach eliminates the
above problems.

Furthermore, I have added a disruption test (see EsqlDisruptionIT.java)
to ensure that ESQL doesn't produce unexpected results or resource leaks
when the cluster experiences issues, such as a network partition.
  • Loading branch information
dnhatn committed Jul 7, 2023
1 parent 1f1d57b commit 188b0a7
Show file tree
Hide file tree
Showing 15 changed files with 505 additions and 377 deletions.
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Operator;
Expand All @@ -28,6 +29,8 @@ final class ExchangeBuffer {
private final Object notFullLock = new Object();
private ListenableActionFuture<Void> notFullFuture = null;

private final ListenableActionFuture<Void> completionFuture = new ListenableActionFuture<>();

private volatile boolean noMoreInputs = false;

ExchangeBuffer(int maxSize) {
Expand All @@ -38,11 +41,9 @@ final class ExchangeBuffer {
}

void addPage(Page page) {
if (noMoreInputs == false) {
queue.add(page);
if (queueSize.incrementAndGet() == 1) {
notifyNotEmpty();
}
queue.add(page);
if (queueSize.incrementAndGet() == 1) {
notifyNotEmpty();
}
}

Expand All @@ -51,6 +52,9 @@ Page pollPage() {
if (page != null && queueSize.decrementAndGet() == maxSize - 1) {
notifyNotFull();
}
if (page == null && noMoreInputs && queueSize.get() == 0) {
completionFuture.onResponse(null);
}
return page;
}

Expand Down Expand Up @@ -115,10 +119,13 @@ void finish(boolean drainingPages) {
}
}
notifyNotEmpty();
if (drainingPages || queueSize.get() == 0) {
completionFuture.onResponse(null);
}
}

boolean isFinished() {
return noMoreInputs && queueSize.get() == 0;
return completionFuture.isDone();
}

boolean noMoreInputs() {
Expand All @@ -128,4 +135,11 @@ boolean noMoreInputs() {
int size() {
return queueSize.get();
}

/**
* Adds a listener that will be notified when this exchange buffer is finished.
*/
void addCompletionListener(ActionListener<Void> listener) {
completionFuture.addListener(listener);
}
}

Large diffs are not rendered by default.

Expand Up @@ -15,6 +15,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
* An {@link ExchangeSinkHandler} receives pages and status from its {@link ExchangeSink}s, which are created using
Expand All @@ -26,20 +28,30 @@
* @see ExchangeSourceHandler
*/
public final class ExchangeSinkHandler {

private final ExchangeBuffer buffer;
private final Queue<ActionListener<ExchangeResponse>> listeners = new ConcurrentLinkedQueue<>();
private final AtomicInteger outstandingSinks = new AtomicInteger();
// listeners are notified by only one thread.
private final Semaphore promised = new Semaphore(1);

public ExchangeSinkHandler(int maxBufferSize) {
private final ListenableActionFuture<Void> completionFuture;
private final LongSupplier nowInMillis;
private final AtomicLong lastUpdatedInMillis;

public ExchangeSinkHandler(int maxBufferSize, LongSupplier nowInMillis) {
this.buffer = new ExchangeBuffer(maxBufferSize);
this.completionFuture = new ListenableActionFuture<>();
this.buffer.addCompletionListener(completionFuture);
this.nowInMillis = nowInMillis;
this.lastUpdatedInMillis = new AtomicLong(nowInMillis.getAsLong());
}

private class LocalExchangeSink implements ExchangeSink {
boolean finished;

LocalExchangeSink() {
onChanged();
outstandingSinks.incrementAndGet();
}

Expand All @@ -53,6 +65,7 @@ public void addPage(Page page) {
public void finish() {
if (finished == false) {
finished = true;
onChanged();
if (outstandingSinks.decrementAndGet() == 0) {
buffer.finish(false);
notifyListeners();
Expand All @@ -62,7 +75,7 @@ public void finish() {

@Override
public boolean isFinished() {
return finished || buffer.noMoreInputs();
return finished || buffer.isFinished();
}

@Override
Expand All @@ -84,6 +97,28 @@ public void fetchPageAsync(boolean sourceFinished, ActionListener<ExchangeRespon
buffer.finish(true);
}
listeners.add(listener);
onChanged();
notifyListeners();
}

/**
* Add a listener, which will be notified when this exchange sink handler is completed. An exchange sink
* handler is considered completed when all associated sinks are completed and the output pages are fetched.
*/
public void addCompletionListener(ActionListener<Void> listener) {
completionFuture.addListener(listener);
}

boolean isFinished() {
return completionFuture.isDone();
}

/**
* Fails this sink exchange handler
*/
void onFailure(Exception failure) {
completionFuture.onFailure(failure);
buffer.finish(true);
notifyListeners();
}

Expand All @@ -104,6 +139,7 @@ private void notifyListeners() {
} finally {
promised.release();
}
onChanged();
listener.onResponse(response);
}
}
Expand All @@ -117,8 +153,30 @@ public ExchangeSink createExchangeSink() {
return new LocalExchangeSink();
}

public void finish() {
buffer.finish(false);
notifyListeners();
/**
* Whether this sink handler has sinks attached or available pages
*/
boolean hasData() {
return outstandingSinks.get() > 0 || buffer.size() > 0;
}

/**
* Whether this sink handler has listeners waiting for data
*/
boolean hasListeners() {
return listeners.isEmpty() == false;
}

private void onChanged() {
lastUpdatedInMillis.accumulateAndGet(nowInMillis.getAsLong(), Math::max);
}

/**
* The time in millis when this sink handler was updated. This timestamp is used to prune idle sinks.
*
* @see ExchangeService#INACTIVE_SINKS_INTERVAL_SETTING
*/
long lastUpdatedTimeInMillis() {
return lastUpdatedInMillis.get();
}
}
Expand Up @@ -64,7 +64,7 @@ public Page pollPage() {
@Override
public boolean isFinished() {
checkFailure();
return buffer.isFinished();
return finished || buffer.isFinished();
}

@Override
Expand Down
Expand Up @@ -10,10 +10,6 @@
import org.elasticsearch.action.ActionListener;

public interface RemoteSink {
void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener);

/**
* An empty remote sink, always responding as if it has completed.
*/
RemoteSink EMPTY = (allSourcesFinished, listener) -> listener.onResponse(new ExchangeResponse(null, true));
void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener);
}
Expand Up @@ -197,7 +197,7 @@ protected void start(Driver driver, ActionListener<Void> listener) {
List<Driver> createDriversForInput(BigArrays bigArrays, List<Page> input, List<Page> results, boolean throwingOp) {
Collection<List<Page>> splitInput = randomSplits(input, randomIntBetween(2, 4));

ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(randomIntBetween(2, 10));
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(randomIntBetween(2, 10), threadPool::relativeTimeInMillis);
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(
randomIntBetween(1, 4),
threadPool.executor("esql_test_executor")
Expand Down
Expand Up @@ -79,7 +79,7 @@ public void testBasic() throws Exception {
for (int i = 0; i < pages.length; i++) {
pages[i] = new Page(new ConstantIntVector(i, 2).asBlock());
}
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(2);
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(2, threadPool::relativeTimeInMillis);
ExchangeSink sink1 = sinkExchanger.createExchangeSink();
ExchangeSink sink2 = sinkExchanger.createExchangeSink();
ExchangeSourceHandler sourceExchanger = new ExchangeSourceHandler(3, threadPool.executor("esql_test_executor"));
Expand Down Expand Up @@ -294,7 +294,7 @@ public void testConcurrentWithHandlers() {
if (sinkHandlers.isEmpty() == false && randomBoolean()) {
sinkHandler = randomFrom(sinkHandlers);
} else {
sinkHandler = new ExchangeSinkHandler(randomExchangeBuffer());
sinkHandler = new ExchangeSinkHandler(randomExchangeBuffer(), threadPool::relativeTimeInMillis);
sourceExchanger.addRemoteSink(sinkHandler::fetchPageAsync, randomIntBetween(1, 3));
sinkHandlers.add(sinkHandler);
}
Expand All @@ -309,7 +309,7 @@ public void testEarlyTerminate() {
IntBlock block = new ConstantIntVector(1, 2).asBlock();
Page p1 = new Page(block);
Page p2 = new Page(block);
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(2);
ExchangeSinkHandler sinkExchanger = new ExchangeSinkHandler(2, threadPool::relativeTimeInMillis);
ExchangeSink sink = sinkExchanger.createExchangeSink();
sink.addPage(p1);
sink.addPage(p2);
Expand Down Expand Up @@ -397,100 +397,6 @@ public void sendResponse(TransportResponse response) throws IOException {
}
}

public void testTimeoutExchangeRequest() throws Exception {
int inactiveTimeoutInMillis = between(1, 100);
Settings settings = Settings.builder()
.put(ExchangeService.INACTIVE_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(inactiveTimeoutInMillis))
.build();
MockTransportService node0 = newTransportService();
ExchangeService exchange0 = new ExchangeService(settings, threadPool);
exchange0.registerTransportHandler(node0);
MockTransportService node1 = newTransportService();
ExchangeService exchange1 = new ExchangeService(settings, threadPool);
exchange1.registerTransportHandler(node1);
AbstractSimpleTransportTestCase.connectToNode(node0, node1.getLocalNode());
// exchange source will retry the timed out response
CountDownLatch latch = new CountDownLatch(between(1, 5));
node1.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, new StubbableTransport.RequestHandlingBehavior<>() {
@Override
public void messageReceived(
TransportRequestHandler<TransportRequest> handler,
TransportRequest request,
TransportChannel channel,
Task task
) throws Exception {
handler.messageReceived(request, new FilterTransportChannel(channel) {
@Override
public void sendResponse(TransportResponse response) throws IOException {
latch.countDown();
super.sendResponse(response);
}

@Override
public void sendResponse(Exception exception) throws IOException {
latch.countDown();
super.sendResponse(exception);
}
}, task);
}
});
try (exchange0; exchange1; node0; node1) {
String exchangeId = "exchange";
Task task = new Task(1, "", "", "", null, Collections.emptyMap());
final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000);
PlainActionFuture<Void> collectorFuture = new PlainActionFuture<>();
{
final int maxOutputSeqNo = randomIntBetween(1, 50_000);
SeqNoCollector seqNoCollector = new SeqNoCollector(maxOutputSeqNo);
ExchangeSourceHandler sourceHandler = exchange0.createSourceHandler(
exchangeId,
randomIntBetween(1, 128),
"esql_test_executor"
);
sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, node1.getLocalNode()), randomIntBetween(1, 5));
int numSources = randomIntBetween(1, 10);
List<Driver> sourceDrivers = new ArrayList<>(numSources);
for (int i = 0; i < numSources; i++) {
String description = "source-" + i;
ExchangeSourceOperator sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource());
DriverContext dc = new DriverContext();
Driver d = new Driver(description, dc, () -> description, sourceOperator, List.of(), seqNoCollector.get(dc), () -> {});
sourceDrivers.add(d);
}
new DriverRunner() {
@Override
protected void start(Driver driver, ActionListener<Void> listener) {
Driver.start(threadPool.executor("esql_test_executor"), driver, listener);
}
}.runToCompletion(sourceDrivers, collectorFuture);
}
// Verify that some exchange requests are timed out because we don't have the exchange sink handler yet
assertTrue(latch.await(10, TimeUnit.SECONDS));
PlainActionFuture<Void> generatorFuture = new PlainActionFuture<>();
{
SeqNoGenerator seqNoGenerator = new SeqNoGenerator(maxInputSeqNo);
int numSinks = randomIntBetween(1, 10);
ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomIntBetween(1, 128));
List<Driver> sinkDrivers = new ArrayList<>(numSinks);
for (int i = 0; i < numSinks; i++) {
String description = "sink-" + i;
ExchangeSinkOperator sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink());
DriverContext dc = new DriverContext();
Driver d = new Driver(description, dc, () -> description, seqNoGenerator.get(dc), List.of(), sinkOperator, () -> {});
sinkDrivers.add(d);
}
new DriverRunner() {
@Override
protected void start(Driver driver, ActionListener<Void> listener) {
Driver.start(threadPool.executor("esql_test_executor"), driver, listener);
}
}.runToCompletion(sinkDrivers, generatorFuture);
}
generatorFuture.actionGet(1, TimeUnit.MINUTES);
collectorFuture.actionGet(1, TimeUnit.MINUTES);
}
}

private MockTransportService newTransportService() {
List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>(ClusterModule.getNamedWriteables());
namedWriteables.addAll(Block.getNamedWriteables());
Expand Down

0 comments on commit 188b0a7

Please sign in to comment.