KAFKA-15410: Reassign replica expand, move and shrink integration tests (2/4) #14328

Merged 2 commits on Sep 5, 2023
Changes from all commits
@@ -227,7 +227,7 @@ private Long buildRemoteLogAuxState(TopicPartition topicPartition,

// Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
Partition partition = replicaMgr.getPartitionOrException(topicPartition);
- partition.truncateFullyAndStartAt(nextOffset, false);
+ partition.truncateFullyAndStartAt(nextOffset, false, Option.apply(leaderLogStartOffset));

// Build leader epoch cache.
unifiedLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented);
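For context, a minimal Scala sketch of the offset relationship behind the new third argument (the caller above is Java, hence Option.apply; the values and the standalone form here are hypothetical):

// Hypothetical values for illustration only.
val leaderLogStartOffset = 20L // leader's earliest offset overall, including segments already in remote storage
val nextOffset = 100L          // leader's local log start; the first offset the follower will fetch

// The follower empties its local log and restarts it at nextOffset, while pinning
// logStartOffset to the leader's value, so earlier offsets (here 20..99) remain part
// of the log's range and can still be served from tiered storage.
partition.truncateFullyAndStartAt(nextOffset, false, Some(leaderLogStartOffset))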
7 changes: 5 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -1646,12 +1646,15 @@ class Partition(val topicPartition: TopicPartition,
*
* @param newOffset The new offset to start the log with
* @param isFuture True iff the truncation should be performed on the future log of this partition
* @param logStartOffsetOpt The log start offset to set for the log. If None, the new offset will be used.
*/
- def truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean): Unit = {
+ def truncateFullyAndStartAt(newOffset: Long,
+                             isFuture: Boolean,
+                             logStartOffsetOpt: Option[Long] = None): Unit = {
// The read lock is needed to prevent the follower replica from being truncated while ReplicaAlterDirThread
// is executing maybeReplaceCurrentWithFutureReplica() to replace follower replica with the future replica.
inReadLock(leaderIsrUpdateLock) {
- logManager.truncateFullyAndStartAt(topicPartition, newOffset, isFuture = isFuture)
+ logManager.truncateFullyAndStartAt(topicPartition, newOffset, isFuture = isFuture, logStartOffsetOpt)
}
}

8 changes: 6 additions & 2 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -705,8 +705,12 @@ class LogManager(logDirs: Seq[File],
* @param topicPartition The partition whose log needs to be truncated
* @param newOffset The new offset to start the log with
* @param isFuture True iff the truncation should be performed on the future log of the specified partition
* @param logStartOffsetOpt The log start offset to set for the log. If None, the new offset will be used.
*/
- def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long, isFuture: Boolean): Unit = {
+ def truncateFullyAndStartAt(topicPartition: TopicPartition,
+                             newOffset: Long,
+                             isFuture: Boolean,
+                             logStartOffsetOpt: Option[Long] = None): Unit = {
val log = {
if (isFuture)
futureLogs.get(topicPartition)
@@ -719,7 +723,7 @@
if (!isFuture)
abortAndPauseCleaning(topicPartition)
try {
- log.truncateFullyAndStartAt(newOffset)
+ log.truncateFullyAndStartAt(newOffset, logStartOffsetOpt)
if (!isFuture)
maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset(log, topicPartition)
} finally {
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1745,15 +1745,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* Delete all data in the log and start at the new offset
*
* @param newOffset The new offset to start the log with
* @param logStartOffsetOpt The log start offset to set for the log. If None, the new offset will be used.
*/
- def truncateFullyAndStartAt(newOffset: Long): Unit = {
+ def truncateFullyAndStartAt(newOffset: Long,
+                             logStartOffsetOpt: Option[Long] = None): Unit = {
maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
- debug(s"Truncate and start at offset $newOffset")
+ debug(s"Truncate and start at offset $newOffset, logStartOffset: ${logStartOffsetOpt.getOrElse(newOffset)}")
lock synchronized {
localLog.truncateFullyAndStartAt(newOffset)
leaderEpochCache.foreach(_.clearAndFlush())
producerStateManager.truncateFullyAndStartAt(newOffset)
- logStartOffset = newOffset
+ logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
rebuildProducerState(newOffset, producerStateManager)
updateHighWatermark(localLog.logEndOffsetMetadata)
}
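Per the @param doc above, a compact sketch of the two call shapes (hypothetical: assumes log is a UnifiedLog currently holding offsets 0..4):

// Explicit log start offset: local data is wiped, the log restarts at offset 5,
// and logStartOffset becomes 2 (offsets 2..4 may still be served from remote storage).
log.truncateFullyAndStartAt(5L, Some(2L))

// logStartOffsetOpt omitted: logStartOffset falls back to the truncation offset (5),
// which is the pre-existing behaviour.
log.truncateFullyAndStartAt(5L)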
48 changes: 44 additions & 4 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -171,15 +171,55 @@ class UnifiedLogTest {

@Test
def testTruncateBelowFirstUnstableOffset(): Unit = {
- testTruncateBelowFirstUnstableOffset(_.truncateTo)
+ testTruncateBelowFirstUnstableOffset((log, targetOffset) => log.truncateTo(targetOffset))
}

@Test
def testTruncateFullyAndStartBelowFirstUnstableOffset(): Unit = {
- testTruncateBelowFirstUnstableOffset(_.truncateFullyAndStartAt)
+ testTruncateBelowFirstUnstableOffset((log, targetOffset) => log.truncateFullyAndStartAt(targetOffset))
}

- private def testTruncateBelowFirstUnstableOffset(truncateFunc: UnifiedLog => (Long => Unit)): Unit = {
@Test
def testTruncateFullyAndStart(): Unit = {
val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
val log = createLog(logDir, logConfig)

val producerId = 17L
val producerEpoch: Short = 10
val sequence = 0

log.appendAsLeader(TestUtils.records(List(
new SimpleRecord("0".getBytes),
new SimpleRecord("1".getBytes),
new SimpleRecord("2".getBytes)
)), leaderEpoch = 0)

log.appendAsLeader(MemoryRecords.withTransactionalRecords(
CompressionType.NONE,
producerId,
producerEpoch,
sequence,
new SimpleRecord("3".getBytes),
new SimpleRecord("4".getBytes)
), leaderEpoch = 0)

assertEquals(Some(3L), log.firstUnstableOffset)

// We close and reopen the log to ensure that the first unstable offset segment
// position will be undefined when we truncate the log.
log.close()

val reopened = createLog(logDir, logConfig)
assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)

reopened.truncateFullyAndStartAt(2L, Some(1L))
assertEquals(None, reopened.firstUnstableOffset)
assertEquals(java.util.Collections.emptyMap(), reopened.producerStateManager.activeProducers)
assertEquals(1L, reopened.logStartOffset)
assertEquals(2L, reopened.logEndOffset)
}

+ private def testTruncateBelowFirstUnstableOffset(truncateFunc: (UnifiedLog, Long) => Unit): Unit = {
// Verify that truncation below the first unstable offset correctly
// resets the producer state. Specifically we are testing the case when
// the segment position of the first unstable offset is unknown.
@@ -215,7 +255,7 @@ class UnifiedLogTest {
val reopened = createLog(logDir, logConfig)
assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager.firstUnstableOffset)

- truncateFunc(reopened)(0L)
+ truncateFunc(reopened, 0L)
assertEquals(None, reopened.firstUnstableOffset)
assertEquals(java.util.Collections.emptyMap(), reopened.producerStateManager.activeProducers)
}
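A note on the helper refactor above: truncateFullyAndStartAt now takes a defaulted second parameter, and Scala does not apply default arguments during eta expansion, so _.truncateFullyAndStartAt no longer conforms to UnifiedLog => (Long => Unit); hence the explicit (UnifiedLog, Long) => Unit. A minimal, self-contained sketch of the mismatch (all names hypothetical, not part of the PR):

object EtaExpansionSketch {
  class Log {
    def truncateTo(offset: Long): Unit = ()
    def truncateFullyAndStartAt(offset: Long, startOffsetOpt: Option[Long] = None): Unit = ()
  }

  // Compiles: truncateTo has a single parameter, so it eta-expands to Long => Unit.
  val ok: Log => (Long => Unit) = _.truncateTo

  // Would not compile: eta expansion ignores the default argument, producing
  // (Long, Option[Long]) => Unit rather than Long => Unit.
  // val broken: Log => (Long => Unit) = _.truncateFullyAndStartAt

  // The refactor avoids eta expansion with an explicit two-argument function:
  val fixed: (Log, Long) => Unit = (log, offset) => log.truncateFullyAndStartAt(offset)
}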
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;

import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;

public abstract class BaseReassignReplicaTest extends TieredStorageTestHarness {
protected final Integer broker0 = 0;
protected final Integer broker1 = 1;

/**
* Cluster of two brokers
* @return number of brokers in the cluster
*/
@Override
public int brokerCount() {
return 2;
}

/**
* Number of partitions in the '__remote_log_metadata' topic
* @return number of partitions in the '__remote_log_metadata' topic
*/
@Override
public int numRemoteLogMetadataPartitions() {
return 2;
}

@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final String topicA = "topicA";
final String topicB = "topicB";
final Integer p0 = 0;
final Integer partitionCount = 5;
final Integer replicationFactor = 2;
final Integer maxBatchCountPerSegment = 1;
final Map<Integer, List<Integer>> replicaAssignment = null;
final boolean enableRemoteLogStorage = true;
final List<Integer> metadataPartitions = new ArrayList<>();
for (int i = 0; i < numRemoteLogMetadataPartitions(); i++) {
metadataPartitions.add(i);
}

builder
// create topicA with 5 partitions, 2 RF and ensure that the user-topic-partitions are mapped to
// metadata partitions
.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment,
replicaAssignment, enableRemoteLogStorage)
.expectUserTopicMappedToMetadataPartitions(topicA, metadataPartitions)
// create topicB with 1 partition and 1 RF
.createTopic(topicB, 1, 1, maxBatchCountPerSegment,
mkMap(mkEntry(p0, Collections.singletonList(broker0))), enableRemoteLogStorage)
// send records to partition 0
.expectSegmentToBeOffloaded(broker0, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker0, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
.produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))
// The newly created replica gets mapped to one of the metadata partitions, which is actively
// consumed by both brokers
.reassignReplica(topicB, p0, replicaIds())
.expectLeader(topicB, p0, broker1, true)
// produce some more events and verify the earliest local offset
.expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
.produce(topicB, p0, new KeyValueSpec("k3", "v3"))
// consume from the beginning of the topic: offsets 0-2 are served from remote storage,
// offset 3 from local storage
.expectFetchFromTieredStorage(broker1, topicB, p0, 3)
.consume(topicB, p0, 0L, 4, 3);
}

/**
* Replicas of the topic
* @return the replica-ids of the topic
*/
protected abstract List<Integer> replicaIds();
}
@@ -83,7 +83,8 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
new KeyValueSpec("k2", "v2"))

// produce some more events to partition 0 and expect the segments to be offloaded
- // NOTE: Support needs to be added to capture the offloaded segment event for already sent message (k2, v2)
+ // KAFKA-15431: Support needs to be added to capture the offloaded segment event for already sent
+ // message (k2, v2)
// .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new KeyValueSpec("k3", "v3"))
.expectSegmentToBeOffloaded(broker0, topicA, p0, 4, new KeyValueSpec("k4", "v4"))
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;

import java.util.Arrays;
import java.util.List;

public final class ReassignReplicaExpandTest extends BaseReassignReplicaTest {

/**
* Expand the replication factor of the topic by changing the replica list from [0] to [0, 1]
* @return the replica-ids of the topic
*/
@Override
protected List<Integer> replicaIds() {
return Arrays.asList(broker0, broker1);
}
}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tiered.storage.integration;

import java.util.Collections;
import java.util.List;

public final class ReassignReplicaMoveTest extends BaseReassignReplicaTest {

/**
* Move the replica of the topic from broker0 to broker1
* @return the replica-ids of the topic
*/
@Override
protected List<Integer> replicaIds() {
return Collections.singletonList(broker1);
}
}