Addressed review comments
satishd committed Mar 24, 2023
1 parent 082bb61 commit 3361ee3
Showing 1 changed file with 9 additions and 11 deletions.
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.storage.internals.log;


import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CorruptRecordException;
@@ -84,7 +83,7 @@ public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager,
this.remoteStorageManager = remoteStorageManager;
cacheDir = new File(logDir, DIR_NAME);

- entries = new LinkedHashMap<Uuid, RemoteIndexCache.Entry>(maxSize, 0.75f, true) {
+ entries = new LinkedHashMap<Uuid, RemoteIndexCache.Entry>(maxSize / 2, 0.75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<Uuid, RemoteIndexCache.Entry> eldest) {
if (this.size() > maxSize) {
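
The entries map above relies on the standard LinkedHashMap LRU idiom: the third constructor argument turns on access ordering, and overriding removeEldestEntry evicts the least recently used mapping once the map grows past maxSize. A minimal, self-contained sketch of that idiom follows; the String/Integer types, the cap of 4, and the class name are made up for illustration and are not the RemoteIndexCache code, which additionally has to close and clean up the evicted entry.

import java.util.LinkedHashMap;
import java.util.Map;

// Minimal LRU sketch using the same LinkedHashMap idiom as the hunk above.
// accessOrder=true keeps iteration in recency order; removeEldestEntry evicts
// once the map grows past the cap. Types and the cap of 4 are illustrative only.
public class LruSketch {
    public static void main(String[] args) {
        final int maxSize = 4;
        Map<String, Integer> cache = new LinkedHashMap<String, Integer>(maxSize / 2, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
                return size() > maxSize;
            }
        };
        for (int i = 0; i < 6; i++) {
            cache.put("k" + i, i);
        }
        System.out.println(cache.keySet()); // prints [k2, k3, k4, k5]
    }
}
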
@@ -243,7 +242,7 @@ public Entry getIndexEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
try {
return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET);
} catch (RemoteStorageException e) {
- throw new RuntimeException(e);
+ throw new KafkaException(e);
}
}, file -> {
try {
@@ -278,14 +277,13 @@ public Entry getIndexEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
throw new KafkaException(e);
}
}, file -> {
- TransactionIndex index = null;
try {
- index = new TransactionIndex(startOffset, file);
+ TransactionIndex index = new TransactionIndex(startOffset, file);
+ index.sanityCheck();
+ return index;
} catch (IOException e) {
throw new KafkaException(e);
}
- index.sanityCheck();
- return index;
});

return new Entry(offsetIndex, timeIndex, txnIndex);
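
The reshuffled transaction-index lambda above declares the local inside the try and moves sanityCheck() and the return in with it, so any IOException thrown in that block is wrapped as a KafkaException in one place. A standalone sketch of the same shape follows; SketchIndex and its methods are hypothetical stand-ins for TransactionIndex, and RuntimeException stands in for KafkaException so the example compiles on its own.

import java.io.File;
import java.io.IOException;

// Hypothetical stand-in for TransactionIndex: both the constructor and
// sanityCheck() may throw IOException, mirroring the code in the hunk above.
class SketchIndex {
    SketchIndex(long startOffset, File file) throws IOException {
        if (!file.isFile()) throw new IOException("missing index file: " + file);
    }
    void sanityCheck() throws IOException {
        // validate the on-disk entries; may throw IOException
    }
}

class IndexLoaderSketch {
    // Construct, validate and return inside one try block, so every IOException
    // is wrapped the same way (the real code wraps it in KafkaException).
    static SketchIndex load(long startOffset, File file) {
        try {
            SketchIndex index = new SketchIndex(startOffset, file);
            index.sanityCheck();
            return index;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
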
@@ -296,7 +294,7 @@ public Entry getIndexEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
}
}

- public int lookupOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) throws IOException {
+ public int lookupOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
return getIndexEntry(remoteLogSegmentMetadata).lookupOffset(offset).position;
}

@@ -337,7 +335,7 @@ public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, TransactionIndex txnI
private boolean markedForCleanup = false;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

- public OffsetPosition lookupOffset(long targetOffset) throws IOException {
+ public OffsetPosition lookupOffset(long targetOffset) {
lock.readLock().lock();
try {
if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup");
@@ -364,8 +362,8 @@ public void markForCleanup() throws IOException {
try {
if (!markedForCleanup) {
markedForCleanup = true;

offsetIndex.renameTo(new File(Utils.replaceSuffix(offsetIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
timeIndex.renameTo(new File(Utils.replaceSuffix(timeIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
txnIndex.renameTo(new File(Utils.replaceSuffix(txnIndex.file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)));
}
} finally {
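
markForCleanup above flips the flag and then renames each backing index file to carry the deleted-file suffix, deferring the actual removal to cleanup() below. A minimal sketch of that rename step using plain java.nio in place of Kafka's Utils.replaceSuffix/renameTo helpers; the ".deleted" literal is an assumption standing in for LogFileUtils.DELETED_FILE_SUFFIX.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the rename-before-delete step: append the deleted suffix so the file
// is no longer treated as a live index, and remove it later. ".deleted" is
// assumed here to match LogFileUtils.DELETED_FILE_SUFFIX.
public class MarkForCleanupSketch {
    static Path markDeleted(Path indexFile) throws IOException {
        Path renamed = indexFile.resolveSibling(indexFile.getFileName() + ".deleted");
        return Files.move(indexFile, renamed);
    }
}
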
@@ -388,7 +386,7 @@ public void cleanup() throws IOException {
return null;
}));
} catch (Exception e) {
- throw new KafkaException(e);
+ throw new IOException(e);
}
}

