Skip to content

Commit

Permalink
DBZ-4389: Add keepalive pings and handle retriable exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
shichao-an committed Dec 2, 2021
1 parent 256f553 commit 341cd51
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 5 deletions.
Expand Up @@ -5,6 +5,9 @@
*/
package io.debezium.connector.vitess;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
Expand Down Expand Up @@ -106,6 +109,15 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
.withDescription("Control StopOnReshard VStream flag."
+ " If set true, the old VStream will be stopped after a reshard operation.");

/**
 * Interval, in milliseconds, between the periodic gRPC keepalive pings sent for VStream.
 * The default is {@code Long.MAX_VALUE}, which the description documents as "disabled".
 */
public static final Field KEEPALIVE_INTERVAL_MS = Field.create(VITESS_CONFIG_GROUP_PREFIX + "keepalive.interval.ms")
        .withDisplayName("VStream gRPC keepalive interval (ms)")
        .withType(Type.LONG)
        .withDefault(Long.MAX_VALUE)
        .withWidth(Width.SHORT)
        .withImportance(ConfigDef.Importance.MEDIUM)
        .withDescription("Control the interval between periodic gRPC keepalive pings for VStream."
                + " Defaults to Long.MAX_VALUE (disabled).");

public static final Field INCLUDE_UNKNOWN_DATATYPES = Field.create("include.unknown.datatypes")
.withDisplayName("Include unknown datatypes")
.withType(Type.BOOLEAN)
Expand All @@ -129,7 +141,8 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
VTGATE_USER,
VTGATE_PASSWORD,
TABLET_TYPE,
STOP_ON_RESHARD_FLAG)
STOP_ON_RESHARD_FLAG,
KEEPALIVE_INTERVAL_MS)
.events(INCLUDE_UNKNOWN_DATATYPES)
.excluding(SCHEMA_EXCLUDE_LIST, SCHEMA_INCLUDE_LIST)
.create();
Expand Down Expand Up @@ -199,6 +212,10 @@ public boolean getStopOnReshard() {
return getConfig().getBoolean(STOP_ON_RESHARD_FLAG);
}

/**
 * Returns the configured VStream gRPC keepalive interval.
 *
 * @return the value of {@link #KEEPALIVE_INTERVAL_MS}, read as a millisecond-based {@link Duration}
 */
public Duration getKeepaliveInterval() {
    final Duration keepaliveInterval = getConfig().getDuration(KEEPALIVE_INTERVAL_MS, ChronoUnit.MILLIS);
    return keepaliveInterval;
}

/**
 * Tells whether the {@link #INCLUDE_UNKNOWN_DATATYPES} option is switched on.
 *
 * @return the boolean value configured for that field
 */
public boolean includeUnknownDatatypes() {
    final boolean includeUnknown = getConfig().getBoolean(INCLUDE_UNKNOWN_DATATYPES);
    return includeUnknown;
}
Expand Down
Expand Up @@ -80,7 +80,7 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
.build();

// saves the exception in the ChangeEventQueue, later task poll() would throw the exception
errorHandler = new ErrorHandler(VitessConnector.class, connectorConfig.getLogicalName(), queue);
errorHandler = new VitessErrorHandler(connectorConfig.getLogicalName(), queue);

// for metrics
final VitessEventMetadataProvider metadataProvider = new VitessEventMetadataProvider();
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/debezium/connector/vitess/VitessErrorHandler.java
@@ -0,0 +1,27 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess;

import static io.grpc.Status.Code.UNAVAILABLE;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.ErrorHandler;
import io.grpc.StatusRuntimeException;

/**
 * {@link ErrorHandler} for the Vitess connector that reports gRPC {@code UNAVAILABLE}
 * failures as retriable.
 */
public class VitessErrorHandler extends ErrorHandler {

    /**
     * @param logicalName logical name passed through to the base {@link ErrorHandler}
     * @param queue change event queue passed through to the base {@link ErrorHandler}
     */
    public VitessErrorHandler(String logicalName, ChangeEventQueue<?> queue) {
        super(VitessConnector.class, logicalName, queue);
    }

    /**
     * Only a {@link StatusRuntimeException} whose status code is {@code UNAVAILABLE} counts as
     * retriable; any other throwable does not.
     *
     * @param throwable the failure to classify
     * @return {@code true} if the failure is a gRPC {@code UNAVAILABLE} status, {@code false} otherwise
     */
    @Override
    protected boolean isRetriable(Throwable throwable) {
        if (!(throwable instanceof StatusRuntimeException)) {
            return false;
        }
        final StatusRuntimeException grpcFailure = (StatusRuntimeException) throwable;
        // Status codes are enum constants, so identity comparison is equivalent to equals().
        return grpcFailure.getStatus().getCode() == UNAVAILABLE;
    }
}
Expand Up @@ -71,7 +71,7 @@ public void execute(ChangeEventSourceContext context, VitessPartition partition,
}
if (error.get() != null) {
LOGGER.error("Error during streaming", error.get());
throw new RuntimeException(error.get());
throw error.get();
}
}
catch (Throwable e) {
Expand Down
Expand Up @@ -171,6 +171,7 @@ private int getNumOfRowEvents(Vtgate.VStreamResponse response) {
.setFlags(vStreamFlags)
.build(),
responseObserver);
LOGGER.info("Started VStream");
}

private VitessGrpc.VitessStub newStub(ManagedChannel channel) {
Expand All @@ -194,6 +195,7 @@ private <T extends AbstractStub<T>> T withCredentials(T stub) {
/**
 * Builds a plaintext gRPC channel to the given VTGate address, with the keepalive time
 * taken from the connector configuration.
 *
 * @param vtgateHost host name of the VTGate endpoint
 * @param vtgatePort port of the VTGate endpoint
 * @return the newly built channel
 */
private ManagedChannel newChannel(String vtgateHost, int vtgatePort) {
    final ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(vtgateHost, vtgatePort)
            .usePlaintext();
    final long keepaliveMillis = config.getKeepaliveInterval().toMillis();
    return builder.keepAliveTime(keepaliveMillis, TimeUnit.MILLISECONDS).build();
}
Expand Down
Expand Up @@ -200,8 +200,9 @@ protected static String incrementGtid(String gtid, int increment) {
return gtid.substring(0, idx) + seq;
}

protected TestConsumer testConsumer(int expectedRecordsCount, String... topicPrefixes) {
return new TestConsumer(expectedRecordsCount, topicPrefixes);
/**
 * Creates a {@link TestConsumer} from the expected record count and the topic prefixes.
 *
 * @param expectedRecordsCount value handed to the {@link TestConsumer} constructor
 * @param topicPrefixes topic prefixes handed to the {@link TestConsumer} constructor
 * @return the new consumer
 */
protected TestConsumer testConsumer(int expectedRecordsCount, String... topicPrefixes) throws InterruptedException {
    return new TestConsumer(expectedRecordsCount, topicPrefixes);
}

/** Same as io.debezium.connector.postgresql.AbstractRecordsProducerTest.TestConsumer */
Expand Down
Expand Up @@ -559,6 +559,13 @@ private void waitForShardedGtidAcquiring(final LogInterceptor logInterceptor) {
.until(() -> logInterceptor.containsMessage("Default VGTID '[{\"keyspace\":"));
}

/**
 * Blocks until the given interceptor has seen the "Started VStream" log message, polling once
 * per second for at most {@code TestHelper.waitTimeForRecords()} seconds.
 *
 * @param logInterceptor interceptor that is queried for the log message
 */
private void waitForVStreamStarted(final LogInterceptor logInterceptor) {
    // Inserts must only be issued once the VStream has started; waiting here provides that buffer.
    final Duration timeout = Duration.ofSeconds(TestHelper.waitTimeForRecords());
    final Duration pollEvery = Duration.ofSeconds(1);
    Awaitility.await()
            .atMost(timeout)
            .pollInterval(pollEvery)
            .until(() -> logInterceptor.containsMessage("Started VStream"));
}

/**
 * Starts the connector with {@code hasMultipleShards} set to {@code false}.
 */
private void startConnector() throws InterruptedException {
    final boolean hasMultipleShards = false;
    startConnector(hasMultipleShards);
}
Expand All @@ -576,9 +583,11 @@ private void startConnector(boolean hasMultipleShards) throws InterruptedExcepti
/**
 * Starts the connector with the default test configuration after applying {@code customConfig},
 * then waits until the connector is running, streaming is running and the "Started VStream"
 * log message has been observed.
 *
 * @param customConfig customization applied to the default configuration builder
 * @param hasMultipleShards forwarded to {@code TestHelper.defaultConfig}
 */
private void startConnector(Function<Configuration.Builder, Configuration.Builder> customConfig, boolean hasMultipleShards)
        throws InterruptedException {
    final Configuration.Builder builder = customConfig.apply(TestHelper.defaultConfig(hasMultipleShards));
    // Created before start() - presumably so the log line emitted during startup is captured.
    final LogInterceptor vstreamLog = new LogInterceptor();
    final Configuration connectorConfig = builder.build();

    start(VitessConnector.class, connectorConfig);
    assertConnectorIsRunning();
    waitForStreamingRunning();
    waitForVStreamStarted(vstreamLog);
}

private void waitForStreamingRunning() throws InterruptedException {
Expand Down

0 comments on commit 341cd51

Please sign in to comment.