KAFKA-15351: Ensure log-start-offset not updated to local-log-start-offset when remote storage enabled (#14301)

When tiered storage is enabled on a topic and the last standing replica is restarted, the log-start-offset should not be reset to the base offset of the first local log segment.

Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>
kamalcph authored and satishd committed Sep 1, 2023
1 parent b6c5ac0 commit 771f14c
Showing 5 changed files with 124 additions and 37 deletions.
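In short: during log recovery, LogLoader used to clamp the recovered log-start-offset to the base offset of the first local segment. Once older segments exist only in remote storage, that clamp silently skips past them, so the loader now trusts the checkpointed value whenever remote storage is enabled for the log. A minimal Scala sketch of that decision, reusing the variable names from the LogLoader hunk below; the wrapper object, helper method, and main are illustrative only, not code from this commit:

object LogStartOffsetRecoverySketch {
  // Mirrors the newLogStartOffset expression added to LogLoader.load(): with remote
  // storage enabled, keep the checkpointed offset even if every local segment before
  // it has already been deleted from local disk.
  def recoveredLogStartOffset(isRemoteLogEnabled: Boolean,
                              logStartOffsetCheckpoint: Long,
                              firstLocalSegmentBaseOffset: Long): Long = {
    if (isRemoteLogEnabled) logStartOffsetCheckpoint
    else math.max(logStartOffsetCheckpoint, firstLocalSegmentBaseOffset)
  }

  def main(args: Array[String]): Unit = {
    // Same numbers as the parameterized LogLoaderTest added further down:
    // checkpointed log-start-offset 0, first local segment base offset 5.
    val withRemote = recoveredLogStartOffset(isRemoteLogEnabled = true,
      logStartOffsetCheckpoint = 0L, firstLocalSegmentBaseOffset = 5L)
    val withoutRemote = recoveredLogStartOffset(isRemoteLogEnabled = false,
      logStartOffsetCheckpoint = 0L, firstLocalSegmentBaseOffset = 5L)
    assert(withRemote == 0L)     // the fix: checkpoint preserved
    assert(withoutRemote == 5L)  // non-tiered logs keep the old clamp
    println("log-start-offset recovery sketch: OK")
  }
}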
9 changes: 7 additions & 2 deletions core/src/main/scala/kafka/log/LogLoader.scala
@@ -78,7 +78,8 @@ class LogLoader(
recoveryPointCheckpoint: Long,
leaderEpochCache: Option[LeaderEpochFileCache],
producerStateManager: ProducerStateManager,
numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
isRemoteLogEnabled: Boolean = false,
) extends Logging {
logIdent = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}] "

@@ -180,7 +181,11 @@ class LogLoader(
}

leaderEpochCache.foreach(_.truncateFromEnd(nextOffset))
val newLogStartOffset = math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
val newLogStartOffset = if (isRemoteLogEnabled) {
logStartOffsetCheckpoint
} else {
math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
}
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.foreach(_.truncateFromStart(logStartOffsetCheckpoint))

22 changes: 15 additions & 7 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -187,12 +187,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}

def remoteLogEnabled(): Boolean = {
// Remote log is enabled only for non-compact and non-internal topics
remoteStorageSystemEnable &&
!(config.compact || Topic.isInternal(topicPartition.topic())
|| TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topicPartition.topic())
|| Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topicPartition.topic())) &&
config.remoteStorageEnable()
UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic())
}

/**
@@ -1882,6 +1877,17 @@ object UnifiedLog extends Logging {

val UnknownOffset = LocalLog.UnknownOffset

def isRemoteLogEnabled(remoteStorageSystemEnable: Boolean,
config: LogConfig,
topic: String): Boolean = {
// Remote log is enabled only for non-compact and non-internal topics
remoteStorageSystemEnable &&
!(config.compact || Topic.isInternal(topic)
|| TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic)
|| Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)) &&
config.remoteStorageEnable()
}

def apply(dir: File,
config: LogConfig,
logStartOffset: Long,
@@ -1911,6 +1917,7 @@ object UnifiedLog extends Logging {
s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ")
val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxTransactionTimeoutMs, producerStateManagerConfig, time)
val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
val offsets = new LogLoader(
dir,
topicPartition,
@@ -1924,7 +1931,8 @@ object UnifiedLog extends Logging {
recoveryPoint,
leaderEpochCache,
producerStateManager,
numRemainingSegments
numRemainingSegments,
isRemoteLogEnabled,
).load()
val localLog = new LocalLog(dir, config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, scheduler, time, topicPartition, logDirFailureChannel)
48 changes: 48 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -37,6 +37,8 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochE
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{mock, reset, times, verify, when}
@@ -1753,4 +1755,50 @@ class LogLoaderTest {

log.close()
}

@ParameterizedTest
@CsvSource(Array("false, 5", "true, 0"))
def testLogStartOffsetWhenRemoteStorageIsEnabled(isRemoteLogEnabled: Boolean,
expectedLogStartOffset: Long): Unit = {
val logDirFailureChannel = null
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
val logConfig = LogTestUtils.createLogConfig()
val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
when(stateManager.isEmpty).thenReturn(true)

val log = createLog(logDir, logConfig)
// Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
// |---> logStartOffset |---> active segment (empty)
// |---> logEndOffset
for (i <- 0 until 9) {
val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
log.roll()
}
log.maybeIncrementHighWatermark(new LogOffsetMetadata(9L))
log.maybeIncrementLogStartOffset(5L, LogStartOffsetIncrementReason.SegmentDeletion)
log.deleteOldSegments()

val segments = new LogSegments(topicPartition)
log.logSegments.foreach(segment => segments.add(segment))
assertEquals(5, segments.firstSegment.get.baseOffset)

val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
val offsets = new LogLoader(
logDir,
topicPartition,
logConfig,
mockTime.scheduler,
mockTime,
logDirFailureChannel,
hadCleanShutdown = true,
segments,
0L,
0L,
leaderEpochCache,
stateManager,
isRemoteLogEnabled = isRemoteLogEnabled
).load()
assertEquals(expectedLogStartOffset, offsets.logStartOffset)
}
}
TieredStorageTestContext.java
@@ -16,7 +16,9 @@
*/
package org.apache.kafka.tiered.storage;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
@@ -90,15 +92,21 @@ public TieredStorageTestContext(TieredStorageTestHarness harness) {

@SuppressWarnings("deprecation")
private void initClients() {
// rediscover the new bootstrap-server port in case of broker restarts
ListenerName listenerName = harness.listenerName();
Properties commonOverrideProps = new Properties();
commonOverrideProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, harness.bootstrapServers(listenerName));

// Set a producer linger of 60 seconds, in order to optimistically generate batches of
// records with a pre-determined size.
Properties producerOverrideProps = new Properties();
producerOverrideProps.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60)));
producer = harness.createProducer(ser, ser, producerOverrideProps);
producerOverrideProps.putAll(commonOverrideProps);

consumer = harness.createConsumer(de, de, new Properties(),
producer = harness.createProducer(ser, ser, producerOverrideProps);
consumer = harness.createConsumer(de, de, commonOverrideProps,
JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toList());
admin = harness.createAdminClient(harness.listenerName(), new Properties());
admin = harness.createAdminClient(listenerName, commonOverrideProps);
}

private void initContext() {
@@ -228,7 +236,11 @@ public Long beginOffset(TopicPartition topicPartition) {

public void bounce(int brokerId) {
harness.killBroker(brokerId);
boolean allBrokersDead = harness.aliveBrokers().isEmpty();
harness.startBroker(brokerId);
if (allBrokersDead) {
reinitClients();
}
initContext();
}

@@ -238,7 +250,11 @@ public void stop(int brokerId) {
}

public void start(int brokerId) {
boolean allBrokersDead = harness.aliveBrokers().isEmpty();
harness.startBroker(brokerId);
if (allBrokersDead) {
reinitClients();
}
initContext();
}

@@ -310,5 +326,16 @@ public void printReport(PrintStream output) {

@Override
public void close() throws IOException {
// IntegrationTestHarness closes the clients on tearDown, no need to close them explicitly.
}

private void reinitClients() {
// Broker uses a random port (TestUtils.RandomPort) for the listener. If the initial bootstrap-server config
// becomes invalid, then the clients won't be able to reconnect to the cluster.
// To avoid this, we reinitialize the clients after all the brokers are bounced.
Utils.closeQuietly(producer, "Producer client");
Utils.closeQuietly(consumer, "Consumer client");
Utils.closeQuietly(admin, "Admin client");
initClients();
}
}
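The client handling above is worth spelling out: harness brokers listen on random ports (TestUtils.RandomPort), so once the last live broker is bounced every cached bootstrap address goes stale, and the context closes the producer, consumer, and admin clients and recreates them against the refreshed bootstrap servers. A throwaway Scala simulation of that control flow; the Broker and Client types and the port values are made-up stand-ins, not the test framework's API:

object ClientReinitSketch {
  final case class Client(bootstrap: String)
  final class Broker(val id: Int, var alive: Boolean = true, var port: Int = 9092)

  private val rnd = new scala.util.Random(42)
  private val brokers = Seq(new Broker(0))
  private def bootstrapServers: String = brokers.map(b => s"localhost:${b.port}").mkString(",")
  private var clients = Seq(Client(bootstrapServers))

  // Mirrors TieredStorageTestContext.bounce(): note whether the cluster was fully down
  // before the restart, and only in that case rebuild the clients with the new ports.
  def bounce(brokerId: Int): Unit = {
    val broker = brokers.find(_.id == brokerId).get
    broker.alive = false
    val allBrokersDead = brokers.forall(!_.alive)
    broker.port = 1024 + rnd.nextInt(60000) // the restarted broker binds to a fresh random port
    broker.alive = true
    if (allBrokersDead) {
      clients = Seq(Client(bootstrapServers)) // the real context also closes the old clients first
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"before bounce: ${clients.map(_.bootstrap)}")
    bounce(0)
    println(s"after bounce:  ${clients.map(_.bootstrap)}")
  }
}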
OffloadAndConsumeFromLeaderTest.java
@@ -24,7 +24,7 @@
import java.util.Map;

/**
* Test Cases (A):
* Test Cases:
* Elementary offloads and fetches from tiered storage.
*/
public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarness {
@@ -46,14 +46,14 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final Integer p0 = 0;
final Integer partitionCount = 1;
final Integer replicationFactor = 1;
final Integer maxBatchCountPerSegment = 1;
final Integer oneBatchPerSegment = 1;
final Integer twoBatchPerSegment = 2;
final Map<Integer, List<Integer>> replicaAssignment = null;
final boolean enableRemoteLogStorage = true;
final Integer batchSize = 1;

builder
/*
* (A.1) Create a topic which segments contain only one batch and produce three records
* (1) Create a topic whose segments contain only one batch and produce three records
* with a batch size of 1.
*
* The topic and broker are configured so that the two rolled segments are picked from
@@ -68,23 +68,23 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
* Log tA-p0 Log tA-p0
* *-------------------* *-------------------*
* | base offset = 2 | | base offset = 0 |
* | (k3, v3) | | (k1, v1) |
* | (k2, v2) | | (k0, v0) |
* *-------------------* *-------------------*
* *-------------------*
* | base offset = 1 |
* | (k2, v2) |
* | (k1, v1) |
* *-------------------*
*/
.createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment,
.createTopic(topicA, partitionCount, replicationFactor, oneBatchPerSegment, replicaAssignment,
enableRemoteLogStorage)
.withBatchSize(topicA, p0, batchSize)
.expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k1", "v1"))
.expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k2", "v2"))
.produce(topicA, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"),
new KeyValueSpec("k3", "v3"))
.expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
.produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"))

/*
* (A.2) Similar scenario as above, but with segments of two records.
* (2) Similar scenario as above, but with segments of two records.
*
* Acceptance:
* -----------
@@ -95,39 +95,38 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
* Log tB-p0 Log tB-p0
* *-------------------* *-------------------*
* | base offset = 4 | | base offset = 0 |
* | (k5, v5) | | (k1, v1) |
* *-------------------* | (k2, v2) |
* | (k4, v4) | | (k0, v0) |
* *-------------------* | (k1, v1) |
* *-------------------*
* *-------------------*
* | base offset = 2 |
* | (k2, v2) |
* | (k3, v3) |
* | (k4, v4) |
* *-------------------*
*/
.createTopic(topicB, partitionCount, replicationFactor, 2, replicaAssignment,
.createTopic(topicB, partitionCount, replicationFactor, twoBatchPerSegment, replicaAssignment,
enableRemoteLogStorage)
.withBatchSize(topicB, p0, batchSize)
.expectEarliestLocalOffsetInLogDirectory(topicB, p0, 4L)
.expectSegmentToBeOffloaded(broker, topicB, p0, 0,
new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"))
new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"))
.expectSegmentToBeOffloaded(broker, topicB, p0, 2,
new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"))
.produce(topicB, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"),
new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"), new KeyValueSpec("k5", "v5"))
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"))
.produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"))

/*
* (A.3) Stops and restarts the broker. The purpose of this test is to a) exercise consumption
* (3) Stops and restarts the broker. The purpose of this test is to a) exercise consumption
* from a given offset and b) verify that upon broker start, existing remote log segments
* metadata are loaded by Kafka and these log segments are available.
*
* Acceptance:
* -----------
* - For topic A, this offset is defined such that only the second segment is fetched from
* the tiered storage.
* - For topic B, only one segment is present in the tiered storage, as asserted by the
* - For topic B, two segments are present in the tiered storage, as asserted by the
* previous sub-test-case.
*/
// .bounce(broker)
.bounce(broker)
.expectFetchFromTieredStorage(broker, topicA, p0, 1)
.consume(topicA, p0, 1L, 2, 1)
.expectFetchFromTieredStorage(broker, topicB, p0, 2)
