Skip to content

Commit

Permalink
DBZ-4389: Add keepalive pings and handle retriable exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
shichao-an committed Dec 2, 2021
1 parent 256f553 commit cd62b46
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 5 deletions.
Expand Up @@ -5,6 +5,9 @@
*/
package io.debezium.connector.vitess;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
Expand Down Expand Up @@ -106,6 +109,15 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
.withDescription("Control StopOnReshard VStream flag."
+ " If set true, the old VStream will be stopped after a reshard operation.");

public static final Field KEEPALIVE_INTERVAL_MS = Field.create(VITESS_CONFIG_GROUP_PREFIX + "keepalive.interval.ms")
.withDisplayName("VStream gRPC keepalive interval (ms)")
.withType(Type.LONG)
.withDefault(Long.MAX_VALUE)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("Control the interval between periodic gPRC keepalive pings for VStream." +
" Defaults to Long.MAX_VALUE (disabled).");

public static final Field INCLUDE_UNKNOWN_DATATYPES = Field.create("include.unknown.datatypes")
.withDisplayName("Include unknown datatypes")
.withType(Type.BOOLEAN)
Expand All @@ -129,7 +141,8 @@ public class VitessConnectorConfig extends RelationalDatabaseConnectorConfig {
VTGATE_USER,
VTGATE_PASSWORD,
TABLET_TYPE,
STOP_ON_RESHARD_FLAG)
STOP_ON_RESHARD_FLAG,
KEEPALIVE_INTERVAL_MS)
.events(INCLUDE_UNKNOWN_DATATYPES)
.excluding(SCHEMA_EXCLUDE_LIST, SCHEMA_INCLUDE_LIST)
.create();
Expand Down Expand Up @@ -199,6 +212,10 @@ public boolean getStopOnReshard() {
return getConfig().getBoolean(STOP_ON_RESHARD_FLAG);
}

public Duration getKeepaliveInterval() {
return getConfig().getDuration(KEEPALIVE_INTERVAL_MS, ChronoUnit.MILLIS);
}

public boolean includeUnknownDatatypes() {
return getConfig().getBoolean(INCLUDE_UNKNOWN_DATATYPES);
}
Expand Down
Expand Up @@ -80,7 +80,7 @@ protected ChangeEventSourceCoordinator<VitessPartition, VitessOffsetContext> sta
.build();

// saves the exception in the ChangeEventQueue, later task poll() would throw the exception
errorHandler = new ErrorHandler(VitessConnector.class, connectorConfig.getLogicalName(), queue);
errorHandler = new VitessErrorHandler(connectorConfig.getLogicalName(), queue);

// for metrics
final VitessEventMetadataProvider metadataProvider = new VitessEventMetadataProvider();
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/io/debezium/connector/vitess/VitessErrorHandler.java
@@ -0,0 +1,28 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.vitess;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.ErrorHandler;
import io.grpc.StatusRuntimeException;

public class VitessErrorHandler extends ErrorHandler {
public VitessErrorHandler(String logicalName, ChangeEventQueue<?> queue) {
super(VitessConnector.class, logicalName, queue);
}

@Override
protected boolean isRetriable(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
final StatusRuntimeException exception = (StatusRuntimeException) throwable;
switch (exception.getStatus().getCode()) {
case UNAVAILABLE:
return true;
}
}
return false;
}
}
Expand Up @@ -71,7 +71,7 @@ public void execute(ChangeEventSourceContext context, VitessPartition partition,
}
if (error.get() != null) {
LOGGER.error("Error during streaming", error.get());
throw new RuntimeException(error.get());
throw error.get();
}
}
catch (Throwable e) {
Expand Down
Expand Up @@ -194,6 +194,7 @@ private <T extends AbstractStub<T>> T withCredentials(T stub) {
private ManagedChannel newChannel(String vtgateHost, int vtgatePort) {
ManagedChannel channel = ManagedChannelBuilder.forAddress(vtgateHost, vtgatePort)
.usePlaintext()
.keepAliveTime(config.getKeepaliveInterval().toMillis(), TimeUnit.MILLISECONDS)
.build();
return channel;
}
Expand Down
Expand Up @@ -200,8 +200,11 @@ protected static String incrementGtid(String gtid, int increment) {
return gtid.substring(0, idx) + seq;
}

protected TestConsumer testConsumer(int expectedRecordsCount, String... topicPrefixes) {
return new TestConsumer(expectedRecordsCount, topicPrefixes);
protected TestConsumer testConsumer(int expectedRecordsCount, String... topicPrefixes) throws InterruptedException {
TestConsumer consumer = new TestConsumer(expectedRecordsCount, topicPrefixes);
// Sleep for 1 second before return to avoid race conditions.
Thread.sleep(1000);
return consumer;
}

/** Same as io.debezium.connector.postgresql.AbstractRecordsProducerTest.TestConsumer */
Expand Down

0 comments on commit cd62b46

Please sign in to comment.