Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,7 +43,10 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -63,6 +68,9 @@ public class CollectSinkOperatorCoordinator
private DataInputViewStreamWrapper inStream;
private DataOutputViewStreamWrapper outStream;

private final Set<CompletableFuture<CoordinationResponse>> ongoingRequests =
ConcurrentHashMap.newKeySet();

private ExecutorService executorService;

public CollectSinkOperatorCoordinator(int socketTimeout) {
Expand All @@ -79,8 +87,9 @@ public void start() throws Exception {

@Override
public void close() throws Exception {
LOG.info("Closing the CollectSinkOperatorCoordinator.");
this.executorService.shutdownNow();
closeConnection();
this.executorService.shutdown();
}

@Override
Expand All @@ -101,75 +110,82 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
"Coordination request must be a CollectCoordinationRequest");

CollectCoordinationRequest collectRequest = (CollectCoordinationRequest) request;
CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();

if (address == null) {
completeWithEmptyResponse(collectRequest, responseFuture);
return responseFuture;
return CompletableFuture.completedFuture(createEmptyResponse(collectRequest));
}

executorService.submit(() -> handleRequestImpl(collectRequest, responseFuture, address));
return responseFuture;
final CompletableFuture<CoordinationResponse> responseFuture =
FutureUtils.supplyAsync(
() -> handleRequestImpl(collectRequest, address), executorService);

ongoingRequests.add(responseFuture);
return responseFuture.handle(
(response, error) -> {
ongoingRequests.remove(responseFuture);

if (response != null) {
return response;
}

// cancelling the future implies that the error handling happens somewhere else
if (!ExceptionUtils.findThrowable(error, CancellationException.class)
.isPresent()) {
// Request failed: Close current connection and send back empty results
// we catch every exception here because the Socket might suddenly become
// null. We don't want the coordinator to fail if the sink fails.
if (LOG.isDebugEnabled()) {
LOG.warn(
"Collect sink coordinator encountered an unexpected error.",
error);
} else {
LOG.warn(
"Collect sink coordinator encounters a {}: {}",
error.getClass().getSimpleName(),
error.getMessage());
}

closeConnection();
}

return createEmptyResponse(collectRequest);
});
}

private void handleRequestImpl(
CollectCoordinationRequest request,
CompletableFuture<CoordinationResponse> responseFuture,
InetSocketAddress sinkAddress) {
private CoordinationResponse handleRequestImpl(
CollectCoordinationRequest request, InetSocketAddress sinkAddress) throws IOException {
if (sinkAddress == null) {
closeConnection();
completeWithEmptyResponse(request, responseFuture);
return;
throw new NullPointerException("No sinkAddress available.");
}

try {
if (socket == null) {
socket = new Socket();
socket.setSoTimeout(socketTimeout);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);

socket.connect(sinkAddress);
inStream = new DataInputViewStreamWrapper(socket.getInputStream());
outStream = new DataOutputViewStreamWrapper(socket.getOutputStream());
LOG.info("Sink connection established");
}
if (socket == null) {
socket = new Socket();
socket.setSoTimeout(socketTimeout);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);

// send version and offset to sink server
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding request to sink socket server");
}
request.serialize(outStream);

// fetch back serialized results
if (LOG.isDebugEnabled()) {
LOG.debug("Fetching serialized result from sink socket server");
}
responseFuture.complete(new CollectCoordinationResponse(inStream));
} catch (Exception e) {
// request failed, close current connection and send back empty results
// we catch every exception here because socket might suddenly becomes null if the sink
// fails
// and we do not want the coordinator to fail
if (LOG.isDebugEnabled()) {
// this is normal when sink restarts or job ends, so we print a debug log
LOG.debug("Collect sink coordinator encounters an exception", e);
}
closeConnection();
completeWithEmptyResponse(request, responseFuture);
socket.connect(sinkAddress);
inStream = new DataInputViewStreamWrapper(socket.getInputStream());
outStream = new DataOutputViewStreamWrapper(socket.getOutputStream());
LOG.info("Sink connection established");
}

// send version and offset to sink server
LOG.debug("Forwarding request to sink socket server");
request.serialize(outStream);

// fetch back serialized results
LOG.debug("Fetching serialized result from sink socket server");
return new CollectCoordinationResponse(inStream);
}

private void completeWithEmptyResponse(
CollectCoordinationRequest request, CompletableFuture<CoordinationResponse> future) {
future.complete(
new CollectCoordinationResponse(
request.getVersion(),
// this lastCheckpointedOffset is OK
// because client will only expose results to the users when the
// checkpointed offset increases
-1,
Collections.emptyList()));
private CollectCoordinationResponse createEmptyResponse(CollectCoordinationRequest request) {
return new CollectCoordinationResponse(
request.getVersion(),
// this lastCheckpointedOffset is OK
// because client will only expose results to the users when the
// checkpointed offset increases
-1,
Collections.emptyList());
}

private void closeConnection() {
Expand Down Expand Up @@ -217,6 +233,9 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
throws Exception {
if (checkpointData == null) {
// restore before any checkpoint completed
LOG.info("Any ongoing requests are cancelled due to a coordinator reset.");
cancelOngoingRequests();

closeConnection();
} else {
ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData);
Expand All @@ -225,6 +244,11 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
}
}

private void cancelOngoingRequests() {
ongoingRequests.forEach(ft -> ft.cancel(true));
ongoingRequests.clear();
}

/** Provider for {@link CollectSinkOperatorCoordinator}. */
public static class Provider implements OperatorCoordinator.Provider {

Expand Down