KAFKA-19390: Call safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale mmap of index files #19961
@Forest0923 : Thanks for the PR. Great fix! Left a couple of comments.
@@ -67,19 +70,23 @@ public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean writable)
this.lastEntry = lastEntryFromIndexFile();
log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}",
file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position());
inLock(lock, () -> { |
Is lock needed here in the constructor? Ditto for TimeIndex.
I don’t think it’s necessary in the constructor, so I’ll remove it.
@@ -95,7 +99,7 @@ public void sanityCheck() {
* the pair (baseOffset, 0) is returned.
*/
public OffsetPosition lookup(long targetOffset) {
return maybeLock(lock, () -> {
return inLock(lock, () -> { |
Currently, on Linux, we don't acquire the lock for lookup(), which is used for fetch requests. Both fetch and produce requests are common, so acquiring the lock in the fetch path reduces read/write concurrency. The reader only truly needs to lock when the underlying mmap is changed by resize(). Since resize() is an infrequent event, we could introduce a separate resize lock, which will be held by resize() and all readers (currently calling maybeLock()). This will help maintain the current level of concurrency.
I’d like to ask the following questions:
- The JavaDoc for java.nio.Buffer says it is not thread-safe. In the case of reads, is it still safe for multiple threads to access the same buffer concurrently? (See: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/nio/Buffer.html)
- To improve read-operation concurrency, are you suggesting the use of ReentrantReadWriteLock instead of ReentrantLock?
@Forest0923 : Good question. It's true that, according to the javadoc, a happens-before relationship is not guaranteed. However, according to this, MappedByteBuffer is a thin wrapper that provides direct access to the buffer cache of the OS. So, once you have written to it, every thread, and every process, will see the update. In addition to the index file, we also read from the buffer cache for the log file without synchronization. The implementation may change in the future, but that is probably unlikely.
So for now, it's probably better to use a separate resize lock to maintain the current read/write concurrency?
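The idea of a separate resize lock can be illustrated with a minimal sketch. This is not the PR's code: the class name `RemapLockSketch` and its methods are hypothetical, and a heap `ByteBuffer` stands in for the real mmap so the example stays self-contained. Readers share the read lock, writers are serialized by a plain `ReentrantLock`, and only the rare resize takes the write lock.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for an index: a heap ByteBuffer plays the role of the mmap.
public class RemapLockSketch {
    private static final int ENTRY_SIZE = Long.BYTES;

    // Serializes operations that mutate state (append, resize).
    private final ReentrantLock lock = new ReentrantLock();
    // Shared by readers; held exclusively while the buffer reference is swapped.
    private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();

    private ByteBuffer buffer;         // replaced only under remapLock.writeLock()
    private volatile int entries = 0;  // published to readers that do not take `lock`

    public RemapLockSketch(int maxEntries) {
        this.buffer = ByteBuffer.allocate(maxEntries * ENTRY_SIZE);
    }

    // Writer path: takes only the writer lock, so readers are never blocked by it.
    public void append(long value) {
        lock.lock();
        try {
            int n = entries;
            buffer.putLong(n * ENTRY_SIZE, value); // absolute put, position untouched
            entries = n + 1;
        } finally {
            lock.unlock();
        }
    }

    // Reader path: many readers may hold the read lock at the same time.
    public long lookup(int n) {
        remapLock.readLock().lock();
        try {
            if (n < 0 || n >= entries) {
                throw new IndexOutOfBoundsException("no entry " + n);
            }
            return buffer.getLong(n * ENTRY_SIZE); // absolute get
        } finally {
            remapLock.readLock().unlock();
        }
    }

    // Rare path: excludes both writers (lock) and readers (write lock) during the swap.
    public void resize(int newMaxEntries) {
        lock.lock();
        try {
            remapLock.writeLock().lock();
            try {
                if (newMaxEntries < entries) {
                    throw new IllegalArgumentException("resize would drop entries");
                }
                ByteBuffer bigger = ByteBuffer.allocate(newMaxEntries * ENTRY_SIZE);
                ByteBuffer src = buffer.duplicate();
                src.position(0);
                src.limit(entries * ENTRY_SIZE);
                bigger.put(src);
                buffer = bigger; // a real index would also unmap the old mapping here
            } finally {
                remapLock.writeLock().unlock();
            }
        } finally {
            lock.unlock();
        }
    }

    public int entries() {
        return entries;
    }
}
```

In this sketch the writer lock is always acquired before the write lock of `remapLock`, which keeps a single lock order across the class.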
Thank you for the advice!
I have added resizeLock, could you please check?
Please let me know if I have misunderstood anything.
throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
+ "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp "
+ timestamp(mmap(), 0));
inLock(lock, () -> { |
Good catch here!
@Forest0923 : Thanks for the updated PR. A couple more comments. Also, could you run a perf test (produce perf + consumer perf together) to make sure there is no degradation?
@@ -48,6 +51,7 @@ private enum SearchResultType {
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
protected final ReentrantLock lock = new ReentrantLock();
protected final ReentrantReadWriteLock resizeLock = new ReentrantReadWriteLock(); |
Now that we have two locks, could we add comments on how each one is being used? Also, remapLock is probably more accurate?
protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.lock();
protected final <T, E extends Exception> T inResizeLock(Lock lock, StorageAction<T, E> action) throws E { |
Perhaps it's better to make this inResizeReadLock and avoid passing in lock from the input. For consistency, we could also introduce a method like inLock for the other lock. This way, we could make both locks private.
Here is the result of the perf test:
trunk (7cd99ea)
pr (b035e4c)
pr (af3cf9d)
@Forest0923 : Thanks for the updated PR and the perf results. The perf numbers look good. A few more comments.
protected final ReentrantLock lock = new ReentrantLock();
// Serializes all index operations that mutate internal state
private final ReentrantLock lock = new ReentrantLock();
// Allows concurrent read operations while ensuring exclusive access during index resizing |
during index resizing => if the underlying mmap is changed ?
Thank you. Fixed in 5bc31d2
return LockUtils.inLockThrows(lock, action);
}
protected final <T, E extends Exception> T inResizeReadLock(StorageAction<T, E> action) throws E { |
inResizeReadLock => inRemapReadLock ?
Fixed in d320291
}
protected final <T, E extends Exception> T inResizeReadLock(StorageAction<T, E> action) throws E {
remapLock.readLock().lock(); |
It doesn't seem that the caller passes in an action that actually throws. Could we just use LockUtils.inLock()?
I've addressed the remaining comments. 7c98c33
Could you please check it again?
/* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
return LockUtils.inLockThrows(lock, () -> {
remapLock.writeLock().lock(); |
Could we use LockUtils.inLockThrows(remapLock.writeLock().lock(), () -> { ?
@@ -288,12 +286,9 @@ public void closeHandler() {
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
lock.lock();
try {
LockUtils.inLockThrows(lock, () -> { |
safeForceUnmap() also changes mmap. So we want to protect it by the remap lock.
@Forest0923 : Thanks for the updated PR. A few more comments.
protected final ReentrantLock lock = new ReentrantLock();
// Serializes all index operations that mutate internal state
private final ReentrantLock lock = new ReentrantLock();
// Allows concurrent read operations while ensuring exclusive access if the underlying file is changed |
the underlying file => the underlying mmap
@@ -187,8 +191,7 @@ public void updateParentDir(File parentDir) {
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
lock.lock();
try {
return LockUtils.inLockThrows(lock, () -> LockUtils.inLockThrows(remapLock.writeLock(), () -> { |
Could we split this into two lines?
@@ -288,12 +281,9 @@ public void closeHandler() {
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
lock.lock();
try {
LockUtils.inLockThrows(remapLock.writeLock(), () -> { |
I think we need both locks here since we need to block both concurrent writers and concurrent readers.
Fixed in ba483b3
@Forest0923 : Thanks for the updated PR. One more comment.
} finally {
lock.unlock();
}
return LockUtils.inLockThrows(lock, () -> |
Could we use the utility method in this class? Ditto in a few other places.
I’ve added an inRemapReadWriteLock method. Does this address your comment appropriately?
@Forest0923 : Thanks for the updated PR. A couple more comments.
@@ -236,12 +235,9 @@ public void renameTo(File f) throws IOException {
* Flush the data in the index to disk
*/
public void flush() {
lock.lock();
try {
LockUtils.inLock(lock, () -> { |
This could just be inLock(() -> {, right? Ditto in a few other places.
@@ -187,8 +191,7 @@ public void updateParentDir(File parentDir) {
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
lock.lock();
try {
return inRemapReadWriteLock(() -> { |
It's probably clearer to do something like the following. Ditto for closeHandler().
inLockThrows(() ->
    inRemapWriteLockThrows(() -> {
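The nesting suggested above can be sketched as follows. This is a simplified illustration, not the PR's helpers: the `IoAction` interface and the `withLock` method are hypothetical, and the exception type is fixed to `IOException` rather than being generic. The body of `resize` here only reports whether both locks are held, to make the nesting observable.

```java
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class NestedLockHelpers {
    // Hypothetical functional interface; the exception type is fixed for simplicity.
    @FunctionalInterface
    interface IoAction<T> {
        T run() throws IOException;
    }

    final ReentrantLock lock = new ReentrantLock();
    final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();

    // Runs the action while holding the given lock, releasing it even on failure.
    private static <T> T withLock(Lock l, IoAction<T> action) throws IOException {
        l.lock();
        try {
            return action.run();
        } finally {
            l.unlock();
        }
    }

    <T> T inLockThrows(IoAction<T> action) throws IOException {
        return withLock(lock, action);
    }

    <T> T inRemapWriteLockThrows(IoAction<T> action) throws IOException {
        return withLock(remapLock.writeLock(), action);
    }

    // Outer helper excludes writers, inner helper excludes readers.
    boolean resize(int newSize) throws IOException {
        return inLockThrows(() ->
            inRemapWriteLockThrows(() -> {
                if (newSize < 0) {
                    throw new IOException("invalid size " + newSize);
                }
                return lock.isHeldByCurrentThread()
                    && remapLock.isWriteLockedByCurrentThread();
            }));
    }
}
```

Writing the two helpers as separate nested calls keeps the acquisition order visible at the call site, which is harder to see when a single combined helper hides both locks.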
Thanks for the suggestions! I've applied the fix
@Forest0923 : Thanks for the updated PR. One more comment.
return LockUtils.inLockThrows(remapLock.readLock(), () -> action).get();
}
protected final <E extends Exception> void inRemapReadLockThrows(LockUtils.ThrowingRunnable<E> action) throws E { |
inRemapReadLockThrows => inRemapWriteLockThrows
I've updated the lock helpers.
@Forest0923 : Thanks for the updated PR. The following test failure seems related to this PR. My understanding is that this is caused by a low retention size that closes the handler of the index file during async delete. However, by default, the delete is delayed by 60 secs. So, I don't quite understand why this failure is showing up in this test. Do you know what's causing this test to fail now?
I'm not exactly sure why, but I was able to reproduce the issue locally by increasing maxOffset, as in this diff:
diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index 854be39808..06421fd96a 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -67,7 +67,7 @@ class LogConcurrencyTest {
def testUncommittedDataNotConsumed(log: UnifiedLog): Unit = {
val executor = Executors.newFixedThreadPool(2)
try {
- val maxOffset = 5000
+ val maxOffset = 10000
val consumer = new ConsumerTask(log, maxOffset)
     val appendTask = new LogAppendTask(log, maxOffset)
Applying the following change seems to fix the issue:
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
index 04d68e9a54..89875484f7 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
@@ -212,7 +212,7 @@ public final class OffsetIndex extends AbstractIndex {
* Truncates index to a known number of entries.
*/
private void truncateToEntries(int entries) {
- inLock(() -> {
+ inRemapWriteLockThrows(() -> {
super.truncateToEntries0(entries);
this.lastOffset = lastEntry().offset;
             log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}",
@Forest0923 : Thanks for the analysis. I was able to reproduce this issue too. It seems that mmap is temporarily set to null by Also, why would adding |
Sorry, my earlier comment about The Supplier's actual action was executed outside the lock.
protected final <T> T inRemapReadLock(Supplier<T> action) {
    return LockUtils.inLock(remapLock.readLock(), () -> action).get();
}
updated: #19961 (comment)
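The pitfall in the snippet above is that the lambda `() -> action` returns the `Supplier` object itself, so the trailing `.get()` runs after the lock has been released. A minimal sketch that makes this observable is shown below. The class `SupplierLockPitfall` and its `inLock` helper are hypothetical and only mirror the general shape of a run-under-lock utility; a plain `ReentrantLock` is used so that `isHeldByCurrentThread()` can report whether the action ran under the lock.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class SupplierLockPitfall {
    static final ReentrantLock LOCK = new ReentrantLock();

    // Hypothetical helper: runs the supplier while holding the lock.
    static <T> T inLock(ReentrantLock lock, Supplier<T> supplier) {
        lock.lock();
        try {
            return supplier.get();
        } finally {
            lock.unlock();
        }
    }

    // Buggy: the lambda only hands back `action`; get() runs after unlock.
    static <T> T buggy(Supplier<T> action) {
        return inLock(LOCK, () -> action).get();
    }

    // Fixed: the action itself is evaluated while the lock is held.
    static <T> T fixed(Supplier<T> action) {
        return inLock(LOCK, action);
    }

    public static void main(String[] args) {
        System.out.println("buggy ran under lock: " + buggy(LOCK::isHeldByCurrentThread)); // false
        System.out.println("fixed ran under lock: " + fixed(LOCK::isHeldByCurrentThread)); // true
    }
}
```

Both variants compile without warnings, which is why this kind of mistake tends to surface only as a flaky concurrency test.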
@Forest0923 : Thanks for the updated PR. A few more comments. Also, do you know what's causing the following test failure?
2025-07-02T03:49:27.3858133Z
2025-07-02T03:49:27.3860984Z [Error] /home/runner/work/kafka/kafka/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java:132: error: no suitable method found for produceKeyValuesSynchronouslyWithTimestamp(String,List<KeyValue<Integer,String>>,Properties,long)
2025-07-02T03:49:27.3863357Z > Task :streams:integration-tests:compileTestScala FAILED
2025-07-02T03:49:27.3864394Z javac exited with exit code 1
throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
+ "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp "
+ timestamp(mmap(), 0));
inLock(() -> { |
This needs the remap read lock.
}

/**
 * The last entry in the index
 */
private OffsetPosition lastEntry() {
lock.lock();
try {
return inLock(() -> { |
lastEntry() is a reader. So it should only take remap read lock.
Also, there is an existing issue. It seems there is no memory barrier for the instance level field lastOffset. So a reader may not see the latest value. We need to make it volatile.
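The visibility concern can be sketched in isolation. The class below is hypothetical and much simpler than the real index: the writer updates the field while holding a lock, the reader does not take that lock, and `volatile` is what guarantees the reader sees the latest write.

```java
import java.util.concurrent.locks.ReentrantLock;

public class VolatileLastOffset {
    // Serializes writers only; readers do not take this lock.
    private final ReentrantLock lock = new ReentrantLock();

    // volatile: a write by the appending thread is visible to any later read
    // by another thread, even though the reader never acquires `lock`.
    private volatile long lastOffset = -1L;

    public void append(long offset) {
        lock.lock();
        try {
            if (offset <= lastOffset) {
                throw new IllegalArgumentException(
                    "offset " + offset + " is not larger than last offset " + lastOffset);
            }
            lastOffset = offset;
        } finally {
            lock.unlock();
        }
    }

    // Lock-free read path.
    public long lastOffset() {
        return lastOffset;
    }
}
```

Without `volatile`, the Java memory model would give no happens-before edge between the locked write and the unlocked read, so a reader could legally observe a stale value.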
@@ -259,30 +252,24 @@ private int relativeOffset(ByteBuffer buffer, int n) {
* Read the last entry from the index file. This operation involves disk access.
*/
private TimestampOffset lastEntryFromIndexFile() {
lock.lock();
try {
return inLock(() -> { |
lastEntryFromIndexFile() is a reader. So, it needs the remap read lock.
Thanks for the review. I've fixed some lock usages and added volatile for lastOffset. By the way, I couldn’t reproduce the error in EmitOnChangeIntegrationTest locally or find any failed GitHub Actions runs. Could it be related to this PR?
@Forest0923 : Thanks for the updated PR. The code LGTM. Could you run the same producer/consumer perf test again?
Here is the result of the perf test:
Trunk (a3ed705)
PR (0657c7a)
@Forest0923 : Thanks for the perf results. LGTM
@Forest0923 Thanks for this patch, and sorry for the delayed review. Two small comments remain. Please take a look.
* @param runnable the runnable to be executed within the lock context
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
*/
public static void inLock(Lock lock, Runnable runnable) { |
Excuse me, have we considered replacing inLock with inLockThrows? Keeping both varieties seems a bit verbose to me.
} finally {
lock.unlock();
}
inLockThrows(() -> |
inLockThrows(() -> inRemapWriteLockThrows(this::safeForceUnmap));
@@ -47,7 +48,10 @@ private enum SearchResultType {
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
protected final ReentrantLock lock = new ReentrantLock();
// Serializes all index operations that mutate internal state |
This is a minor thing. It would be useful to add a comment on why the reader doesn't need to take this lock.
…390) (apache#20131) This PR performs a refactoring of LockUtils and improves inline comments, as a follow-up to apache#19961. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
https://issues.apache.org/jira/browse/KAFKA-19390
The AbstractIndex.resize() method does not release the old memory map
for both index and time index files. In some cases, Mixed GC may not
run for a long time, which can cause the broker to crash when the
vm.max_map_count limit is reached.
The root cause is that safeForceUnmap() is not called on Linux
within resize(), so we have changed the code to unmap the old mmap on all
operating systems.
The same problem was reported in
KAFKA-7442, but the
PR submitted at that time did not acquire all necessary locks around the
mmap accesses and was closed without fixing the issue.