Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-4389: Add keepalive pings and handle retriable exceptions #55

Merged
merged 1 commit into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
*/
package io.debezium.connector.vitess;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
Expand Down Expand Up @@ -106,6 +109,15 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
.withDescription("Control StopOnReshard VStream flag."
+ " If set true, the old VStream will be stopped after a reshard operation.");

public static final Field KEEPALIVE_INTERVAL_MS = Field.create(VITESS_CONFIG_GROUP_PREFIX + "keepalive.interval.ms")
.withDisplayName("VStream gRPC keepalive interval (ms)")
.withType(Type.LONG)
.withDefault(Long.MAX_VALUE)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("Control the interval between periodic gPRC keepalive pings for VStream." +
" Defaults to Long.MAX_VALUE (disabled).");

public static final Field INCLUDE_UNKNOWN_DATATYPES = Field.create("include.unknown.datatypes")
.withDisplayName("Include unknown datatypes")
.withType(Type.BOOLEAN)
Expand All @@ -129,7 +141,8 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
VTGATE_USER,
VTGATE_PASSWORD,
TABLET_TYPE,
STOP_ON_RESHARD_FLAG)
STOP_ON_RESHARD_FLAG,
KEEPALIVE_INTERVAL_MS)
.events(INCLUDE_UNKNOWN_DATATYPES)
.excluding(SCHEMA_EXCLUDE_LIST, SCHEMA_INCLUDE_LIST)
.create();
Expand Down Expand Up @@ -199,6 +212,10 @@ public boolean getStopOnReshard() {
return getConfig().getBoolean(STOP_ON_RESHARD_FLAG);
}

public Duration getKeepaliveInterval() {
return getConfig().getDuration(KEEPALIVE_INTERVAL_MS, ChronoUnit.MILLIS);
}

public boolean includeUnknownDatatypes() {
return getConfig().getBoolean(INCLUDE_UNKNOWN_DATATYPES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
.build();

// saves the exception in the ChangeEventQueue, later task poll() would throw the exception
errorHandler = new ErrorHandler(VitessConnector.class, connectorConfig.getLogicalName(), queue);
errorHandler = new VitessErrorHandler(connectorConfig.getLogicalName(), queue);

// for metrics
final VitessEventMetadataProvider metadataProvider = new VitessEventMetadataProvider();
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/debezium/connector/vitess/VitessErrorHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess;

import static io.grpc.Status.Code.UNAVAILABLE;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.ErrorHandler;
import io.grpc.StatusRuntimeException;

public class VitessErrorHandler extends ErrorHandler {
public VitessErrorHandler(String logicalName, ChangeEventQueue<?> queue) {
super(VitessConnector.class, logicalName, queue);
}

@Override
protected boolean isRetriable(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
final StatusRuntimeException exception = (StatusRuntimeException) throwable;
return exception.getStatus().getCode().equals(UNAVAILABLE);
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void execute(ChangeEventSourceContext context, VitessPartition partition,
}
if (error.get() != null) {
LOGGER.error("Error during streaming", error.get());
throw new RuntimeException(error.get());
throw error.get();
}
}
catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ private int getNumOfRowEvents(Vtgate.VStreamResponse response) {
.setFlags(vStreamFlags)
.build(),
responseObserver);
LOGGER.info("Started VStream");
}

private VitessGrpc.VitessStub newStub(ManagedChannel channel) {
Expand All @@ -194,6 +195,7 @@ private <T extends AbstractStub<T>> T withCredentials(T stub) {
private ManagedChannel newChannel(String vtgateHost, int vtgatePort) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(vtgateHost, vtgatePort)
.usePlaintext()
.keepAliveTime(config.getKeepaliveInterval().toMillis(), TimeUnit.MILLISECONDS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not super clear to me why the channel get closed before this change - for my own understanding, is the channel closed by grpc because it is idle? I think the default idle timeout was 30 min: https://grpc.github.io/grpc-java/javadoc/io/grpc/ManagedChannelBuilder.html#idleTimeout-long-java.util.concurrent.TimeUnit-

Copy link
Member Author

@shichao-an shichao-an Dec 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After troubleshooting for quite a bit I couldn't make a conclusion. I tried to set idleTimeout to different values and there was no effect at all. I added debug output and found the channel itself is mostly in IDLE state even before the timeout, so this idle setting might be irrelevant. Because there is only one gRPC activity (VStream) and no new gRPCs. I also debugged server-side as well.

The general assumption is that the connection/transport is closed after no active streams for a while. To gRPC this means no pings between peers for a period, so the transport is disconnected. I have searched and found these references:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The links seem to suggest using idealTimeout over keepAliveTime 🤷

I think it's fine to add this option here, the only concern I have here is it requires corresponding setting on the server side as well (e.g., grpc.keepalive_permit_without_calls)

I'm not sure in this case if it worth adding a TODO for it, I will defer to you and others here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, idealTimeout doesn't work in my verification so concluded that it's not related to idleTimeout, because idleTimeout is about "channel", but our problem is with "transport/connection", which we need to tackle with keepalive pings.

As for keepalive_permit_without_calls, I tested without this option and it works. According to this and this:

  • allows keepalive pings to be sent even if there are no calls in flight
  • Sets whether keepalive will be performed when there are no outstanding RPC on a connection

I think we do have a "call" or outstanding RPC, that is, the VStream call itself. Therefore without this option it won't affect our keepalive pings. I will need to test it in prod for several passes once I had the snapshots ready. We can add it if we do need this option.

.build();
return channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ protected static String incrementGtid(String gtid, int increment) {
return gtid.substring(0, idx) + seq;
}

protected TestConsumer testConsumer(int expectedRecordsCount, String... topicPrefixes) {
return new TestConsumer(expectedRecordsCount, topicPrefixes);
protected TestConsumer testConsumer(int expectedRecordsCount, String... topicPrefixes) throws InterruptedException {
TestConsumer consumer = new TestConsumer(expectedRecordsCount, topicPrefixes);
return consumer;
}

/** Same as io.debezium.connector.postgresql.AbstractRecordsProducerTest.TestConsumer */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,13 @@ private void waitForShardedGtidAcquiring(final LogInterceptor logInterceptor) {
.until(() -> logInterceptor.containsMessage("Default VGTID '[{\"keyspace\":"));
}

private void waitForVStreamStarted(final LogInterceptor logInterceptor) {
// The inserts must happen only after VStream is started with some buffer time.
Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()))
shichao-an marked this conversation as resolved.
Show resolved Hide resolved
.pollInterval(Duration.ofSeconds(1))
.until(() -> logInterceptor.containsMessage("Started VStream"));
}

private void startConnector() throws InterruptedException {
startConnector(false);
}
Expand All @@ -576,9 +583,11 @@ private void startConnector(boolean hasMultipleShards) throws InterruptedExcepti
private void startConnector(Function<Configuration.Builder, Configuration.Builder> customConfig, boolean hasMultipleShards)
throws InterruptedException {
Configuration.Builder configBuilder = customConfig.apply(TestHelper.defaultConfig(hasMultipleShards));
final LogInterceptor logInterceptor = new LogInterceptor();
start(VitessConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForStreamingRunning();
waitForVStreamStarted(logInterceptor);
}

private void waitForStreamingRunning() throws InterruptedException {
Expand Down