KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… #15910

Merged · 15 commits · May 23, 2024
Checkpoint.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.HashMap;
import java.nio.ByteBuffer;
import java.util.Objects;

/** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote consumer group state. */
public class Checkpoint {
@@ -180,5 +181,18 @@ byte[] recordKey() {
byte[] recordValue() {
return serializeValue(VERSION).array();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Checkpoint that = (Checkpoint) o;
return upstreamOffset == that.upstreamOffset && downstreamOffset == that.downstreamOffset && Objects.equals(consumerGroupId, that.consumerGroupId) && Objects.equals(topicPartition, that.topicPartition) && Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(consumerGroupId, topicPartition, upstreamOffset, downstreamOffset, metadata);
}
}
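The only functional change to Checkpoint is the new equals and hashCode, giving the class value semantics over all five fields. A minimal illustration, assuming (this diff does not show it) that the constructor takes the fields in the order equals lists them:

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.Checkpoint;

public class CheckpointEqualitySketch {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("topic-1", 0);
        // two independently constructed checkpoints with identical fields now compare equal
        Checkpoint recomputed = new Checkpoint("group-1", tp, 100L, 42L, "");
        Checkpoint reloaded = new Checkpoint("group-1", tp, 100L, 42L, "");
        System.out.println(recomputed.equals(reloaded));                  // true
        System.out.println(recomputed.hashCode() == reloaded.hashCode()); // true
    }
}

This matters, for instance, in tests that compare checkpoints reloaded from the checkpoints topic with freshly computed ones.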

CheckpointStore.java (new file)
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;

/**
* Reads the checkpoints Kafka log once and populates a map of
* checkpoints per consumer group.
*
* The Kafka log is closed after the initial load and only the in-memory map is
* used after start.
*/
public class CheckpointStore implements AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(CheckpointStore.class);

private final MirrorCheckpointTaskConfig config;
private final Set<String> consumerGroups;

private TopicAdmin cpAdmin = null;
private KafkaBasedLog<byte[], byte[]> backingStore = null;
// accessible for testing
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;

private volatile boolean loadSuccess = false;
private volatile boolean isInitialized = false;

public CheckpointStore(MirrorCheckpointTaskConfig config, Set<String> consumerGroups) {
this.config = config;
this.consumerGroups = new HashSet<>(consumerGroups);
}

// constructor for testing only
CheckpointStore(Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
this.config = null; //ignored by tests
this.consumerGroups = null; //ignored by tests
this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
isInitialized = true;
loadSuccess = true;
}

// potentially long running
public boolean start() {
checkpointsPerConsumerGroup = readCheckpoints();
isInitialized = true;
if (log.isTraceEnabled()) {
log.trace("CheckpointStore started, load success={}, map={}", loadSuccess, checkpointsPerConsumerGroup);
} else {
log.debug("CheckpointStore started, load success={}, map.size={}", loadSuccess, checkpointsPerConsumerGroup.size());
}
return loadSuccess;
}

public boolean isInitialized() {
return isInitialized;
}

public void update(String group, Map<TopicPartition, Checkpoint> newCheckpoints) {
Map<TopicPartition, Checkpoint> oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
oldCheckpoints.putAll(newCheckpoints);
}

public Map<TopicPartition, Checkpoint> get(String group) {
Map<TopicPartition, Checkpoint> result = checkpointsPerConsumerGroup.get(group);
return result == null ? null : Collections.unmodifiableMap(result);
}

public Map<String, Map<TopicPartition, OffsetAndMetadata>> computeConvertedUpstreamOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();

for (Map.Entry<String, Map<TopicPartition, Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
String consumerId = entry.getKey();
Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
for (Checkpoint checkpoint : entry.getValue().values()) {
convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
}
result.put(consumerId, convertedUpstreamOffset);
}
return result;
}

@Override
public void close() {
releaseResources();
}

private void releaseResources() {
Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for previous Checkpoints");
Utils.closeQuietly(cpAdmin, "admin client for previous Checkpoints");
cpAdmin = null;
backingStore = null;
}

// read the checkpoints topic to initialize the checkpointsPerConsumerGroup state
// the callback only ever sees errors thrown by consumer.poll in KafkaBasedLog,
// e.g. not being authorized to read from the topic (non-retriable)
// if any are encountered, treat the loading of Checkpoints as failed.
private Map<String, Map<TopicPartition, Checkpoint>> readCheckpoints() {
Map<String, Map<TopicPartition, Checkpoint>> checkpoints = new HashMap<>();
Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = (error, cpRecord) -> {
if (error != null) {
// if we are not authorized to READ from the topic, we must throw an error
// to stop the KafkaBasedLog from looping forever attempting to read to the end
checkpoints.clear();
if (error instanceof RuntimeException) {
throw (RuntimeException) error;
} else {
throw new RuntimeException(error);
}
} else {
try {
Checkpoint cp = Checkpoint.deserializeRecord(cpRecord);
if (consumerGroups.contains(cp.consumerGroupId())) {
Map<TopicPartition, Checkpoint> cps = checkpoints.computeIfAbsent(cp.consumerGroupId(), ignored1 -> new HashMap<>());
cps.put(cp.topicPartition(), cp);
}
} catch (SchemaException ex) {
log.warn("Ignored invalid checkpoint record at offset {}", cpRecord.offset(), ex);
}
}
};

try {
long startTime = System.currentTimeMillis();
readCheckpointsImpl(config, consumedCallback);
log.debug("starting+stopping KafkaBasedLog took {}ms", System.currentTimeMillis() - startTime);
loadSuccess = true;
} catch (Exception error) {
loadSuccess = false;
if (error instanceof AuthorizationException) {
log.warn("Not authorized to access checkpoints topic {} - " +
"this may degrade offset translation as only checkpoints " +
"for offsets which were mirrored after the task started will be emitted",
config.checkpointsTopic(), error);
} else {
log.info("Exception encountered loading checkpoints topic {} - " +
"this may degrade offset translation as only checkpoints " +
"for offsets which were mirrored after the task started will be emitted",
config.checkpointsTopic(), error);
}
}
return checkpoints;
}

// accessible for testing
void readCheckpointsImpl(MirrorCheckpointTaskConfig config, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
try {
cpAdmin = new TopicAdmin(
config.targetAdminConfig("checkpoint-target-admin"),
config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")));

backingStore = KafkaBasedLog.withExistingClients(
config.checkpointsTopic(),
MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)),
null,
cpAdmin,
consumedCallback,
Time.SYSTEM,
ignored -> {
},
topicPartition -> topicPartition.partition() == 0);

backingStore.start(true);
backingStore.stop();
} finally {
releaseResources();
}
}
}
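To make the new class's lifecycle concrete, here is a minimal usage sketch of CheckpointStore as described in its javadoc: read the checkpoints topic once, serve all later lookups from the in-memory map, and release the Kafka clients. The helper class and method are illustrative only, and the sketch assumes it sits in the org.apache.kafka.connect.mirror package so the task config type is visible.

package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

class CheckpointStoreUsageSketch {

    // load checkpoints once, then return the converted upstream offsets for one group
    static Map<TopicPartition, OffsetAndMetadata> convertedOffsetsFor(MirrorCheckpointTaskConfig config,
                                                                      Set<String> groups,
                                                                      String group) {
        try (CheckpointStore store = new CheckpointStore(config, groups)) {
            // potentially long running: reads the checkpoints topic to the end, then closes the log
            boolean readOk = store.start();
            if (!readOk) {
                // the read failed (e.g. not authorized to read the checkpoints topic);
                // the store still works, but starts from an empty in-memory map
                System.err.println("checkpoint load incomplete for group " + group);
            }
            Map<TopicPartition, OffsetAndMetadata> converted =
                    store.computeConvertedUpstreamOffset().get(group);
            return converted == null ? Collections.emptyMap() : converted;
        }
    }
}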
MirrorCheckpointConfig.java
@@ -75,6 +75,7 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
public static final Class<?> GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class;
public static final String OFFSET_SYNCS_SOURCE_CONSUMER_ROLE = "offset-syncs-source-consumer";
public static final String OFFSET_SYNCS_TARGET_CONSUMER_ROLE = "offset-syncs-target-consumer";
public static final String CHECKPOINTS_TARGET_CONSUMER_ROLE = "checkpoints-target-consumer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = "offset-syncs-source-admin";
public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = "offset-syncs-target-admin";

MirrorCheckpointTask.java
@@ -69,21 +69,25 @@ public class MirrorCheckpointTask extends SourceTask {
private MirrorCheckpointMetrics metrics;
private Scheduler scheduler;
private Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset;
private Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;
private CheckpointStore checkpointStore;

public MirrorCheckpointTask() {}

// for testing
MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore,
ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore, Set<String> consumerGroups,
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset,
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
CheckpointStore checkpointStore) {
this.sourceClusterAlias = sourceClusterAlias;
this.targetClusterAlias = targetClusterAlias;
this.replicationPolicy = replicationPolicy;
this.offsetSyncStore = offsetSyncStore;
this.consumerGroups = consumerGroups;
this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
this.checkpointStore = checkpointStore;
this.topicFilter = topic -> true;
this.interval = Duration.ofNanos(1);
this.pollTimeout = Duration.ofNanos(1);
}

@Override
@@ -103,15 +107,18 @@ public void start(Map<String, String> props) {
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
metrics = config.metrics();
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
checkpointStore = new CheckpointStore(config, consumerGroups);
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler.execute(() -> {
offsetSyncStore.start();
// loading the stores is a potentially long-running operation, so it runs asynchronously
// to avoid blocking task::start (until a task has completed starting it cannot be stopped)
boolean checkpointsReadOk = checkpointStore.start();
offsetSyncStore.start(!checkpointsReadOk);
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
"sync idle consumer group offset from source to target");
}, "starting offset sync store");
}, "starting checkpoint and offset sync stores");
log.info("{} checkpointing {} consumer groups {}->{}: {}.", Thread.currentThread().getName(),
consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups);
}
@@ -126,6 +133,7 @@ public void stop() {
long start = System.currentTimeMillis();
stopping = true;
Utils.closeQuietly(topicFilter, "topic filter");
Utils.closeQuietly(checkpointStore, "checkpoints store");
Utils.closeQuietly(offsetSyncStore, "offset sync store");
Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client");
@@ -146,8 +154,8 @@ public List<SourceRecord> poll() throws InterruptedException {
while (!stopping && System.currentTimeMillis() < deadline) {
Thread.sleep(pollTimeout.toMillis());
}
if (stopping) {
// we are stopping, return early.
if (stopping || !checkpointStore.isInitialized()) {
// we are stopping, or not fully initialized, return early.
return null;
}
List<SourceRecord> records = new ArrayList<>();
@@ -166,14 +174,13 @@ public List<SourceRecord> poll() throws InterruptedException {
}
}


private List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
// visible for testing
List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
try {
long timestamp = System.currentTimeMillis();
Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets = listConsumerGroupOffsets(group);
Map<TopicPartition, Checkpoint> newCheckpoints = checkpointsForGroup(upstreamGroupOffsets, group);
Map<TopicPartition, Checkpoint> oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
oldCheckpoints.putAll(newCheckpoints);
checkpointStore.update(group, newCheckpoints);
return newCheckpoints.values().stream()
.map(x -> checkpointRecord(x, timestamp))
.collect(Collectors.toList());
@@ -195,7 +202,7 @@ Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAn
}

private boolean checkpointIsMoreRecent(Checkpoint checkpoint) {
Map<TopicPartition, Checkpoint> checkpoints = checkpointsPerConsumerGroup.get(checkpoint.consumerGroupId());
Map<TopicPartition, Checkpoint> checkpoints = checkpointStore.get(checkpoint.consumerGroupId());
if (checkpoints == null) {
log.trace("Emitting {} (first for this group)", checkpoint);
return true;
@@ -314,7 +321,7 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() throws Exe
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();

// first, sync offsets for the idle consumers at target
for (Entry<String, Map<TopicPartition, OffsetAndMetadata>> group : getConvertedUpstreamOffset().entrySet()) {
for (Entry<String, Map<TopicPartition, OffsetAndMetadata>> group : checkpointStore.computeConvertedUpstreamOffset().entrySet()) {
String consumerGroupId = group.getKey();
// for each idle consumer at target, read the checkpoints (converted upstream offset)
// from the pre-populated map
@@ -391,18 +398,4 @@ void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetada
);
}
}

Map<String, Map<TopicPartition, OffsetAndMetadata>> getConvertedUpstreamOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();

for (Entry<String, Map<TopicPartition, Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
String consumerId = entry.getKey();
Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
for (Checkpoint checkpoint : entry.getValue().values()) {
convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
}
result.put(consumerId, convertedUpstreamOffset);
}
return result;
}
}
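Taken together, the start()/poll() changes follow a simple pattern: start() returns promptly while the stores load on the scheduler thread, and poll() emits nothing until that load has finished (or the task is stopping). A distilled, framework-free sketch of that pattern, with illustrative names that are not MirrorMaker code:

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncInitTaskSketch {
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
    private volatile boolean initialized = false;
    private volatile boolean stopping = false;

    // start() must not block: a task cannot be stopped until it has finished starting,
    // so the potentially slow load runs on the scheduler thread instead
    public void start() {
        scheduler.execute(() -> {
            loadStores();
            initialized = true;
        });
    }

    // return null (nothing to emit) while stopping or before the load has completed
    public List<String> poll() {
        if (stopping || !initialized) {
            return null;
        }
        return Collections.singletonList("record");
    }

    public void stop() {
        stopping = true;
        scheduler.shutdownNow();
    }

    private void loadStores() {
        // stand-in for CheckpointStore.start() and OffsetSyncStore.start()
    }
}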