KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)

KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2

KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2

KAFKA-12566: Fix flaky MirrorMaker 2 integration tests
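
The negative downstream lag in KAFKA-12468 comes from the old translation arithmetic: the checkpoint task dead-reckoned a downstream offset as syncDownstream + (groupOffset - syncUpstream), which can point past the end of the replicated partition when not every upstream record is mirrored, so downstream lag (log end offset minus committed offset) goes negative. The patch instead advances at most one offset past the latest sync. Below is a minimal standalone sketch of the two rules, with made-up class and constant names rather than the real OffsetSyncStore API:

    import java.util.OptionalLong;

    public class TranslationSketch {
        // Latest offset sync pair: upstream offset 100 was replicated to downstream offset 200.
        static final long SYNC_UPSTREAM = 100L;
        static final long SYNC_DOWNSTREAM = 200L;

        // Old behavior: dead-reckon past the sync by the full upstream distance.
        static OptionalLong translateOld(long upstreamOffset) {
            if (upstreamOffset < SYNC_UPSTREAM) {
                return OptionalLong.of(-1L); // older than the latest sync, cannot translate
            }
            return OptionalLong.of(SYNC_DOWNSTREAM + (upstreamOffset - SYNC_UPSTREAM));
        }

        // New behavior: advance at most one offset past the sync, because records after the
        // sync may have been filtered upstream and never produced to the target partition.
        static OptionalLong translateNew(long upstreamOffset) {
            if (upstreamOffset < SYNC_UPSTREAM) {
                return OptionalLong.of(-1L);
            }
            long step = upstreamOffset == SYNC_UPSTREAM ? 0 : 1;
            return OptionalLong.of(SYNC_DOWNSTREAM + step);
        }

        public static void main(String[] args) {
            // Group committed at upstream offset 150. If the target partition holds fewer
            // than 250 records, the old translation points past its end offset and the
            // synced group shows negative lag; the new translation stays at 201.
            System.out.println(translateOld(150)); // OptionalLong[250]
            System.out.println(translateNew(150)); // OptionalLong[201]
        }
    }

The trade-off, spelled out in the comment inside translateDownstream in the diff below, is that a capped translation may cause some records to be re-read after failover, but it can no longer skip past the correct offset and lose data.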

Reviewers: Chris Egerton <chrise@aiven.io>
gharris1727 authored and C0urante committed Feb 23, 2023
1 parent 51ee89a commit 69a2817
Showing 9 changed files with 497 additions and 151 deletions.
MirrorCheckpointTask.java
@@ -101,10 +101,13 @@ public void start(Map<String, String> props) {
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
"sync idle consumer group offset from source to target");
scheduler.execute(() -> {
offsetSyncStore.start();
scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
"refreshing idle consumers group offsets at target cluster");
scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
"sync idle consumer group offset from source to target");
}, "starting offset sync store");
}
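
The start() change above moves the offsetSyncStore.start() call, which blocks until the offset-syncs topic has been read to the end, onto MirrorMaker's internal Scheduler, and the repeating group-offset jobs are only scheduled once the store has started. A rough sketch of the same "load first, then schedule periodic work" pattern using a plain ScheduledExecutorService (an assumption for illustration, not the Scheduler class used here); loadStore(), syncOffsets(), and the 60-second period are hypothetical stand-ins:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class DeferredStartSketch {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        public void start() {
            // The slow catch-up runs on the scheduler thread instead of the caller's thread.
            scheduler.execute(() -> {
                loadStore(); // blocks until the backing topic has been read to the end
                // Only once the store is caught up do the periodic sync jobs begin.
                scheduler.scheduleAtFixedRate(this::syncOffsets, 0, 60, TimeUnit.SECONDS);
            });
        }

        private void loadStore() { /* read the offset-syncs topic to the end */ }

        private void syncOffsets() { /* translate and commit consumer group offsets */ }
    }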

@Override
@@ -135,7 +138,11 @@ public List<SourceRecord> poll() throws InterruptedException {
try {
long deadline = System.currentTimeMillis() + interval.toMillis();
while (!stopping && System.currentTimeMillis() < deadline) {
offsetSyncStore.update(pollTimeout);
Thread.sleep(pollTimeout.toMillis());
}
if (stopping) {
// we are stopping, return early.
return null;
}
List<SourceRecord> records = new ArrayList<>();
for (String group : consumerGroups) {
OffsetSyncStore.java
@@ -16,65 +16,121 @@
*/
package org.apache.kafka.connect.mirror;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;

import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
class OffsetSyncStore implements AutoCloseable {
private final KafkaConsumer<byte[], byte[]> consumer;
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
private final TopicPartition offsetSyncTopicPartition;
private final KafkaBasedLog<byte[], byte[]> backingStore;
private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
private final TopicAdmin admin;
protected volatile boolean readToEnd = false;

OffsetSyncStore(MirrorConnectorConfig config) {
consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
new ByteArrayDeserializer(), new ByteArrayDeserializer());
offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
consumer.assign(Collections.singleton(offsetSyncTopicPartition));
Consumer<byte[], byte[]> consumer = null;
TopicAdmin admin = null;
KafkaBasedLog<byte[], byte[]> store;
try {
consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
admin = new TopicAdmin(config.offsetSyncsTopicAdminConfig());
store = createBackingStore(config, consumer, admin);
} catch (Throwable t) {
Utils.closeQuietly(consumer, "consumer for offset syncs");
Utils.closeQuietly(admin, "admin client for offset syncs");
throw t;
}
this.admin = admin;
this.backingStore = store;
}

// for testing
OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
this.consumer = consumer;
this.offsetSyncTopicPartition = offsetSyncTopicPartition;
private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorConnectorConfig config, Consumer<byte[], byte[]> consumer, TopicAdmin admin) {
return new KafkaBasedLog<byte[], byte[]>(
config.offsetSyncsTopic(),
Collections.emptyMap(),
Collections.emptyMap(),
() -> admin,
(error, record) -> this.handleRecord(record),
Time.SYSTEM,
ignored -> {
}
) {
@Override
protected Producer<byte[], byte[]> createProducer() {
return null;
}

@Override
protected Consumer<byte[], byte[]> createConsumer() {
return consumer;
}

@Override
protected boolean readPartition(TopicPartition topicPartition) {
return topicPartition.partition() == 0;
}
};
}

OffsetSyncStore() {
this.admin = null;
this.backingStore = null;
}

/**
* Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
*/
public void start() {
backingStore.start();
readToEnd = true;
}

OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
if (!readToEnd) {
// If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
// This prevents emitting stale offsets while initially reading the offset syncs topic.
return OptionalLong.empty();
}
Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
if (offsetSync.isPresent()) {
if (offsetSync.get().upstreamOffset() > upstreamOffset) {
// Offset is too far in the past to translate accurately
return OptionalLong.of(-1L);
}
long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
// If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
// downstream offset past the offset sync itself. This is because we know that future records must appear
// ahead of the offset sync, but we cannot estimate how many offsets from the upstream topic
// will be written vs dropped. If we overestimate, then we may skip the correct offset and have data loss.
// This also handles consumer groups at the end of a topic whose offsets point past the last valid record.
// This may cause re-reading of records depending on the age of the offset sync.
// s=offset sync pair, ?=record may or may not be replicated, g=consumer group offset, r=re-read record
// source |-s?????r???g-|
// | ______/
// | /
// vv
// target |-sg----r-----|
long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
} else {
return OptionalLong.empty();
}
}

// poll and handle records
synchronized void update(Duration pollTimeout) {
try {
consumer.poll(pollTimeout).forEach(this::handleRecord);
} catch (WakeupException e) {
// swallow
}
}

public synchronized void close() {
consumer.wakeup();
Utils.closeQuietly(consumer, "offset sync store consumer");
@Override
public void close() {
Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for offset syncs");
Utils.closeQuietly(admin, "admin client for offset syncs");
}

protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
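
The readToEnd flag is what stops stale offsets from being synced (KAFKA-13659): until start() has read the backing offset-syncs topic to the end, translateDownstream() returns OptionalLong.empty(), so MirrorCheckpointTask emits no checkpoints from a partially loaded store. A condensed sketch of that gate, using a hypothetical in-memory store rather than the real KafkaBasedLog-backed class; the numbers mirror the new OffsetSyncStoreTest cases below:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.OptionalLong;

    public class OffsetGateSketch {
        // topic -> {upstream offset, downstream offset} of the latest sync seen
        private final Map<String, long[]> syncs = new HashMap<>();
        private volatile boolean readToEnd = false;

        void handleRecord(String topic, long upstream, long downstream) {
            syncs.put(topic, new long[] {upstream, downstream});
        }

        void start() {
            // In the real store this blocks until the offset-syncs topic has been read to the end.
            readToEnd = true;
        }

        OptionalLong translateDownstream(String topic, long upstreamOffset) {
            if (!readToEnd) {
                return OptionalLong.empty();   // decline to translate until fully caught up
            }
            long[] sync = syncs.get(topic);
            if (sync == null) {
                return OptionalLong.empty();   // no sync recorded for this topic
            }
            if (sync[0] > upstreamOffset) {
                return OptionalLong.of(-1L);   // older than the latest sync
            }
            return OptionalLong.of(sync[1] + (upstreamOffset == sync[0] ? 0 : 1));
        }

        public static void main(String[] args) {
            OffsetGateSketch store = new OffsetGateSketch();
            store.handleRecord("topic1", 100, 200);
            System.out.println(store.translateDownstream("topic1", 150)); // OptionalLong.empty
            store.start();
            System.out.println(store.translateDownstream("topic1", 150)); // OptionalLong[201]
        }
    }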
MirrorCheckpointTaskTest.java
@@ -52,11 +52,16 @@ public void testDownstreamTopicRenaming() {

@Test
public void testCheckpoint() {
long t1UpstreamOffset = 3L;
long t1DownstreamOffset = 4L;
long t2UpstreamOffset = 7L;
long t2DownstreamOffset = 8L;
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
offsetSyncStore.sync(new TopicPartition("topic1", 2), t1UpstreamOffset, t1DownstreamOffset);
offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), t2UpstreamOffset, t2DownstreamOffset);
Optional<Checkpoint> optionalCheckpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
new OffsetAndMetadata(10, null));
assertTrue(optionalCheckpoint1.isPresent());
@@ -70,7 +75,7 @@ public void testCheckpoint() {
"checkpoint group9 sourcePartition failed");
assertEquals(10, checkpoint1.upstreamOffset(),
"checkpoint group9 upstreamOffset failed");
assertEquals(11, checkpoint1.downstreamOffset(),
assertEquals(t1DownstreamOffset + 1, checkpoint1.downstreamOffset(),
"checkpoint group9 downstreamOffset failed");
assertEquals(123L, sourceRecord1.timestamp().longValue(),
"checkpoint group9 timestamp failed");
@@ -87,10 +92,27 @@ public void testCheckpoint() {
"checkpoint group11 sourcePartition failed");
assertEquals(12, checkpoint2.upstreamOffset(),
"checkpoint group11 upstreamOffset failed");
assertEquals(13, checkpoint2.downstreamOffset(),
assertEquals(t2DownstreamOffset + 1, checkpoint2.downstreamOffset(),
"checkpoint group11 downstreamOffset failed");
assertEquals(234L, sourceRecord2.timestamp().longValue(),
"checkpoint group11 timestamp failed");
Optional<Checkpoint> optionalCheckpoint3 = mirrorCheckpointTask.checkpoint("group13", new TopicPartition("target2.topic5", 6),
new OffsetAndMetadata(7, null));
assertTrue(optionalCheckpoint3.isPresent());
Checkpoint checkpoint3 = optionalCheckpoint3.get();
SourceRecord sourceRecord3 = mirrorCheckpointTask.checkpointRecord(checkpoint3, 234L);
assertEquals(new TopicPartition("topic5", 6), checkpoint3.topicPartition(),
"checkpoint group13 topic5 failed");
assertEquals("group13", checkpoint3.consumerGroupId(),
"checkpoint group13 consumerGroupId failed");
assertEquals("group13", Checkpoint.unwrapGroup(sourceRecord3.sourcePartition()),
"checkpoint group13 sourcePartition failed");
assertEquals(t2UpstreamOffset, checkpoint3.upstreamOffset(),
"checkpoint group13 upstreamOffset failed");
assertEquals(t2DownstreamOffset, checkpoint3.downstreamOffset(),
"checkpoint group13 downstreamOffset failed");
assertEquals(234L, sourceRecord3.timestamp().longValue(),
"checkpoint group13 timestamp failed");
}

@Test
@@ -150,6 +172,7 @@ public void testSyncOffset() {
@Test
public void testNoCheckpointForTopicWithoutOffsetSyncs() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
@@ -165,6 +188,7 @@ public void testNoCheckpointForTopicWithoutOffsetSyncs() {
@Test
public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
OffsetSyncStoreTest.java
@@ -21,6 +21,8 @@

import org.junit.jupiter.api.Test;

import java.util.OptionalLong;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class OffsetSyncStoreTest {
@@ -30,7 +32,13 @@ public class OffsetSyncStoreTest {
static class FakeOffsetSyncStore extends OffsetSyncStore {

FakeOffsetSyncStore() {
super(null, null);
super();
}

@Override
public void start() {
// do not call super to avoid NPE without a KafkaBasedLog.
readToEnd = true;
}

void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
@@ -44,29 +52,57 @@ void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {

@Test
public void testOffsetTranslation() {
FakeOffsetSyncStore store = new FakeOffsetSyncStore();

store.sync(tp, 100, 200);
assertEquals(250L, store.translateDownstream(tp, 150).getAsLong(),
"Failure in translating downstream offset 250");

// Translate exact offsets
store.sync(tp, 150, 251);
assertEquals(251L, store.translateDownstream(tp, 150).getAsLong(),
"Failure in translating exact downstream offset 251");

// Use old offset (5) prior to any sync -> can't translate
assertEquals(-1, store.translateDownstream(tp, 5).getAsLong(),
"Expected old offset to not translate");

// Downstream offsets reset
store.sync(tp, 200, 10);
assertEquals(10L, store.translateDownstream(tp, 200).getAsLong(),
"Failure in resetting translation of downstream offset");

// Upstream offsets reset
store.sync(tp, 20, 20);
assertEquals(20L, store.translateDownstream(tp, 20).getAsLong(),
"Failure in resetting translation of upstream offset");
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();

// Emit synced downstream offset without dead-reckoning
store.sync(tp, 100, 200);
assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));

// Translate exact offsets
store.sync(tp, 150, 251);
assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));

// Use old offset (5) prior to any sync -> can't translate
assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));

// Downstream offsets reset
store.sync(tp, 200, 10);
assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));

// Upstream offsets reset
store.sync(tp, 20, 20);
assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
}
}

@Test
public void testNoTranslationIfStoreNotStarted() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
// no offsets exist and store is not started
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));

// read a sync during startup
store.sync(tp, 100, 200);
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));

// After the store is started all offsets are visible
store.start();
assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
}
}

@Test
public void testNoTranslationIfNoOffsetSync() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
}
}
}
