KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)

KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2

KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2

KAFKA-12566: Fix flaky MirrorMaker 2 integration tests

Reviewers: Chris Egerton <chrise@aiven.io>
gharris1727 committed Feb 17, 2023
1 parent 2e3bbe6 commit a54a34a
Showing 11 changed files with 507 additions and 159 deletions.
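
In outline, the behavioral change: offset translation is declined until the offset syncs topic has been read to the end at least once (KAFKA-13659), and a consumer offset ahead of the latest sync is translated to at most one downstream offset past that sync instead of dead-reckoning the full distance (KAFKA-12468). A condensed, hypothetical sketch of that rule with illustrative names, not the patched OffsetSyncStore itself:

import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: models the translation rule, not the real OffsetSyncStore.
class TranslationRuleSketch {
    private static class Sync {
        final long upstreamOffset;
        final long downstreamOffset;
        Sync(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    private final Map<String, Sync> latestSyncs = new ConcurrentHashMap<>();
    private volatile boolean readToEnd = false;

    void markReadToEnd() {
        readToEnd = true;
    }

    void recordSync(String topicPartition, long upstream, long downstream) {
        latestSyncs.put(topicPartition, new Sync(upstream, downstream));
    }

    OptionalLong translate(String topicPartition, long upstreamGroupOffset) {
        if (!readToEnd) {
            // KAFKA-13659: never emit checkpoints from a store that has not caught up yet.
            return OptionalLong.empty();
        }
        Sync sync = latestSyncs.get(topicPartition);
        if (sync == null) {
            return OptionalLong.empty();
        }
        if (sync.upstreamOffset > upstreamGroupOffset) {
            // The group offset predates the latest sync; it cannot be translated accurately.
            return OptionalLong.of(-1L);
        }
        // KAFKA-12468: step at most one downstream offset past the sync, so the translated
        // offset can never overshoot what has actually been replicated.
        long step = upstreamGroupOffset == sync.upstreamOffset ? 0 : 1;
        return OptionalLong.of(sync.downstreamOffset + step);
    }
}

With a sync of upstream 100 to downstream 200, translate(tp, 150) yields 201 rather than the old dead-reckoned 250, matching the updated test expectations further down.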
@@ -150,6 +150,12 @@ Map<String, Object> offsetSyncsTopicConsumerConfig() {
: targetConsumerConfig();
}

Map<String, Object> offsetSyncsTopicAdminConfig() {
return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
? sourceAdminConfig()
: targetAdminConfig();
}

Duration consumerPollTimeout() {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
}
@@ -102,10 +102,13 @@ public void start(Map<String, String> props) {
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
"sync idle consumer group offset from source to target");
scheduler.execute(() -> {
offsetSyncStore.start();
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
"sync idle consumer group offset from source to target");
}, "starting offset sync store");
}

@Override
@@ -136,7 +139,11 @@ public List<SourceRecord> poll() throws InterruptedException {
try {
long deadline = System.currentTimeMillis() + interval.toMillis();
while (!stopping && System.currentTimeMillis() < deadline) {
offsetSyncStore.update(pollTimeout);
Thread.sleep(pollTimeout.toMillis());
}
if (stopping) {
// we are stopping, return early.
return null;
}
List<SourceRecord> records = new ArrayList<>();
for (String group : consumerGroups) {
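Two things happen in the task above: the blocking offsetSyncStore.start() runs on the background scheduler so Connect task startup is not held up, and the periodic refresh/sync jobs are only scheduled once that start completes; poll() now merely sleeps, because the KafkaBasedLog introduced in the next file consumes the offset syncs topic on its own work thread. A hypothetical stand-alone model of that ordering (executor and method names here are illustrative, not MirrorMaker's Scheduler API):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: the periodic jobs are registered after the blocking store start finishes,
// so neither job can observe a store that has not read the syncs topic to the end.
public class DeferredSchedulingSketch {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.execute(() -> {
            startOffsetSyncStore(); // stands in for offsetSyncStore.start(), which blocks until caught up
            scheduler.scheduleAtFixedRate(DeferredSchedulingSketch::refreshIdleGroupOffsets, 0, 60, TimeUnit.SECONDS);
            scheduler.scheduleAtFixedRate(DeferredSchedulingSketch::syncGroupOffsets, 60, 60, TimeUnit.SECONDS);
        });
        // A real program would eventually call scheduler.shutdown().
    }

    private static void startOffsetSyncStore() { /* read the offset syncs topic to the end */ }

    private static void refreshIdleGroupOffsets() { /* stands in for refreshIdleConsumerGroupOffset */ }

    private static void syncGroupOffsets() { /* stands in for syncGroupOffset */ }
}
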
@@ -16,65 +16,124 @@
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;

import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
class OffsetSyncStore implements AutoCloseable {
private final KafkaConsumer<byte[], byte[]> consumer;
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
private final TopicPartition offsetSyncTopicPartition;
private final KafkaBasedLog<byte[], byte[]> backingStore;
private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
private final TopicAdmin admin;
protected volatile boolean readToEnd = false;

OffsetSyncStore(MirrorCheckpointConfig config) {
consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
new ByteArrayDeserializer(), new ByteArrayDeserializer());
offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
consumer.assign(Collections.singleton(offsetSyncTopicPartition));
Consumer<byte[], byte[]> consumer = null;
TopicAdmin admin = null;
KafkaBasedLog<byte[], byte[]> store;
try {
consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
admin = new TopicAdmin(
config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
store = createBackingStore(config, consumer, admin);
} catch (Throwable t) {
Utils.closeQuietly(consumer, "consumer for offset syncs");
Utils.closeQuietly(admin, "admin client for offset syncs");
throw t;
}
this.admin = admin;
this.backingStore = store;
}

// for testing
OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
this.consumer = consumer;
this.offsetSyncTopicPartition = offsetSyncTopicPartition;
private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig config, Consumer<byte[], byte[]> consumer, TopicAdmin admin) {
return new KafkaBasedLog<byte[], byte[]>(
config.offsetSyncsTopic(),
Collections.emptyMap(),
Collections.emptyMap(),
() -> admin,
(error, record) -> this.handleRecord(record),
Time.SYSTEM,
ignored -> {
}
) {
@Override
protected Producer<byte[], byte[]> createProducer() {
return null;
}

@Override
protected Consumer<byte[], byte[]> createConsumer() {
return consumer;
}

@Override
protected boolean readPartition(TopicPartition topicPartition) {
return topicPartition.partition() == 0;
}
};
}

OffsetSyncStore() {
this.admin = null;
this.backingStore = null;
}

/**
* Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
*/
public void start() {
backingStore.start();
readToEnd = true;
}

OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
if (!readToEnd) {
// If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
// This prevents emitting stale offsets while initially reading the offset syncs topic.
return OptionalLong.empty();
}
Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
if (offsetSync.isPresent()) {
if (offsetSync.get().upstreamOffset() > upstreamOffset) {
// Offset is too far in the past to translate accurately
return OptionalLong.of(-1L);
}
long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
// If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
// downstream offset past the offset sync itself. This is because we know that future records must appear
// ahead of the offset sync, but we cannot estimate how many offsets from the upstream topic
// will be written vs dropped. If we overestimate, then we may skip the correct offset and have data loss.
// This also handles consumer groups at the end of a topic whose offsets point past the last valid record.
// This may cause re-reading of records depending on the age of the offset sync.
// s=offset sync pair, ?=record may or may not be replicated, g=consumer group offset, r=re-read record
// source |-s?????r???g-|
// | ______/
// | /
// vv
// target |-sg----r-----|
long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
} else {
return OptionalLong.empty();
}
}

// poll and handle records
synchronized void update(Duration pollTimeout) {
try {
consumer.poll(pollTimeout).forEach(this::handleRecord);
} catch (WakeupException e) {
// swallow
}
}

public synchronized void close() {
consumer.wakeup();
Utils.closeQuietly(consumer, "offset sync store consumer");
@Override
public void close() {
Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for offset syncs");
Utils.closeQuietly(admin, "admin client for offset syncs");
}

protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
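The reworked store is now started explicitly and is safe to use with try-with-resources. A hedged usage sketch, assuming code living in the org.apache.kafka.connect.mirror package (the store and its translation method are package-private) and an already-built MirrorCheckpointConfig named config:

import java.util.OptionalLong;

import org.apache.kafka.common.TopicPartition;

// Hedged sketch of the lifecycle, not code from the patch.
class OffsetSyncStoreUsageSketch {
    static OptionalLong translateOnce(MirrorCheckpointConfig config) {
        try (OffsetSyncStore store = new OffsetSyncStore(config)) {
            store.start(); // blocks until the offset syncs topic has been read to the end
            // empty  -> no usable sync for the partition (or store not started): skip the checkpoint
            // -1     -> the group offset predates the latest sync: cannot translate accurately
            // other  -> downstream offset to checkpoint, at most one past the latest sync
            return store.translateDownstream(new TopicPartition("topic1", 0), 100L);
        }
    }
}
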
@@ -52,11 +52,16 @@ public void testDownstreamTopicRenaming() {

@Test
public void testCheckpoint() {
long t1UpstreamOffset = 3L;
long t1DownstreamOffset = 4L;
long t2UpstreamOffset = 7L;
long t2DownstreamOffset = 8L;
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
offsetSyncStore.sync(new TopicPartition("topic1", 2), t1UpstreamOffset, t1DownstreamOffset);
offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), t2UpstreamOffset, t2DownstreamOffset);
Optional<Checkpoint> optionalCheckpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
new OffsetAndMetadata(10, null));
assertTrue(optionalCheckpoint1.isPresent());
@@ -70,7 +75,7 @@ public void testCheckpoint() {
"checkpoint group9 sourcePartition failed");
assertEquals(10, checkpoint1.upstreamOffset(),
"checkpoint group9 upstreamOffset failed");
assertEquals(11, checkpoint1.downstreamOffset(),
assertEquals(t1DownstreamOffset + 1, checkpoint1.downstreamOffset(),
"checkpoint group9 downstreamOffset failed");
assertEquals(123L, sourceRecord1.timestamp().longValue(),
"checkpoint group9 timestamp failed");
@@ -87,10 +92,27 @@ public void testCheckpoint() {
"checkpoint group11 sourcePartition failed");
assertEquals(12, checkpoint2.upstreamOffset(),
"checkpoint group11 upstreamOffset failed");
assertEquals(13, checkpoint2.downstreamOffset(),
assertEquals(t2DownstreamOffset + 1, checkpoint2.downstreamOffset(),
"checkpoint group11 downstreamOffset failed");
assertEquals(234L, sourceRecord2.timestamp().longValue(),
"checkpoint group11 timestamp failed");
Optional<Checkpoint> optionalCheckpoint3 = mirrorCheckpointTask.checkpoint("group13", new TopicPartition("target2.topic5", 6),
new OffsetAndMetadata(7, null));
assertTrue(optionalCheckpoint3.isPresent());
Checkpoint checkpoint3 = optionalCheckpoint3.get();
SourceRecord sourceRecord3 = mirrorCheckpointTask.checkpointRecord(checkpoint3, 234L);
assertEquals(new TopicPartition("topic5", 6), checkpoint3.topicPartition(),
"checkpoint group13 topic5 failed");
assertEquals("group13", checkpoint3.consumerGroupId(),
"checkpoint group13 consumerGroupId failed");
assertEquals("group13", Checkpoint.unwrapGroup(sourceRecord3.sourcePartition()),
"checkpoint group13 sourcePartition failed");
assertEquals(t2UpstreamOffset, checkpoint3.upstreamOffset(),
"checkpoint group13 upstreamOffset failed");
assertEquals(t2DownstreamOffset, checkpoint3.downstreamOffset(),
"checkpoint group13 downstreamOffset failed");
assertEquals(234L, sourceRecord3.timestamp().longValue(),
"checkpoint group13 timestamp failed");
}

@Test
@@ -150,6 +172,7 @@ public void testSyncOffset() {
@Test
public void testNoCheckpointForTopicWithoutOffsetSyncs() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
@@ -165,6 +188,7 @@ public void testNoCheckpointForTopicWithoutOffsetSyncs() {
@Test
public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
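The new group13 assertions above cover the boundary case where the consumer offset equals the sync's upstream offset: no +1 step is applied, so the checkpoint lands exactly on the synced downstream offset. Restating the expected arithmetic with the test's values (illustrative snippet only):

public class ExactSyncTranslation {
    public static void main(String[] args) {
        long t2UpstreamOffset = 7L;
        long t2DownstreamOffset = 8L;
        long groupOffset = 7L; // equal to the sync's upstream offset
        long step = groupOffset == t2UpstreamOffset ? 0 : 1;
        System.out.println(t2DownstreamOffset + step); // 8: translated exactly, with no +1 step
    }
}
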
@@ -21,6 +21,8 @@

import org.junit.jupiter.api.Test;

import java.util.OptionalLong;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class OffsetSyncStoreTest {
@@ -30,7 +32,13 @@ public class OffsetSyncStoreTest {
static class FakeOffsetSyncStore extends OffsetSyncStore {

FakeOffsetSyncStore() {
super(null, null);
super();
}

@Override
public void start() {
// do not call super to avoid NPE without a KafkaBasedLog.
readToEnd = true;
}

void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
@@ -44,29 +52,57 @@ void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOff

@Test
public void testOffsetTranslation() {
FakeOffsetSyncStore store = new FakeOffsetSyncStore();

store.sync(tp, 100, 200);
assertEquals(250L, store.translateDownstream(tp, 150).getAsLong(),
"Failure in translating downstream offset 250");

// Translate exact offsets
store.sync(tp, 150, 251);
assertEquals(251L, store.translateDownstream(tp, 150).getAsLong(),
"Failure in translating exact downstream offset 251");

// Use old offset (5) prior to any sync -> can't translate
assertEquals(-1, store.translateDownstream(tp, 5).getAsLong(),
"Expected old offset to not translate");

// Downstream offsets reset
store.sync(tp, 200, 10);
assertEquals(10L, store.translateDownstream(tp, 200).getAsLong(),
"Failure in resetting translation of downstream offset");

// Upstream offsets reset
store.sync(tp, 20, 20);
assertEquals(20L, store.translateDownstream(tp, 20).getAsLong(),
"Failure in resetting translation of upstream offset");
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();

// Emit synced downstream offset without dead-reckoning
store.sync(tp, 100, 200);
assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));

// Translate exact offsets
store.sync(tp, 150, 251);
assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));

// Use old offset (5) prior to any sync -> can't translate
assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));

// Downstream offsets reset
store.sync(tp, 200, 10);
assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));

// Upstream offsets reset
store.sync(tp, 20, 20);
assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
}
}

@Test
public void testNoTranslationIfStoreNotStarted() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
// no offsets exist and store is not started
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));

// read a sync during startup
store.sync(tp, 100, 200);
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));

// After the store is started all offsets are visible
store.start();
assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
}
}

@Test
public void testNoTranslationIfNoOffsetSync() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
}
}
}
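
Connecting the test expectations back to KAFKA-12468: with a sync at upstream 100 / downstream 200 and a group at upstream 150, the old code dead-reckoned 200 + (150 - 100) = 250; if fewer than 50 of those upstream records were actually replicated, the downstream log-end offset sat below 250 and the reported consumer lag went negative. The clamped translation of 201 cannot overshoot. A small check of that arithmetic with assumed replication figures:

public class NegativeLagArithmetic {
    public static void main(String[] args) {
        // Assumed figures for illustration: only 30 of the 50 upstream records after the
        // sync were replicated, so the downstream log-end offset is 200 + 30 = 230.
        long downstreamLogEnd = 230L;
        long oldTranslation = 200L + (150L - 100L); // 250: dead reckoning can overshoot the log end
        long newTranslation = 200L + 1L;            // 201: clamped to one offset past the sync
        System.out.println("old lag = " + (downstreamLogEnd - oldTranslation)); // -20 -> negative lag
        System.out.println("new lag = " + (downstreamLogEnd - newTranslation)); //  29 -> never negative
    }
}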
