KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… #15910

Merged · 15 commits · May 23, 2024
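Summary of the change: on startup, MirrorCheckpointTask now reads the checkpoints topic to rebuild the checkpoint state it emitted before a restart, and only forces the OffsetSyncStore to discard all but the latest offset sync (the previous behavior while reading to the end of the offset-syncs topic) when that reload fails. Checkpoint gains equals()/hashCode() so restored and freshly computed checkpoints can be compared.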
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java

@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.HashMap;
import java.nio.ByteBuffer;
import java.util.Objects;

/** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote consumer group state. */
public class Checkpoint {
@@ -180,5 +181,18 @@ byte[] recordKey() {
byte[] recordValue() {
return serializeValue(VERSION).array();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Checkpoint that = (Checkpoint) o;
return upstreamOffset == that.upstreamOffset && downstreamOffset == that.downstreamOffset && Objects.equals(consumerGroupId, that.consumerGroupId) && Objects.equals(topicPartition, that.topicPartition) && Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(consumerGroupId, topicPartition, upstreamOffset, downstreamOffset, metadata);
}
}
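The new equals()/hashCode() let checkpoints read back from the checkpoints topic be compared against freshly computed ones. A minimal round-trip sketch, assuming the constructor argument order matches the fields compared in equals() (topic name and values here are hypothetical, and the package declaration is needed because recordKey()/recordValue() are package-private):

    package org.apache.kafka.connect.mirror;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class CheckpointRoundTripSketch {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("topic1", 0);
            Checkpoint original = new Checkpoint("group1", tp, 42L, 100L, "metadata");
            // Serialize as the task does when emitting, then parse as readCheckpoints() does.
            ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                    "mm2-checkpoints.internal", 0, 0L, original.recordKey(), original.recordValue());
            Checkpoint restored = Checkpoint.deserializeRecord(record);
            // Value equality now holds across the round trip.
            System.out.println(original.equals(restored));                  // true
            System.out.println(original.hashCode() == restored.hashCode()); // true
        }
    }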

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java

@@ -75,6 +75,7 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
public static final Class<?> GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class;
public static final String OFFSET_SYNCS_SOURCE_CONSUMER_ROLE = "offset-syncs-source-consumer";
public static final String OFFSET_SYNCS_TARGET_CONSUMER_ROLE = "offset-syncs-target-consumer";
public static final String CHECKPOINTS_TARGET_CONSUMER_ROLE = "checkpoints-target-consumer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = "offset-syncs-source-admin";
public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = "offset-syncs-target-admin";

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java

@@ -18,9 +18,12 @@

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Schema;
@@ -29,6 +32,9 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;

import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -41,12 +47,14 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.concurrent.ExecutionException;
import java.time.Duration;
import java.util.stream.Stream;

import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;
import static org.apache.kafka.connect.mirror.MirrorUtils.adminCall;

/** Emits checkpoints for upstream consumer groups. */
@@ -70,17 +78,19 @@ public class MirrorCheckpointTask extends SourceTask {
private Scheduler scheduler;
private Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset;
private Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;

public MirrorCheckpointTask() {}

// for testing
MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore,
ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore, Set<String> consumerGroups,
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset,
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
this.sourceClusterAlias = sourceClusterAlias;
this.targetClusterAlias = targetClusterAlias;
this.replicationPolicy = replicationPolicy;
this.offsetSyncStore = offsetSyncStore;
this.consumerGroups = consumerGroups;
this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
this.topicFilter = topic -> true;
@@ -103,10 +113,11 @@ public void start(Map<String, String> props) {
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
metrics = config.metrics();
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
Optional<Map<String, Map<TopicPartition, Checkpoint>>> checkpoints = readCheckpoints(config);
checkpointsPerConsumerGroup = checkpoints.orElse(new HashMap<>());
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler.execute(() -> {
offsetSyncStore.start();
offsetSyncStore.start(!checkpoints.isPresent());
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
@@ -116,6 +127,73 @@ public void start(Map<String, String> props) {
consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
}

// Read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task.
// The callback can only surface errors thrown by consumer.poll in KafkaBasedLog,
// e.g. being unauthorized to read from the topic (non-retriable).
// If any are encountered, treat the loading of checkpoints as failed.
Optional<Map<String, Map<TopicPartition, Checkpoint>>> readCheckpoints(MirrorCheckpointTaskConfig config) {
AtomicBoolean successful = new AtomicBoolean(true);
Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new HashMap<>();
Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
@Override
public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> cpRecord) {
if (error != null && successful.getAndSet(false)) {
log.error("Error loading Checkpoint topic", error);
checkpoints.clear();
} else if (successful.get()) {
try {
Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
if (consumerGroups.contains(cp.consumerGroupId())) {
Map<TopicPartition, Checkpoint> cps = checkpoints.computeIfAbsent(cp.consumerGroupId(), ignored1 -> new HashMap<>());
cps.put(cp.topicPartition(), cp);
}
} catch (SchemaException ex) {
log.warn("Ignored invalid checkpoint record at offset {}", cpRecord.offset(), ex);
}
}
}
};

log.info("Starting loading Checkpoint topic : {}", config.checkpointsTopic());
readCheckpointsImpl(config, consumedCallback);
if (successful.get()) {
log.info("Succesfully initialized checkpoints from topic : {}", config.checkpointsTopic());
log.debug("Initial checkpointsPerConsumerGroup : {}", checkpoints);
return Optional.of(checkpoints);
} else {
log.warn("Failed initializing checkpoints from topic : {}", config.checkpointsTopic());
return Optional.empty();
}
}

// accessible for testing
void readCheckpointsImpl(MirrorCheckpointTaskConfig config, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
TopicAdmin cpAdmin = null;
KafkaBasedLog<byte[], byte[]> previousCheckpoints = null;
try {
cpAdmin = new TopicAdmin(
config.targetAdminConfig("checkpoint-target-admin"),
config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")));

previousCheckpoints = KafkaBasedLog.withExistingClients(
config.checkpointsTopic(),
MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)),
null,
cpAdmin,
consumedCallback,
Time.SYSTEM,
ignored -> {
},
topicPartition -> topicPartition.partition() == 0);

previousCheckpoints.start(true);
previousCheckpoints.stop();
} finally {
Utils.closeQuietly(cpAdmin, "admin client for previous Checkpoints");
Utils.closeQuietly(previousCheckpoints != null ? previousCheckpoints::stop : null, "backing store for previous Checkpoints");
}
}
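Because readCheckpointsImpl is package-private, a unit test can bypass Kafka entirely by overriding it and replaying serialized records through the consumed callback. A sketch under these assumptions: Mockito's mock/when and JUnit's assertEquals are statically imported, the mocked MirrorCheckpointTaskConfig only needs checkpointsTopic() stubbed for the logging calls above, and group/topic/offset values are hypothetical:

    Checkpoint cp = new Checkpoint("group1", new TopicPartition("topic1", 0), 10L, 20L, "");
    MirrorCheckpointTask task = new MirrorCheckpointTask("source", "target",
            new DefaultReplicationPolicy(), null, Collections.singleton("group1"),
            new HashMap<>(), new HashMap<>()) {
        @Override
        void readCheckpointsImpl(MirrorCheckpointTaskConfig config,
                                 Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
            // Replay one pre-serialized checkpoint record instead of reading Kafka.
            consumedCallback.onCompletion(null, new ConsumerRecord<>(
                    "checkpoints", 0, 0L, cp.recordKey(), cp.recordValue()));
        }
    };
    MirrorCheckpointTaskConfig config = mock(MirrorCheckpointTaskConfig.class);
    when(config.checkpointsTopic()).thenReturn("checkpoints");
    // The restored state contains exactly the replayed checkpoint.
    assertEquals(Optional.of(Collections.singletonMap("group1",
            Collections.singletonMap(cp.topicPartition(), cp))), task.readCheckpoints(config));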

@Override
public void commit() {
// nop
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java

@@ -63,6 +63,7 @@ class OffsetSyncStore implements AutoCloseable {
private final KafkaBasedLog<byte[], byte[]> backingStore;
private final Map<TopicPartition, OffsetSync[]> offsetSyncs = new ConcurrentHashMap<>();
private final TopicAdmin admin;
protected volatile boolean initializationMustReadToEnd = true;
protected volatile boolean readToEnd = false;

OffsetSyncStore(MirrorCheckpointConfig config) {
@@ -105,7 +106,13 @@ private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig
/**
* Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
*/
public void start() {
public void start(boolean initializationMustReadToEnd) {
this.initializationMustReadToEnd = initializationMustReadToEnd;
if (initializationMustReadToEnd) {
log.warn("OffsetSyncStore initializationMustReadToEnd = {}", initializationMustReadToEnd);
} else {
log.debug("OffsetSyncStore initializationMustReadToEnd = {}", initializationMustReadToEnd);
}
backingStore.start();
readToEnd = true;
}
@@ -211,10 +218,13 @@ private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {

private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, OffsetSync offsetSync) {
long upstreamOffset = offsetSync.upstreamOffset();

// While reading to the end of the topic, ensure that our earliest sync is later than
// any earlier sync that could have been used for translation, to preserve monotonicity
// If the upstream offset rewinds, all previous offsets are invalid, so overwrite them all.
if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
boolean onlyLoadLastOffset = !readToEnd && initializationMustReadToEnd;
boolean upstreamRewind = upstreamOffset < syncs[0].upstreamOffset();
if (onlyLoadLastOffset || upstreamRewind) {
clearSyncArray(syncs, offsetSync);
return;
}
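The guard above is the behavioral core of the change: on a fresh start (no restored checkpoints) the store keeps only the most recent sync while it is still reading to the end, but after a successful checkpoint reload it may retain older syncs, widening the range of translatable offsets after a restart. A standalone toy model of just that branch, with all names local to this sketch (syncs[0] holds the latest kept sync):

    // Returns true when updateSyncArray would collapse the array to the new sync.
    static boolean collapseToLatest(boolean readToEnd, boolean initializationMustReadToEnd,
                                    long newUpstreamOffset, long latestKeptUpstreamOffset) {
        boolean onlyLoadLastOffset = !readToEnd && initializationMustReadToEnd;
        boolean upstreamRewind = newUpstreamOffset < latestKeptUpstreamOffset;
        return onlyLoadLastOffset || upstreamRewind;
    }

    // collapseToLatest(false, true, 100L, 50L)  -> true   (fresh start, still loading)
    // collapseToLatest(false, false, 100L, 50L) -> false  (restored checkpoints, older syncs kept)
    // collapseToLatest(true, false, 40L, 50L)   -> true   (upstream rewind invalidates earlier syncs)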