
KAFKA-14993: Improve TransactionIndex instance handling while copying to and fetching from RSM.
abhijeetk88 committed Sep 9, 2023
1 parent b24ccd6 commit 07378c3
Showing 6 changed files with 60 additions and 31 deletions.
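In brief: a segment uploaded without transactional records has no transaction index, so this commit makes that absence explicit. RemoteIndexCache.Entry now holds an Optional<TransactionIndex>, RemoteStorageManager.fetchIndex is specified to throw RemoteResourceNotFoundException for a missing index instead of returning an empty stream, and the cache translates that exception into an empty Optional for callers.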
2 changes: 1 addition & 1 deletion core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -1336,7 +1336,7 @@ private void collectAbortedTransactions(long startOffset,
// Search in remote segments first.
Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
while (nextSegmentMetadataOpt.isPresent()) {
Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
Optional<TransactionIndex> txnIndexOpt = indexCache.getIndexEntry(nextSegmentMetadataOpt.get()).txnIndexOpt();
if (txnIndexOpt.isPresent()) {
TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
accumulator.accept(searchResult.abortedTransactions);
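For illustration, the call-site pattern this hunk adopts looks roughly like the following. This is a minimal, self-contained sketch with stand-in types, not the real Kafka classes:

import java.util.Optional;

// Stand-ins for the Kafka types touched by this commit (illustrative only).
class TransactionIndex {
    void collectAbortedTxns(long startOffset, long upperBoundOffset) {
        System.out.println("scanning aborted txns in [" + startOffset + ", " + upperBoundOffset + ")");
    }
}

class Entry {
    private final Optional<TransactionIndex> txnIndexOpt;
    Entry(Optional<TransactionIndex> txnIndexOpt) { this.txnIndexOpt = txnIndexOpt; }
    Optional<TransactionIndex> txnIndexOpt() { return txnIndexOpt; }
}

public class TxnIndexOptSketch {
    public static void main(String[] args) {
        // A segment with no transactional records carries no index;
        // a segment with aborted transactions carries one.
        Entry plain = new Entry(Optional.empty());
        Entry transactional = new Entry(Optional.of(new TransactionIndex()));
        for (Entry entry : new Entry[]{plain, transactional}) {
            // Callers branch on presence instead of assuming the index exists.
            entry.txnIndexOpt().ifPresent(index -> index.collectAbortedTxns(0L, 100L));
        }
    }
}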
@@ -20,7 +20,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
@@ -35,7 +35,7 @@ import org.slf4j.{Logger, LoggerFactory}
import java.io.{File, FileInputStream, IOException}
import java.nio.file.Files
import java.util
import java.util.Collections
import java.util.{Collections, Optional}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable

@@ -105,7 +105,7 @@ class RemoteIndexCacheTest {
def testIndexFileNameAndLocationOnDisk(): Unit = {
val entry = cache.getIndexEntry(rlsMetadata)
val offsetIndexFile = entry.offsetIndex.file().toPath
val txnIndexFile = entry.txnIndex.file().toPath
val txnIndexFile = entry.txnIndexOpt.get().file().toPath
val timeIndexFile = entry.timeIndex.file().toPath

val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
@@ -143,6 +143,28 @@ class RemoteIndexCacheTest {
verifyNoInteractions(rsm)
}

@Test
def testFetchIndexForMissingTransactionIndex(): Unit = {
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => throw new RemoteResourceNotFoundException("txn index not found")
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})

val entry = cache.getIndexEntry(rlsMetadata)
assertFalse(entry.txnIndexOpt.isPresent)
}

@Test
def testPositionForNonExistingIndexFromRemoteStorage(): Unit = {
val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex
@@ -273,7 +295,7 @@ class RemoteIndexCacheTest {
// verify that index(s) rename is only called 1 time
verify(cacheEntry.timeIndex).renameTo(any(classOf[File]))
verify(cacheEntry.offsetIndex).renameTo(any(classOf[File]))
verify(cacheEntry.txnIndex).renameTo(any(classOf[File]))
verify(cacheEntry.txnIndexOpt.get()).renameTo(any(classOf[File]))

// verify no index files on disk
assertFalse(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
@@ -331,12 +353,12 @@ class RemoteIndexCacheTest {
verify(spyEntry).close()

// close for all index entries must be invoked
verify(spyEntry.txnIndex).close()
verify(spyEntry.txnIndexOpt.get()).close()
verify(spyEntry.offsetIndex).close()
verify(spyEntry.timeIndex).close()

// index files must not be deleted
verify(spyEntry.txnIndex, times(0)).deleteIfExists()
verify(spyEntry.txnIndexOpt.get(), times(0)).deleteIfExists()
verify(spyEntry.offsetIndex, times(0)).deleteIfExists()
verify(spyEntry.timeIndex, times(0)).deleteIfExists()

@@ -506,7 +528,7 @@ class RemoteIndexCacheTest {
val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))
spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, Optional.of(txIndex)))
}

private def assertAtLeastOnePresent(cache: RemoteIndexCache, uuids: Uuid*): Unit = {
@@ -123,7 +123,7 @@ InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
* Returns the index for the respective log segment of {@link RemoteLogSegmentMetadata}.
* <p>
* Note: The transaction index may not exist because of no transactional records.
* In this case, it should still return an InputStream with empty content, instead of returning {@code null}.
* In this case, it should throw a RemoteResourceNotFoundException, instead of returning {@code null}.
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @param indexType type of the index to be fetched for the segment.
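For RemoteStorageManager implementors, the revised contract means a missing transaction index should surface as an exception, not as an empty stream or null. A sketch of a file-backed implementation under that contract follows; the store layout and class names are illustrative, not the real plugin API:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative stand-ins; the real SPI types live in
// org.apache.kafka.server.log.remote.storage.
class RemoteResourceNotFoundException extends Exception {
    RemoteResourceNotFoundException(String message) { super(message); }
}

enum IndexType { OFFSET, TIMESTAMP, TRANSACTION, LEADER_EPOCH, PRODUCER_SNAPSHOT }

public class FileBackedIndexStore {
    private final File baseDir;

    FileBackedIndexStore(File baseDir) { this.baseDir = baseDir; }

    // Per the updated javadoc: when the (optional) transaction index was never
    // written, throw RemoteResourceNotFoundException; never return null or an
    // empty stream.
    InputStream fetchIndex(String segmentId, IndexType indexType)
            throws RemoteResourceNotFoundException, IOException {
        File indexFile = new File(baseDir, segmentId + "." + indexType.name().toLowerCase());
        if (indexType == IndexType.TRANSACTION && !indexFile.exists()) {
            throw new RemoteResourceNotFoundException(
                "Index file for type: " + indexType + " not found for segment " + segmentId);
        }
        return new FileInputStream(indexFile);
    }
}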
@@ -25,6 +25,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
@@ -46,6 +47,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -276,7 +278,7 @@ private void init() throws IOException {
TransactionIndex txnIndex = new TransactionIndex(offset, txnIndexFile);
txnIndex.sanityCheck();

Entry entry = new Entry(offsetIndex, timeIndex, txnIndex);
Entry entry = new Entry(offsetIndex, timeIndex, Optional.ofNullable(txnIndex));
internalCache.put(uuid, entry);
} else {
// Delete all of them if any one of those indexes is not available for a specific segment id
@@ -315,6 +317,7 @@ private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegment
if (index == null) {
File tmpIndexFile = new File(indexFile.getParentFile(), indexFile.getName() + RemoteIndexCache.TMP_FILE_SUFFIX);
try (InputStream inputStream = fetchRemoteIndex.apply(remoteLogSegmentMetadata)) {
if (inputStream == null) return null;
Files.copy(inputStream, tmpIndexFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
@@ -382,6 +385,8 @@ private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteL
TransactionIndex txnIndex = loadIndexFile(txnIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
try {
return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION);
} catch (RemoteResourceNotFoundException e) {
return null;
} catch (RemoteStorageException e) {
throw new KafkaException(e);
}
@@ -395,7 +400,7 @@ private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteL
}
});

return new Entry(offsetIndex, timeIndex, txnIndex);
return new Entry(offsetIndex, timeIndex, Optional.ofNullable(txnIndex));
} catch (IOException e) {
throw new KafkaException(e);
}
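Putting the two hunks above together: the fetch lambda maps RemoteResourceNotFoundException to null, loadIndexFile short-circuits on the null stream, and the entry wraps the result with Optional.ofNullable. A condensed, runnable sketch of that flow with stand-in types:

import java.util.Optional;
import java.util.function.Supplier;

public class LoadOrAbsentSketch {
    // Stand-in for the remote store reporting a missing resource.
    static class RemoteResourceNotFoundException extends RuntimeException {}

    // Mirrors the fetch lambda in createCacheEntry: "not found" becomes null
    // instead of propagating as a KafkaException.
    static byte[] fetchTxnIndex(Supplier<byte[]> remoteFetch) {
        try {
            return remoteFetch.get();
        } catch (RemoteResourceNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Remote store that holds no transaction index for this segment.
        byte[] stream = fetchTxnIndex(() -> { throw new RemoteResourceNotFoundException(); });
        // Mirrors loadIndexFile returning null on a null stream, with the
        // Entry constructor wrapping the result.
        Optional<byte[]> txnIndexOpt = Optional.ofNullable(stream);
        System.out.println("txn index present: " + txnIndexOpt.isPresent()); // false
    }
}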
@@ -451,7 +456,7 @@ public static class Entry implements AutoCloseable {

private final OffsetIndex offsetIndex;
private final TimeIndex timeIndex;
private final TransactionIndex txnIndex;
private final Optional<TransactionIndex> txnIndexOpt;

// This lock is used to synchronize cleanup methods and read methods. This ensures that cleanup (which changes the
// underlying files of the index) isn't performed while a read is in-progress for the entry. This is required in
@@ -463,10 +468,10 @@ public static class Entry implements AutoCloseable {

private boolean markedForCleanup = false;

public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, TransactionIndex txnIndex) {
public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, Optional<TransactionIndex> txnIndexOpt) {
this.offsetIndex = offsetIndex;
this.timeIndex = timeIndex;
this.txnIndex = txnIndex;
this.txnIndexOpt = txnIndexOpt;
}

// Visible for testing
@@ -480,8 +485,8 @@ public TimeIndex timeIndex() {
}

// Visible for testing
public TransactionIndex txnIndex() {
return txnIndex;
public Optional<TransactionIndex> txnIndexOpt() {
return txnIndexOpt;
}

// Visible for testing
@@ -523,7 +528,9 @@ public void markForCleanup() throws IOException {
markedForCleanup = true;
offsetIndex.renameTo(new File(Utils.replaceSuffix(offsetIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
timeIndex.renameTo(new File(Utils.replaceSuffix(timeIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
if (txnIndexOpt.isPresent()) {
txnIndexOpt.get().renameTo(new File(Utils.replaceSuffix(txnIndexOpt.get().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
}
}
} finally {
lock.writeLock().unlock();
@@ -545,7 +552,9 @@ public void cleanup() throws IOException {
timeIndex.deleteIfExists();
return null;
}, () -> {
txnIndex.deleteIfExists();
if (txnIndexOpt.isPresent()) {
txnIndexOpt.get().deleteIfExists();
}
return null;
});

@@ -562,7 +571,7 @@ public void close() {
try {
Utils.closeQuietly(offsetIndex, "OffsetIndex");
Utils.closeQuietly(timeIndex, "TimeIndex");
Utils.closeQuietly(txnIndex, "TransactionIndex");
txnIndexOpt.ifPresent(txnIndex -> Utils.closeQuietly(txnIndex, "TransactionIndex"));
} finally {
lock.writeLock().unlock();
}
@@ -573,8 +582,7 @@ public String toString() {
return "Entry{" +
"offsetIndex=" + offsetIndex.file().getName() +
", timeIndex=" + timeIndex.file().getName() +
", txnIndex=" + txnIndex.file().getName() +
'}';
", txnIndex=" + txnIndexOpt.map(index -> index.file().getName()) + '}';
}
}

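A readability note on the Entry lifecycle methods above: close() can use txnIndexOpt.ifPresent(...) because Utils.closeQuietly swallows exceptions, while markForCleanup() and cleanup() fall back to explicit isPresent()/get() blocks, presumably because renameTo and deleteIfExists declare the checked IOException, which a lambda passed to ifPresent cannot throw.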
@@ -26,7 +26,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
@@ -381,8 +380,14 @@ public InputStream fetchIndex(RemoteLogSegmentMetadata metadata, IndexType index
final RemoteLogSegmentFileset fileset = openFileset(storageDirectory, metadata);

File file = fileset.getFile(fileType);
final InputStream inputStream = (fileType.isOptional() && !file.exists()) ?
new ByteArrayInputStream(new byte[0]) : newInputStream(file.toPath(), READ);

final InputStream inputStream;
if (fileType.isOptional() && !file.exists()) {
throw new RemoteResourceNotFoundException("Index file for type: " + indexType +
" not found for segment " + metadata.remoteLogSegmentId());
} else {
inputStream = newInputStream(file.toPath(), READ);
}

storageListeners.onStorageEvent(eventBuilder.withFileset(fileset).build());

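A note on the design choice in this hunk: throwing RemoteResourceNotFoundException for a missing optional index, rather than fabricating an empty ByteArrayInputStream, brings the transaction index in line with how the other index types already report absence (as fetchThrowsIfDataDoesNotExist below now asserts for all five types), and it lets callers distinguish "no index was ever written" from "the index exists but is empty".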
@@ -70,7 +70,6 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.LEADER_EPOCH;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.OFFSET;
@@ -347,12 +346,7 @@ public void fetchThrowsIfDataDoesNotExist() {
assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, TIMESTAMP));
assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, LEADER_EPOCH));
assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, PRODUCER_SNAPSHOT));

try {
assertArrayEquals(new byte[0], remoteStorageVerifier.readFully(tieredStorage.fetchIndex(metadata, TRANSACTION)));
} catch (Exception ex) {
fail("Shouldn't have thrown an exception when optional file doesn't exists in the remote store");
}
assertThrows(RemoteResourceNotFoundException.class, () -> tieredStorage.fetchIndex(metadata, TRANSACTION));
}

@Test
