Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-6014 Fixed correctness issues and optimized logging #13

Merged
merged 1 commit into from Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -9,6 +9,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -28,21 +29,38 @@ public FinishPartitionWatchDog(FinishingPartitionManager finishingPartitionManag

this.thread = new Thread(() -> {

Instant lastUpdatedTime = Instant.now();
while (true) {

Set<String> pendingToFinish = finishingPartitionManager.getPendingFinishPartitions();
Set<String> pending = finishingPartitionManager.getPendingPartitions();

pendingToFinish.forEach(token -> partition.computeIfAbsent(token, token1 -> Instant.now()));
pendingToFinish.forEach(
token -> partition.computeIfAbsent(token, token1 -> Instant.now()));

partition.keySet().stream().filter(token -> !pendingToFinish.contains(token)).forEach(partition::remove);
if (Instant.now().isAfter(lastUpdatedTime.plus(Duration.ofSeconds(600)))) {
LOGGER.info("Get pending partitions: {}", pendingToFinish);
LOGGER.info("Get pending total partitions: {}", pending);
lastUpdatedTime = Instant.now();
}

Iterator<Map.Entry<String, Instant>> itr = partition.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, Instant> entry = itr.next();
if (!pendingToFinish.contains(entry.getKey())) {
itr.remove();
}
}

List<String> tokens = new ArrayList<>();

partition.forEach((token, instant) -> {
if (instant.isAfter(instant.plus(timeout))) {
tokens.add(token);
}
});
Instant currentTime = Instant.now();
partition.forEach(
(token, instant) -> {
if (currentTime.isAfter(instant.plus(timeout))) {
tokens.add(token);
}
});

if (!tokens.isEmpty()) {
LOGGER.warn("Partitions awaiting finish : {}, timeout: {}", tokens, timeout);
Expand All @@ -58,9 +76,11 @@ public FinishPartitionWatchDog(FinishingPartitionManager finishingPartitionManag
}

}, "SpannerConnector-FinishingPartitionWatchDog");
this.thread.start();
}

public void stop() {
    // Announce shutdown first so the interrupt is traceable in the task logs.
    LOGGER.info("Interrupting SpannerConnector-FinishingPartitionWatchDog");
    thread.interrupt();
}
}
Expand Up @@ -34,8 +34,10 @@ public FinishingPartitionManager(BlockingConsumer<String> finishedPartitionConsu
this.finishedPartitionConsumer = finishedPartitionConsumer;
}

public void newRecord(String token, String recordUid) {
public String newRecord(String token) {
String recordUid = lastEmittedRecord.get(token) == null ? "aaaaaaaa" : next(lastEmittedRecord.get(token));
lastEmittedRecord.put(token, recordUid);
return recordUid;
}

public void registerPartition(String token) {
Expand All @@ -51,7 +53,14 @@ public void commitRecord(String token, String recordUid) throws InterruptedExcep
}

if (!pendingFinishFlag) {
lastCommittedRecord.put(token, recordUid);
if (lastCommittedRecord.get(token) == null) {
lastCommittedRecord.put(token, recordUid);
}
else {
if (recordUid.compareTo(lastCommittedRecord.get(token)) > 0) {
lastCommittedRecord.put(token, recordUid);
}
}
return;
}

Expand All @@ -77,6 +86,12 @@ public void onPartitionFinishEvent(String token) throws InterruptedException {
LOGGER.info("Finished forcing the token to be finished {}", token);
}
else {
LOGGER.info(
"Cannot finish the token {} due to lastCommittedRecord {} not being equal to"
+ " lastEmittedRecord {}",
token,
lastCommittedRecord.get(token),
lastEmittedRecord.get(token));
partitionPendingFinish.put(token, true);
}
}
Expand All @@ -91,9 +106,41 @@ public void forceFinish(String token) throws InterruptedException {

/**
 * Returns the tokens whose pending-finish flag is currently set.
 *
 * @return snapshot set of tokens flagged as pending finish
 */
public Set<String> getPendingFinishPartitions() {
    // Boolean.TRUE.equals(...) is null-safe; entry.getValue().equals(true)
    // would NPE if a map value were ever null.
    return partitionPendingFinish.entrySet().stream()
            .filter(entry -> Boolean.TRUE.equals(entry.getValue()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
}

/**
 * Returns every tracked partition token, regardless of its pending-finish flag.
 *
 * @return snapshot set of all tracked tokens
 */
public Set<String> getPendingPartitions() {
    // Streaming the key set directly is equivalent to (and simpler than)
    // streaming entries and mapping back to keys.
    return partitionPendingFinish.keySet().stream()
            .collect(Collectors.toSet());
}

/**
 * Returns the lexicographic successor of {@code str} over the alphabet a..z,
 * used to generate strictly increasing record uids.
 *
 * Behavior (unchanged from the original):
 * - empty string -> "a"
 * - otherwise the rightmost non-'z' character is incremented and every
 *   character to its right is reset to 'a' (e.g. "az" -> "ba")
 * - an all-'z' string gets an 'a' appended (e.g. "zz" -> "zza"), which keeps
 *   the result strictly greater under String.compareTo without carrying.
 *
 * @param str current uid; must contain only characters in a..z
 * @return the next uid, strictly greater than {@code str}
 */
private String next(String str) {
    if (str.isEmpty()) {
        return "a";
    }

    // Find the rightmost character that is not 'z'.
    int i = str.length() - 1;
    while (i >= 0 && str.charAt(i) == 'z') {
        i--;
    }

    if (i == -1) {
        // Every character is 'z': append instead of carrying.
        return str + 'a';
    }

    // StringBuilder replaces the original O(n^2) "suffix += 'a'" loop.
    StringBuilder sb = new StringBuilder(str.length());
    sb.append(str, 0, i);
    sb.append((char) (str.charAt(i) + 1));
    for (int j = i + 1; j < str.length(); j++) {
        sb.append('a');
    }
    return sb.toString();
}

}
Expand Up @@ -286,4 +286,8 @@ public boolean failOverloadedTask() {
public long failOverloadedTaskInterval() {
    // Interval between overloaded-task checks (units per the underlying
    // config field — presumably milliseconds; TODO confirm).
    final long interval = getConfig().getLong(TASKS_FAIL_OVERLOADED_CHECK_INTERVAL);
    return interval;
}

public int getTopicNumPartitions() {
    // Partition count to use when the connector auto-creates a topic.
    final int numPartitions = getConfig().getInteger(TOPIC_DEFAULT_AUTO_CREATION_PARTITIONS_FIELD);
    return numPartitions;
}
}
Expand Up @@ -7,7 +7,6 @@

import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -222,11 +221,9 @@ void processDataChangeEvent(DataChangeEvent event) throws InterruptedException {
SpannerPartition partition = new SpannerPartition(event.getPartitionToken());

for (Mod mod : event.getMods()) {
String recordUid = UUID.randomUUID().toString();

SpannerOffsetContext offsetContext = offsetContextFactory.getOffsetContextFromDataChangeEvent(mod.getModNumber(), event);

finishingPartitionManager.newRecord(partition.getValue(), recordUid);
String recordUid = this.finishingPartitionManager.newRecord(event.getPartitionToken());

boolean dispatched = spannerEventDispatcher.dispatchDataChangeEvent(partition, tableId,
new SpannerChangeRecordEmitter(recordUid, event.getModType(), mod, partition, offsetContext,
Expand Down
Expand Up @@ -32,6 +32,8 @@ public abstract class BaseSpannerConnectorConfig extends CommonConnectorConfig {

private static final String LOW_WATERMARK_STAMP_INTERVAL_NAME = "gcp.spanner.low-watermark.stamp.interval";

private static final String TOPIC_DEFAULT_AUTO_CREATION_PARTITIONS_PROPERTY_NAME = "topic.creation.default.partitions";

protected static final String GCP_SPANNER_PROJECT_ID_PROPERTY_NAME = "gcp.spanner.project.id";
protected static final String GCP_SPANNER_INSTANCE_ID_PROPERTY_NAME = "gcp.spanner.instance.id";
protected static final String GCP_SPANNER_DATABASE_ID_PROPERTY_NAME = "gcp.spanner.database.id";
Expand Down Expand Up @@ -226,6 +228,15 @@ public abstract class BaseSpannerConnectorConfig extends CommonConnectorConfig {
.withDefault(10000)
.withValidation(Field::isNonNegativeInteger);

// Exposes Kafka Connect's "topic.creation.default.partitions" property so the
// connector can honor the configured partition count when it auto-creates
// topics (defaults to 1 partition when unset).
protected static final Field TOPIC_DEFAULT_AUTO_CREATION_PARTITIONS_FIELD = Field.create(TOPIC_DEFAULT_AUTO_CREATION_PARTITIONS_PROPERTY_NAME)
.withDisplayName("Topic auto creation num partitions")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 10))
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDefault(1)
.withDescription("Number of partitions in automatically created topic");

protected static final Field MAX_MISSED_HEARTBEATS = Field.create(MAX_MISSED_HEARTBEATS_PROPERTY_NAME)
.withDisplayName("Maximum missed heartbeats to identify that partition gets stuck")
.withType(Type.INT)
Expand Down Expand Up @@ -520,6 +531,7 @@ public abstract class BaseSpannerConnectorConfig extends CommonConnectorConfig {
TASK_STATE_CHANGE_EVENT_QUEUE_CAPACITY,
VALUE_CAPTURE_MODE,
SPANNER_HEART_BEAT_INTERVAL,
TOPIC_DEFAULT_AUTO_CREATION_PARTITIONS_FIELD,

MAX_BATCH_SIZE,
MAX_QUEUE_SIZE,
Expand Down
Expand Up @@ -12,6 +12,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
Expand All @@ -36,11 +37,11 @@ public KafkaPartitionInfoProvider(AdminClient adminClient) {
this.adminClient = adminClient;
}

public Collection<Integer> getPartitions(String topicName) throws ExecutionException, InterruptedException {
public Collection<Integer> getPartitions(String topicName, Optional<Integer> numPartitions) throws ExecutionException, InterruptedException {

try {
if (!topicExists(adminClient, topicName)) {
createTopic(adminClient, topicName, 1, Map.of());
createTopic(adminClient, topicName, numPartitions, Map.of());
}

DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topicName));
Expand Down
Expand Up @@ -24,9 +24,9 @@ public static boolean topicExists(AdminClient adminClient, String topic) throws
return topics.contains(topic);
}

public static void createTopic(AdminClient adminClient, String topic, int numPartitions, Map<String, String> configs)
public static void createTopic(AdminClient adminClient, String topic, Optional<Integer> numPartitions, Map<String, String> configs)
throws ExecutionException, InterruptedException {
NewTopic newTopic = new NewTopic(topic, Optional.of(numPartitions), Optional.empty()).configs(configs);
NewTopic newTopic = new NewTopic(topic, numPartitions, Optional.empty()).configs(configs);
CreateTopicsResult result = adminClient.createTopics(List.of(newTopic));
result.topicId(topic).get();
}
Expand Down
Expand Up @@ -5,30 +5,47 @@
*/
package io.debezium.connector.spanner.kafka.internal;

import static org.slf4j.LoggerFactory.getLogger;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.slf4j.Logger;

/**
* This class allows to publish the latest buffered value
* once per time period, except the case: if the value is required
* to be published immediately.
*/
public class BufferedPublisher<V> {

private static final Logger LOGGER = getLogger(BufferedPublisher.class);

private final Thread thread;
private final AtomicReference<V> value = new AtomicReference<>();
private final Predicate<V> publishImmediately;
private final Consumer<V> onPublish;
private final String taskUid;

public BufferedPublisher(String name, long timeout, Predicate<V> publishImmediately, Consumer<V> onPublish) {
public BufferedPublisher(String taskUid, String name, long timeout, Predicate<V> publishImmediately, Consumer<V> onPublish) {
this.publishImmediately = publishImmediately;
this.onPublish = onPublish;
this.taskUid = taskUid;

this.thread = new Thread(() -> {

Instant lastUpdatedTime = Instant.now();
while (!Thread.currentThread().isInterrupted()) {
try {
if (Instant.now().isAfter(lastUpdatedTime.plus(Duration.ofSeconds(600)))) {
LOGGER.info(
"Task Uid {} is still publishing with AtomicReference value {}",
this.taskUid,
(this.value.get() == null));
lastUpdatedTime = Instant.now();
}
publishBuffered();
Thread.sleep(timeout);
}
Expand Down
Expand Up @@ -8,6 +8,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
Expand Down Expand Up @@ -36,7 +37,7 @@ public void createAdjustRebalanceTopic() {
String rebalancingTopic = config.rebalancingTopic();
int maxTasks = config.getMaxTasks();
if (!topicExists(rebalancingTopic)) {
createTopic(rebalancingTopic, maxTasks, Map.of());
createTopic(rebalancingTopic, Optional.of(maxTasks), Map.of());
return;
}

Expand All @@ -62,7 +63,7 @@ public void createVerifySyncTopic() {
topicProps.put(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(config.syncSegmentMs()));
topicProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, config.syncMinCleanableDirtyRatio());

createTopic(syncTopic, 1, topicProps);
createTopic(syncTopic, Optional.of(1), topicProps);
return;
}

Expand All @@ -82,7 +83,7 @@ private boolean topicExists(String topic) throws ExecutionException, Interrupted
return KafkaUtils.topicExists(adminClient, topic);
}

private void createTopic(String topic, int numPartitions, Map<String, String> configs) throws ExecutionException, InterruptedException {
private void createTopic(String topic, Optional<Integer> numPartitions, Map<String, String> configs) throws ExecutionException, InterruptedException {
KafkaUtils.createTopic(adminClient, topic, numPartitions, configs);
}

Expand Down
Expand Up @@ -8,6 +8,7 @@
import static org.slf4j.LoggerFactory.getLogger;

import java.time.Duration;
import java.time.Instant;
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -104,6 +105,7 @@ public void onPartitionsLost(Collection<TopicPartition> partitions) {
thread = new Thread(() -> {
try {
long commitOffsetStart = System.currentTimeMillis();
Instant lastUpdatedTime = Instant.now();
while (!Thread.currentThread().isInterrupted()) {
try {
consumer.poll(pollDuration);
Expand All @@ -112,6 +114,12 @@ public void onPartitionsLost(Collection<TopicPartition> partitions) {
consumer.commitSync(commitOffsetsTimeout);
commitOffsetStart = System.currentTimeMillis();
}
if (Instant.now().isAfter(lastUpdatedTime.plus(Duration.ofSeconds(600)))) {
LOGGER.info(
"Task Uid {} is still listening to RebalanceEventListener",
this.task.getTaskUid());
lastUpdatedTime = Instant.now();
}
}
catch (org.apache.kafka.common.errors.InterruptException e) {
return;
Expand Down