KAFKA-19390: Call safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale mmap of index files #19961


Merged
merged 19 commits into apache:trunk on Jul 8, 2025

Conversation

Contributor
@Forest0923 commented Jun 13, 2025

https://issues.apache.org/jira/browse/KAFKA-19390

The AbstractIndex.resize() method does not release the old memory map
for both index and time index files. In some cases, Mixed GC may not
run for a long time, which can cause the broker to crash when the
vm.max_map_count limit is reached.

The root cause is that safeForceUnmap() is not called on Linux within resize(), so we have changed the code to unmap the old mmap on all operating systems.

The same problem was reported in
KAFKA-7442, but the
PR submitted at that time did not acquire all necessary locks around the
mmap accesses and was closed without fixing the issue.
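
For context, here is a minimal, self-contained sketch of the behavior described above (illustrative only, not the actual Kafka patch; the class and helper names are invented for the example): resize() explicitly releases the old mapping on every operating system before re-mapping the file, so the number of live mappings no longer depends on when GC happens to collect the old MappedByteBuffer.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

final class IndexResizeSketch implements AutoCloseable {
    private final RandomAccessFile file;
    private MappedByteBuffer mmap;

    IndexResizeSketch(File f, int initialSize) throws IOException {
        this.file = new RandomAccessFile(f, "rw");
        this.file.setLength(initialSize);
        this.mmap = file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, initialSize);
    }

    synchronized void resize(int newSize) throws IOException {
        // Previously this step was skipped on Linux and the old mapping lingered
        // until GC; with the fix it is released unconditionally before re-mapping.
        safeForceUnmap();
        file.setLength(newSize);
        mmap = file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newSize);
    }

    private void safeForceUnmap() {
        // Kafka delegates to an internal unmap utility here; invoking the cleaner
        // is JDK-specific, so this sketch only drops the reference.
        mmap = null;
    }

    @Override
    public synchronized void close() throws IOException {
        safeForceUnmap();
        file.close();
    }
}
```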

@github-actions bot added the triage (PRs from the community), core (Kafka Broker), and storage (Pull requests that target the storage module) labels on Jun 13, 2025

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Contributor
@junrao left a comment

@Forest0923 : Thanks for the PR. Great fix! Left a couple of comments.

@@ -67,19 +70,23 @@ public TimeIndex(File file, long baseOffset, int maxIndexSize, boolean writable)

this.lastEntry = lastEntryFromIndexFile();

log.debug("Loaded index file {} with maxEntries = {}, maxIndexSize = {}, entries = {}, lastOffset = {}, file position = {}",
file.getAbsolutePath(), maxEntries(), maxIndexSize, entries(), lastEntry.offset, mmap().position());
inLock(lock, () -> {
Contributor

Is lock needed here in the constructor? Ditto for TimeIndex.

Contributor Author

I don’t think it’s necessary in the constructor, so I’ll remove it.

@@ -95,7 +99,7 @@ public void sanityCheck() {
* the pair (baseOffset, 0) is returned.
*/
public OffsetPosition lookup(long targetOffset) {
return maybeLock(lock, () -> {
return inLock(lock, () -> {
Contributor

Currently, on Linux, we don't acquire the lock for lookup(), which is used for fetch requests. Both fetch and produce requests are common, and acquiring the lock in the fetch path reduces read/write concurrency. The reader only truly needs the lock when the underlying mmap is changed by resize(). Since resize() is an infrequent event, we could introduce a separate resize lock that is held by resize() and by all readers (which currently call maybeLock()). This would help maintain the current level of concurrency.
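
For illustration, here is a small self-contained sketch of the two-lock pattern being proposed (the field names lock and remapLock and the helper shapes are assumptions drawn from this discussion, not the final code): readers take only the read side of the remap lock, mutators take the plain lock, and only a remap takes the write side, so lookups stay concurrent except during the rare resize().

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

final class TwoLockIndexSketch {
    // Serializes mutating operations (append, truncate, resize, ...).
    private final ReentrantLock lock = new ReentrantLock();
    // Readers hold the read side; only a remap (resize) holds the write side,
    // so concurrent lookups are blocked only while the mmap is being swapped.
    private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();

    <T> T read(Supplier<T> action) {
        remapLock.readLock().lock();
        try {
            return action.get();
        } finally {
            remapLock.readLock().unlock();
        }
    }

    void resize(Runnable remap) {
        lock.lock();
        try {
            remapLock.writeLock().lock();
            try {
                remap.run(); // swap the underlying mmap while readers are excluded
            } finally {
                remapLock.writeLock().unlock();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

The read lock is uncontended in the common case, so the fetch path keeps roughly its current level of concurrency.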

Contributor Author

I’d like to ask the following questions:

  1. The JavaDoc for java.nio.Buffer says it is not thread-safe. In the case of reads, is it still safe for multiple threads to access the same buffer concurrently?
    (See: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/nio/Buffer.html)

  2. To improve read-operation concurrency, are you suggesting the use of ReentrantReadWriteLock instead of ReentrantLock?

Contributor

@Forest0923 : Good question. It's true that the happens-before relationship is not guaranteed according to the javadoc. However, according to this, MappedByteBuffer is a thin wrapper that provides direct access to the OS buffer cache. So, once you have written to it, every thread, and every process, will see the update. In addition to the index file, we also read from the buffer cache for the log file without synchronization. The implementation may change in the future, but that is probably unlikely.

So for now, it's probably better to use a separate resize lock to maintain the current read/write concurrency?

Contributor Author
@Forest0923 commented Jun 25, 2025

Thank you for the advice!
I have added resizeLock, could you please check?

Please let me know if I have misunderstood anything.

throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
+ "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp "
+ timestamp(mmap(), 0));
inLock(lock, () -> {
Contributor

Good catch here!

@github-actions bot removed the triage (PRs from the community) label on Jun 25, 2025
Contributor
@junrao left a comment

@Forest0923 : Thanks for the updated PR. A couple more comments. Also, could you run some perf tests (producer perf + consumer perf together) to make sure there is no degradation?

@@ -48,6 +51,7 @@ private enum SearchResultType {
private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);

protected final ReentrantLock lock = new ReentrantLock();
protected final ReentrantReadWriteLock resizeLock = new ReentrantReadWriteLock();
Contributor

Now that we have two locks, could we add comments on how each one is being used? Also, remapLock is probably more accurate?

protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action) throws E {
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
lock.lock();
protected final <T, E extends Exception> T inResizeLock(Lock lock, StorageAction<T, E> action) throws E {
Contributor

Perhaps it's better to make this inResizeReadLock and avoid passing the lock in as a parameter. For consistency, we could also introduce a method like inLock for the other lock. That way, we could make both locks private.

Contributor Author
@Forest0923 commented Jun 27, 2025

Here are the results of the perf test:

  • OS: Rocky Linux release 9.5 (Linux 5.14.0)
  • openjdk version: 17.0.15
| Metric | Trunk (7cd99ea) | PR (b035e4c) | PR (af3cf9d) |
|---|---|---|---|
| Producer throughput (records/sec) | 143,548.6 | 142,388.5 | 143,016.5 |
| Producer throughput (MB/sec) | 140.18 | 139.05 | 139.66 |
| Producer average latency (ms) | 175.20 | 171.25 | 137.93 |
| Producer max latency (ms) | 1,505.00 | 1,340.00 | 1,413.00 |
| Producer 50th percentile latency (ms) | 192 | 185 | 139 |
| Producer 95th percentile latency (ms) | 230 | 233 | 226 |
| Producer 99th percentile latency (ms) | 380 | 427 | 547 |
| Producer 99.9th percentile latency (ms) | 1,190 | 1,214 | 1,359 |
| Consumer throughput (messages/sec) | 140,420.2 | 141,071.6 | 141,554.0 |
| Consumer throughput (MB/sec) | 137.13 | 137.77 | 138.24 |
trunk (7cd99ea)
$ bin/kafka-producer-perf-test.sh --topic sandbox --num-records 25000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092
123001 records sent, 24600.2 records/sec (24.02 MB/sec), 1021.5 ms avg latency, 1505.0 ms max latency.
520665 records sent, 104133.0 records/sec (101.69 MB/sec), 308.8 ms avg latency, 797.0 ms max latency.
677145 records sent, 135429.0 records/sec (132.25 MB/sec), 219.0 ms avg latency, 407.0 ms max latency.
737490 records sent, 147498.0 records/sec (144.04 MB/sec), 196.1 ms avg latency, 231.0 ms max latency.
748560 records sent, 149712.0 records/sec (146.20 MB/sec), 177.2 ms avg latency, 233.0 ms max latency.
792915 records sent, 158583.0 records/sec (154.87 MB/sec), 103.6 ms avg latency, 212.0 ms max latency.
723735 records sent, 144747.0 records/sec (141.35 MB/sec), 101.5 ms avg latency, 183.0 ms max latency.
773235 records sent, 154647.0 records/sec (151.02 MB/sec), 76.4 ms avg latency, 150.0 ms max latency.
765450 records sent, 153090.0 records/sec (149.50 MB/sec), 123.9 ms avg latency, 212.0 ms max latency.
756675 records sent, 151335.0 records/sec (147.79 MB/sec), 188.3 ms avg latency, 225.0 ms max latency.
760305 records sent, 152061.0 records/sec (148.50 MB/sec), 195.7 ms avg latency, 223.0 ms max latency.
751275 records sent, 150255.0 records/sec (146.73 MB/sec), 165.1 ms avg latency, 210.0 ms max latency.
780525 records sent, 156105.0 records/sec (152.45 MB/sec), 113.8 ms avg latency, 237.0 ms max latency.
755385 records sent, 151077.0 records/sec (147.54 MB/sec), 60.0 ms avg latency, 165.0 ms max latency.
765375 records sent, 153075.0 records/sec (149.49 MB/sec), 103.4 ms avg latency, 217.0 ms max latency.
724890 records sent, 144949.0 records/sec (141.55 MB/sec), 205.4 ms avg latency, 248.0 ms max latency.
694965 records sent, 138993.0 records/sec (135.74 MB/sec), 209.7 ms avg latency, 251.0 ms max latency.
750165 records sent, 150033.0 records/sec (146.52 MB/sec), 184.6 ms avg latency, 225.0 ms max latency.
712050 records sent, 142410.0 records/sec (139.07 MB/sec), 194.2 ms avg latency, 296.0 ms max latency.
717405 records sent, 143481.0 records/sec (140.12 MB/sec), 211.3 ms avg latency, 253.0 ms max latency.
721352 records sent, 144270.4 records/sec (140.89 MB/sec), 210.4 ms avg latency, 258.0 ms max latency.
706903 records sent, 141380.6 records/sec (138.07 MB/sec), 211.1 ms avg latency, 242.0 ms max latency.
735135 records sent, 147027.0 records/sec (143.58 MB/sec), 187.2 ms avg latency, 244.0 ms max latency.
741525 records sent, 148305.0 records/sec (144.83 MB/sec), 198.4 ms avg latency, 259.0 ms max latency.
766440 records sent, 153288.0 records/sec (149.70 MB/sec), 122.5 ms avg latency, 189.0 ms max latency.
760755 records sent, 152151.0 records/sec (148.58 MB/sec), 156.4 ms avg latency, 233.0 ms max latency.
734430 records sent, 146886.0 records/sec (143.44 MB/sec), 188.5 ms avg latency, 244.0 ms max latency.
743400 records sent, 148680.0 records/sec (145.20 MB/sec), 195.1 ms avg latency, 254.0 ms max latency.
745545 records sent, 149109.0 records/sec (145.61 MB/sec), 175.8 ms avg latency, 218.0 ms max latency.
735855 records sent, 147171.0 records/sec (143.72 MB/sec), 179.6 ms avg latency, 245.0 ms max latency.
727380 records sent, 145476.0 records/sec (142.07 MB/sec), 201.9 ms avg latency, 254.0 ms max latency.
724200 records sent, 144840.0 records/sec (141.45 MB/sec), 203.5 ms avg latency, 260.0 ms max latency.
749055 records sent, 149811.0 records/sec (146.30 MB/sec), 199.1 ms avg latency, 222.0 ms max latency.
771150 records sent, 154230.0 records/sec (150.62 MB/sec), 178.8 ms avg latency, 232.0 ms max latency.
25000000 records sent, 143548.6 records/sec (140.18 MB/sec), 175.20 ms avg latency, 1505.00 ms max latency, 192 ms 50th, 230 ms 95th, 380 ms 99th, 1190 ms 99.9th.

$ bin/kafka-consumer-perf-test.sh --topic sandbox --messages 25000000 --bootstrap-server localhost:9092 --timeout 500000000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2025-06-27 12:43:59:905, 2025-06-27 12:46:57:942, 24414.0625, 137.1292, 25000000, 140420.2497, 3424, 174613, 139.8181, 143173.7614
pr (b035e4c)
$ bin/kafka-producer-perf-test.sh --topic sandbox --num-records 25000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092
123886 records sent, 24777.2 records/sec (24.20 MB/sec), 1026.2 ms avg latency, 1340.0 ms max latency.
463395 records sent, 92679.0 records/sec (90.51 MB/sec), 342.5 ms avg latency, 784.0 ms max latency.
670455 records sent, 134091.0 records/sec (130.95 MB/sec), 223.0 ms avg latency, 304.0 ms max latency.
735030 records sent, 147006.0 records/sec (143.56 MB/sec), 197.5 ms avg latency, 251.0 ms max latency.
735559 records sent, 147111.8 records/sec (143.66 MB/sec), 170.4 ms avg latency, 237.0 ms max latency.
725546 records sent, 145109.2 records/sec (141.71 MB/sec), 200.9 ms avg latency, 290.0 ms max latency.
755025 records sent, 151005.0 records/sec (147.47 MB/sec), 155.4 ms avg latency, 242.0 ms max latency.
764055 records sent, 152811.0 records/sec (149.23 MB/sec), 68.0 ms avg latency, 152.0 ms max latency.
740955 records sent, 148191.0 records/sec (144.72 MB/sec), 136.4 ms avg latency, 216.0 ms max latency.
756360 records sent, 151272.0 records/sec (147.73 MB/sec), 156.4 ms avg latency, 215.0 ms max latency.
752130 records sent, 150426.0 records/sec (146.90 MB/sec), 185.4 ms avg latency, 235.0 ms max latency.
744810 records sent, 148962.0 records/sec (145.47 MB/sec), 178.6 ms avg latency, 266.0 ms max latency.
712755 records sent, 142551.0 records/sec (139.21 MB/sec), 89.4 ms avg latency, 222.0 ms max latency.
716835 records sent, 143367.0 records/sec (140.01 MB/sec), 206.6 ms avg latency, 240.0 ms max latency.
752910 records sent, 150582.0 records/sec (147.05 MB/sec), 167.7 ms avg latency, 243.0 ms max latency.
780090 records sent, 156018.0 records/sec (152.36 MB/sec), 158.5 ms avg latency, 259.0 ms max latency.
739125 records sent, 147825.0 records/sec (144.36 MB/sec), 96.2 ms avg latency, 220.0 ms max latency.
715530 records sent, 143106.0 records/sec (139.75 MB/sec), 141.1 ms avg latency, 224.0 ms max latency.
727080 records sent, 145416.0 records/sec (142.01 MB/sec), 196.2 ms avg latency, 246.0 ms max latency.
781545 records sent, 156309.0 records/sec (152.65 MB/sec), 82.0 ms avg latency, 230.0 ms max latency.
708045 records sent, 141609.0 records/sec (138.29 MB/sec), 192.7 ms avg latency, 242.0 ms max latency.
737985 records sent, 147597.0 records/sec (144.14 MB/sec), 195.4 ms avg latency, 231.0 ms max latency.
746805 records sent, 149361.0 records/sec (145.86 MB/sec), 177.8 ms avg latency, 242.0 ms max latency.
785925 records sent, 157185.0 records/sec (153.50 MB/sec), 73.2 ms avg latency, 185.0 ms max latency.
754560 records sent, 150912.0 records/sec (147.38 MB/sec), 55.3 ms avg latency, 215.0 ms max latency.
767535 records sent, 153507.0 records/sec (149.91 MB/sec), 169.1 ms avg latency, 259.0 ms max latency.
721935 records sent, 144387.0 records/sec (141.00 MB/sec), 199.8 ms avg latency, 243.0 ms max latency.
753615 records sent, 150723.0 records/sec (147.19 MB/sec), 161.9 ms avg latency, 223.0 ms max latency.
725565 records sent, 145113.0 records/sec (141.71 MB/sec), 188.2 ms avg latency, 233.0 ms max latency.
706680 records sent, 141336.0 records/sec (138.02 MB/sec), 206.6 ms avg latency, 268.0 ms max latency.
701055 records sent, 140211.0 records/sec (136.92 MB/sec), 213.3 ms avg latency, 265.0 ms max latency.
759360 records sent, 151872.0 records/sec (148.31 MB/sec), 195.0 ms avg latency, 233.0 ms max latency.
744345 records sent, 148869.0 records/sec (145.38 MB/sec), 177.8 ms avg latency, 259.0 ms max latency.
707447 records sent, 141489.4 records/sec (138.17 MB/sec), 199.7 ms avg latency, 232.0 ms max latency.
710563 records sent, 142112.6 records/sec (138.78 MB/sec), 209.5 ms avg latency, 255.0 ms max latency.
25000000 records sent, 142388.5 records/sec (139.05 MB/sec), 171.25 ms avg latency, 1340.00 ms max latency, 185 ms 50th, 233 ms 95th, 427 ms 99th, 1214 ms 99.9th.

$ bin/kafka-consumer-perf-test.sh --topic sandbox --messages 25000000 --bootstrap-server localhost:9092 --timeout 50000000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2025-06-27 11:25:00:198, 2025-06-27 11:27:57:413, 24414.0625, 137.7652, 25000000, 141071.5797, 3432, 173783, 140.4859, 143857.5695
pr (af3cf9d)
$ bin/kafka-producer-perf-test.sh --topic sandbox --num-records 25000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092
126301 records sent, 25260.2 records/sec (24.67 MB/sec), 999.7 ms avg latency, 1413.0 ms max latency.
450255 records sent, 90051.0 records/sec (87.94 MB/sec), 350.4 ms avg latency, 767.0 ms max latency.
703215 records sent, 140643.0 records/sec (137.35 MB/sec), 212.2 ms avg latency, 270.0 ms max latency.
735270 records sent, 147054.0 records/sec (143.61 MB/sec), 204.7 ms avg latency, 237.0 ms max latency.
744900 records sent, 148980.0 records/sec (145.49 MB/sec), 202.1 ms avg latency, 226.0 ms max latency.
698115 records sent, 139623.0 records/sec (136.35 MB/sec), 179.4 ms avg latency, 260.0 ms max latency.
755805 records sent, 151161.0 records/sec (147.62 MB/sec), 175.7 ms avg latency, 236.0 ms max latency.
728700 records sent, 145740.0 records/sec (142.32 MB/sec), 179.3 ms avg latency, 216.0 ms max latency.
744780 records sent, 148956.0 records/sec (145.46 MB/sec), 168.3 ms avg latency, 259.0 ms max latency.
787335 records sent, 157467.0 records/sec (153.78 MB/sec), 73.4 ms avg latency, 169.0 ms max latency.
728265 records sent, 145653.0 records/sec (142.24 MB/sec), 144.0 ms avg latency, 222.0 ms max latency.
729660 records sent, 145932.0 records/sec (142.51 MB/sec), 73.8 ms avg latency, 198.0 ms max latency.
732510 records sent, 146502.0 records/sec (143.07 MB/sec), 117.2 ms avg latency, 216.0 ms max latency.
712125 records sent, 142425.0 records/sec (139.09 MB/sec), 162.2 ms avg latency, 240.0 ms max latency.
757560 records sent, 151512.0 records/sec (147.96 MB/sec), 150.8 ms avg latency, 247.0 ms max latency.
750795 records sent, 150159.0 records/sec (146.64 MB/sec), 100.4 ms avg latency, 202.0 ms max latency.
743505 records sent, 148701.0 records/sec (145.22 MB/sec), 72.8 ms avg latency, 213.0 ms max latency.
769485 records sent, 153897.0 records/sec (150.29 MB/sec), 96.6 ms avg latency, 233.0 ms max latency.
770460 records sent, 154092.0 records/sec (150.48 MB/sec), 36.3 ms avg latency, 93.0 ms max latency.
739845 records sent, 147969.0 records/sec (144.50 MB/sec), 128.3 ms avg latency, 214.0 ms max latency.
732960 records sent, 146592.0 records/sec (143.16 MB/sec), 122.6 ms avg latency, 206.0 ms max latency.
761835 records sent, 152367.0 records/sec (148.80 MB/sec), 55.6 ms avg latency, 128.0 ms max latency.
783810 records sent, 156762.0 records/sec (153.09 MB/sec), 69.3 ms avg latency, 217.0 ms max latency.
726315 records sent, 145263.0 records/sec (141.86 MB/sec), 169.7 ms avg latency, 256.0 ms max latency.
784320 records sent, 156864.0 records/sec (153.19 MB/sec), 32.9 ms avg latency, 146.0 ms max latency.
736785 records sent, 147357.0 records/sec (143.90 MB/sec), 142.7 ms avg latency, 221.0 ms max latency.
745755 records sent, 149151.0 records/sec (145.66 MB/sec), 151.1 ms avg latency, 240.0 ms max latency.
743790 records sent, 148698.5 records/sec (145.21 MB/sec), 138.9 ms avg latency, 231.0 ms max latency.
649740 records sent, 129948.0 records/sec (126.90 MB/sec), 221.2 ms avg latency, 579.0 ms max latency.
745215 records sent, 149043.0 records/sec (145.55 MB/sec), 169.9 ms avg latency, 235.0 ms max latency.
748755 records sent, 149751.0 records/sec (146.24 MB/sec), 108.5 ms avg latency, 204.0 ms max latency.
744915 records sent, 148983.0 records/sec (145.49 MB/sec), 136.5 ms avg latency, 239.0 ms max latency.
763485 records sent, 152697.0 records/sec (149.12 MB/sec), 71.7 ms avg latency, 155.0 ms max latency.
720870 records sent, 144174.0 records/sec (140.79 MB/sec), 143.1 ms avg latency, 257.0 ms max latency.
25000000 records sent, 143016.5 records/sec (139.66 MB/sec), 137.93 ms avg latency, 1413.00 ms max latency, 139 ms 50th, 226 ms 95th, 547 ms 99th, 1359 ms 99.9th.

$ bin/kafka-consumer-perf-test.sh --topic sandbox --messages 25000000 --bootstrap-server localhost:9092 --timeout 500000000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2025-07-02 18:30:33:434, 2025-07-02 18:33:30:045, 24414.0625, 138.2364, 25000000, 141554.0368, 3457, 173154, 140.9962, 144380.1472

Contributor
@junrao left a comment

@Forest0923 : Thanks for the updated PR and the perf results. The perf numbers look good. A few more comments.

protected final ReentrantLock lock = new ReentrantLock();
// Serializes all index operations that mutate internal state
private final ReentrantLock lock = new ReentrantLock();
// Allows concurrent read operations while ensuring exclusive access during index resizing
Contributor

during index resizing => if the underlying mmap is changed ?

Contributor Author

Thank you. Fixed in 5bc31d2

return LockUtils.inLockThrows(lock, action);
}

protected final <T, E extends Exception> T inResizeReadLock(StorageAction<T, E> action) throws E {
Contributor

inResizeReadLock => inRemapReadLock ?

Contributor Author

Fixed in d320291

}

protected final <T, E extends Exception> T inResizeReadLock(StorageAction<T, E> action) throws E {
remapLock.readLock().lock();
Contributor

It doesn't seem that the caller passes in an action that actually throws. Could we just use LockUtils.inLock()?

Contributor Author

I've addressed the remaining comments. 7c98c33
Could you please check it again?

/* Windows or z/OS won't let us modify the file length while the file is mmapped :-( */
if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
return LockUtils.inLockThrows(lock, () -> {
remapLock.writeLock().lock();
Contributor

Could we use LockUtils.inLockThrows(remapLock.writeLock(), () -> { ... })?

@@ -288,12 +286,9 @@ public void closeHandler() {
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
lock.lock();
try {
LockUtils.inLockThrows(lock, () -> {
Contributor

safeForceUnmap() also changes mmap, so we want to protect it with the remap lock.

Contributor
@junrao left a comment

@Forest0923 : Thanks for the updated PR. A few more comments.

protected final ReentrantLock lock = new ReentrantLock();
// Serializes all index operations that mutate internal state
private final ReentrantLock lock = new ReentrantLock();
// Allows concurrent read operations while ensuring exclusive access if the underlying file is changed
Contributor

the underlying file => the underlying mmap

@@ -187,8 +191,7 @@ public void updateParentDir(File parentDir) {
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
lock.lock();
try {
return LockUtils.inLockThrows(lock, () -> LockUtils.inLockThrows(remapLock.writeLock(), () -> {
Contributor

Could we split this into two lines?

@@ -288,12 +281,9 @@ public void closeHandler() {
// However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
lock.lock();
try {
LockUtils.inLockThrows(remapLock.writeLock(), () -> {
Contributor

I think we need both locks here since we need to prevent both concurrent writers and readers.

Contributor Author

Fixed in ba483b3

@Forest0923 changed the title from "KAFKA-19390: Call AbstractIndex.safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale index file memory mappings" to "KAFKA-19390: Call safeForceUnmap() in AbstractIndex.resize() on Linux to prevent stale mmap of index files" on Jun 27, 2025
Contributor
@junrao left a comment

@Forest0923 : Thanks for the updated PR. One more comment.

} finally {
lock.unlock();
}
return LockUtils.inLockThrows(lock, () ->
Contributor

Could we use the utility method in this class? Ditto in a few other places.

Contributor Author

I’ve added an inRemapReadWriteLock method. Does this address your comment appropriately?

Contributor
@junrao left a comment

@Forest0923 : Thanks for the updated PR. A couple more comments.

@@ -236,12 +235,9 @@ public void renameTo(File f) throws IOException {
* Flush the data in the index to disk
*/
public void flush() {
lock.lock();
try {
LockUtils.inLock(lock, () -> {
Contributor

This could just be inLock( () -> {, right? Ditto in a few other places.

@@ -187,8 +191,7 @@ public void updateParentDir(File parentDir) {
* @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
lock.lock();
try {
return inRemapReadWriteLock(() -> {
Contributor

It's probably clearer to do something like the following. Ditto for closeHandler().

inLockThrows(() ->
      inRemapWriteLockThrows( () -> {

Contributor Author

Thanks for the suggestions! I've applied the fix

Contributor
@junrao left a comment

@Forest0923 : Thanks for the updated PR. One more comment.

return LockUtils.inLockThrows(remapLock.readLock(), () -> action).get();
}

protected final <E extends Exception> void inRemapReadLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
Contributor

inRemapReadLockThrows => inRemapWriteLockThrows

Contributor Author

I've updated the lock helpers.

Contributor
@junrao commented Jun 30, 2025

@Forest0923 : Thanks for the updated PR. The following test failure seems related to this PR. My understanding is that this is caused by low retention size that closes the handler of the index file during async delete. However, by default, the delete is delayed by 60 secs. So, I don't quite understand why this failure is showing up in this test. Do you know what's causing this test to fail now?

2025-06-27T23:53:14.7339866Z Gradle Test Run :core:test > Gradle Test Executor 27 > PartitionTest > testPartitionListenerWhenPartitionIsDeleted() STARTED
2025-06-27T23:53:14.8388700Z kafka.log.LogConcurrencyTest.testUncommittedDataNotConsumedFrequentSegmentRolls() failed, log available in /home/runner/work/kafka/kafka/core/build/reports/testOutput/kafka.log.LogConcurrencyTest.testUncommittedDataNotConsumedFrequentSegmentRolls().test.stdout
2025-06-27T23:53:14.8470450Z 
2025-06-27T23:53:14.8471838Z Gradle Test Run :core:test > Gradle Test Executor 29 > LogConcurrencyTest > testUncommittedDataNotConsumedFrequentSegmentRolls() FAILED
2025-06-27T23:53:14.8480019Z     java.util.concurrent.ExecutionException: java.lang.NullPointerException: Cannot invoke "java.nio.MappedByteBuffer.duplicate()" because the return value of "org.apache.kafka.storage.internals.log.OffsetIndex.mmap()" is null
2025-06-27T23:53:14.8482629Z         at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
2025-06-27T23:53:14.8484193Z         at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
2025-06-27T23:53:14.8485686Z         at kafka.log.LogConcurrencyTest.testUncommittedDataNotConsumed(LogConcurrencyTest.scala:78)
2025-06-27T23:53:14.8487462Z         at kafka.log.LogConcurrencyTest.testUncommittedDataNotConsumedFrequentSegmentRolls(LogConcurrencyTest.scala:64)
2025-06-27T23:53:14.8490356Z 
2025-06-27T23:53:14.8490782Z         Caused by:
2025-06-27T23:53:14.8492826Z         java.lang.NullPointerException: Cannot invoke "java.nio.MappedByteBuffer.duplicate()" because the return value of "org.apache.kafka.storage.internals.log.OffsetIndex.mmap()" is null
2025-06-27T23:53:14.8494790Z             at org.apache.kafka.storage.internals.log.OffsetIndex.lambda$lookup$0(OffsetIndex.java:99)
2025-06-27T23:53:14.8496193Z             at org.apache.kafka.storage.internals.log.AbstractIndex.inRemapReadLock(AbstractIndex.java:432)
2025-06-27T23:53:14.8497708Z             at org.apache.kafka.storage.internals.log.OffsetIndex.lookup(OffsetIndex.java:98)
2025-06-27T23:53:14.8499080Z             at org.apache.kafka.storage.internals.log.LogSegment.translateOffset(LogSegment.java:395)
2025-06-27T23:53:14.8500435Z             at org.apache.kafka.storage.internals.log.LogSegment.translateOffset(LogSegment.java:377)
2025-06-27T23:53:14.8501747Z             at org.apache.kafka.storage.internals.log.LogSegment.read(LogSegment.java:435)
2025-06-27T23:53:14.8503390Z             at org.apache.kafka.storage.internals.log.LocalLog.lambda$read$12(LocalLog.java:505)
2025-06-27T23:53:14.8504745Z             at org.apache.kafka.storage.internals.log.LocalLog.maybeHandleIOException(LocalLog.java:816)
2025-06-27T23:53:14.8506085Z             at org.apache.kafka.storage.internals.log.LocalLog.maybeHandleIOException(LocalLog.java:190)
2025-06-27T23:53:14.8507296Z             at org.apache.kafka.storage.internals.log.LocalLog.read(LocalLog.java:467)
2025-06-27T23:53:14.8508404Z             at org.apache.kafka.storage.internals.log.UnifiedLog.read(UnifiedLog.java:1605)
2025-06-27T23:53:14.8509490Z             at kafka.log.LogConcurrencyTest$ConsumerTask.call(LogConcurrencyTest.scala:94)
2025-06-27T23:53:14.8510815Z             at kafka.log.LogConcurrencyTest$ConsumerTask.call(LogConcurrencyTest.scala:88)
2025-06-27T23:53:14.8511923Z             at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
2025-06-27T23:53:14.8557331Z             at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
2025-06-27T23:53:14.8558698Z             at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
2025-06-27T23:53:14.8559767Z             at java.base/java.lang.Thread.run(Thread.java:1575)

@Forest0923
Contributor Author

I'm not exactly sure why mmap became null, but perhaps we need to acquire the write lock in append or truncateToEntries()?

I was able to reproduce the issue locally by increasing maxOffset like below:

diff --git a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
index 854be39808..06421fd96a 100644
--- a/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConcurrencyTest.scala
@@ -67,7 +67,7 @@ class LogConcurrencyTest {
   def testUncommittedDataNotConsumed(log: UnifiedLog): Unit = {
     val executor = Executors.newFixedThreadPool(2)
     try {
-      val maxOffset = 5000
+      val maxOffset = 10000
       val consumer = new ConsumerTask(log, maxOffset)
       val appendTask = new LogAppendTask(log, maxOffset)

Applying the following change seems to fix the issue:

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
index 04d68e9a54..89875484f7 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
@@ -212,7 +212,7 @@ public final class OffsetIndex extends AbstractIndex {
      * Truncates index to a known number of entries.
      */
     private void truncateToEntries(int entries) {
-        inLock(() -> {
+        inRemapWriteLockThrows(() -> {
             super.truncateToEntries0(entries);
             this.lastOffset = lastEntry().offset;
             log.debug("Truncated index {} to {} entries; position is now {} and last offset is now {}",

Contributor
@junrao commented Jul 1, 2025

@Forest0923 : Thanks for the analysis. I was able to reproduce this issue too. It seems that mmap is temporarily set to null by resize() through onBecomeInactiveSegment() when the segment rolls. I am not sure why lookup() would see the null mmap, since there is protection through the read/write lock in resize() and lookup(). closeHandler() can permanently set mmap to null, but I added some instrumentation and it seems it is never called when the test fails. So, I am still puzzled about what's causing this.

Also, why would adding inRemapWriteLockThrows in truncateToEntries() fix the issue?

Contributor Author
@Forest0923 commented Jul 2, 2025

Sorry, my earlier comment about truncateToEntries() was incorrect—please disregard it.
Also, I found that the lock helper was implemented incorrectly:

The Supplier's actual action was executed outside the lock.

protected final <T> T inRemapReadLock(Supplier<T> action) {
    return LockUtils.inLock(remapLock.readLock(), () -> action).get();
}

updated: #19961 (comment)
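
For reference, a self-contained illustration of that bug and its fix (the inLock utility below is a stand-in with an assumed shape, i.e. it runs the supplier while holding the lock and returns its result; it is not the real LockUtils): in the buggy variant the lambda merely returns the Supplier, so action.get() runs after the read lock has already been released.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

final class RemapLockHelperSketch {
    private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();

    // Assumed shape of the utility: run the supplier while holding the lock.
    private static <T> T inLock(Lock lock, Supplier<T> supplier) {
        lock.lock();
        try {
            return supplier.get();
        } finally {
            lock.unlock();
        }
    }

    // Buggy variant: the lambda returns the Supplier itself, so the lock is held
    // only while the reference is returned and action.get() runs after unlock.
    <T> T inRemapReadLockBuggy(Supplier<T> action) {
        return inLock(remapLock.readLock(), () -> action).get();
    }

    // Corrected variant: the action is evaluated while the read lock is held.
    <T> T inRemapReadLock(Supplier<T> action) {
        return inLock(remapLock.readLock(), action);
    }
}
```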

Contributor
@junrao left a comment

@Forest0923 : Thanks for the updated PR. A few more comments. Also, do you know what's causing the following test failure?

2025-07-02T03:49:27.3858133Z 
2025-07-02T03:49:27.3860984Z [Error] /home/runner/work/kafka/kafka/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java:132:  error: no suitable method found for produceKeyValuesSynchronouslyWithTimestamp(String,List
<KeyValue<Integer,String>>,Properties,long)
2025-07-02T03:49:27.3863357Z > Task :streams:integration-tests:compileTestScala FAILED
2025-07-02T03:49:27.3864394Z javac exited with exit code 1

throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has "
+ "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp "
+ timestamp(mmap(), 0));
inLock(() -> {
Contributor

This needs the remap read lock.

}

/**
* The last entry in the index
*/
private OffsetPosition lastEntry() {
lock.lock();
try {
return inLock(() -> {
Contributor

lastEntry() is a reader. So it should only take remap read lock.

Also, there is an existing issue: there seems to be no memory barrier for the instance-level field lastOffset, so a reader may not see the latest value. We need to make it volatile.
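
As a small illustration of the visibility point (the surrounding class is hypothetical; only the field name lastOffset comes from the code under review): marking the field volatile guarantees that a lookup thread sees the most recent value written by the appender even when the two threads do not synchronize on a common lock.

```java
final class LastOffsetVisibilitySketch {
    // Without volatile there is no happens-before edge between the appender's
    // write and a lock-free reader, so the reader could observe a stale value.
    private volatile long lastOffset = -1L;

    void append(long offset) {   // writer path, called under the mutation lock
        lastOffset = offset;
    }

    long lastOffset() {          // reader path, takes no lock
        return lastOffset;
    }
}
```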

@@ -259,30 +252,24 @@ private int relativeOffset(ByteBuffer buffer, int n) {
* Read the last entry from the index file. This operation involves disk access.
*/
private TimestampOffset lastEntryFromIndexFile() {
lock.lock();
try {
return inLock(() -> {
Contributor

lastEntryFromIndexFile() is a reader. So, it needs the remap read lock.

@Forest0923
Contributor Author

Thanks for the review. I've fixed some lock usages and added volatile for lastOffset.

By the way, I couldn’t reproduce the error in EmitOnChangeIntegrationTest locally or find any failed GitHub Actions runs. Could it be related to this PR?

Contributor
@junrao left a comment

@Forest0923 : Thanks for the updated PR. The code LGTM. Could you run the same producer/consumer perf test again?

@Forest0923
Contributor Author

Here are the results of the perf test:

| Metric | Trunk (a3ed705092) | PR (0657c7ae40) |
|---|---|---|
| Producer throughput (records/sec) | 143,641.8 | 144,843.6 |
| Producer throughput (MB/sec) | 140.28 | 141.45 |
| Producer average latency (ms) | 172.89 | 162.61 |
| Producer max latency (ms) | 1,677.00 | 1,556.00 |
| Producer 50th percentile latency (ms) | 183 | 171 |
| Producer 95th percentile latency (ms) | 232 | 230 |
| Producer 99th percentile latency (ms) | 340 | 345 |
| Producer 99.9th percentile latency (ms) | 1,300 | 1,215 |
| Consumer throughput (messages/sec) | 140,731.7 | 142,076.9 |
| Consumer throughput (MB/sec) | 137.43 | 138.75 |
Trunk (a3ed705)
$ bin/kafka-producer-perf-test.sh --topic sandbox --num-records 25000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092
116191 records sent, 23224.3 records/sec (22.68 MB/sec), 1054.3 ms avg latency, 1677.0 ms max latency.
521820 records sent, 104364.0 records/sec (101.92 MB/sec), 308.6 ms avg latency, 861.0 ms max latency.
714510 records sent, 142902.0 records/sec (139.55 MB/sec), 187.4 ms avg latency, 264.0 ms max latency.
681195 records sent, 136239.0 records/sec (133.05 MB/sec), 196.0 ms avg latency, 254.0 ms max latency.
688440 records sent, 137688.0 records/sec (134.46 MB/sec), 212.5 ms avg latency, 257.0 ms max latency.
773280 records sent, 154656.0 records/sec (151.03 MB/sec), 113.5 ms avg latency, 246.0 ms max latency.
735525 records sent, 147105.0 records/sec (143.66 MB/sec), 130.4 ms avg latency, 234.0 ms max latency.
749115 records sent, 149823.0 records/sec (146.31 MB/sec), 186.0 ms avg latency, 219.0 ms max latency.
758745 records sent, 151749.0 records/sec (148.19 MB/sec), 166.6 ms avg latency, 230.0 ms max latency.
753510 records sent, 150702.0 records/sec (147.17 MB/sec), 94.6 ms avg latency, 210.0 ms max latency.
719400 records sent, 143880.0 records/sec (140.51 MB/sec), 209.1 ms avg latency, 252.0 ms max latency.
748860 records sent, 149772.0 records/sec (146.26 MB/sec), 171.2 ms avg latency, 249.0 ms max latency.
705780 records sent, 141156.0 records/sec (137.85 MB/sec), 192.1 ms avg latency, 249.0 ms max latency.
776445 records sent, 155289.0 records/sec (151.65 MB/sec), 167.2 ms avg latency, 230.0 ms max latency.
789105 records sent, 157821.0 records/sec (154.12 MB/sec), 107.0 ms avg latency, 173.0 ms max latency.
737700 records sent, 147540.0 records/sec (144.08 MB/sec), 183.8 ms avg latency, 252.0 ms max latency.
729885 records sent, 145977.0 records/sec (142.56 MB/sec), 187.1 ms avg latency, 247.0 ms max latency.
730365 records sent, 146073.0 records/sec (142.65 MB/sec), 201.8 ms avg latency, 238.0 ms max latency.
728310 records sent, 145662.0 records/sec (142.25 MB/sec), 191.7 ms avg latency, 233.0 ms max latency.
752880 records sent, 150576.0 records/sec (147.05 MB/sec), 169.9 ms avg latency, 269.0 ms max latency.
766515 records sent, 153303.0 records/sec (149.71 MB/sec), 177.9 ms avg latency, 223.0 ms max latency.
746940 records sent, 149388.0 records/sec (145.89 MB/sec), 145.4 ms avg latency, 225.0 ms max latency.
754905 records sent, 150981.0 records/sec (147.44 MB/sec), 166.0 ms avg latency, 241.0 ms max latency.
740400 records sent, 148080.0 records/sec (144.61 MB/sec), 187.5 ms avg latency, 245.0 ms max latency.
757800 records sent, 151560.0 records/sec (148.01 MB/sec), 172.3 ms avg latency, 246.0 ms max latency.
758145 records sent, 151629.0 records/sec (148.08 MB/sec), 165.8 ms avg latency, 255.0 ms max latency.
738630 records sent, 147726.0 records/sec (144.26 MB/sec), 77.3 ms avg latency, 189.0 ms max latency.
762570 records sent, 152514.0 records/sec (148.94 MB/sec), 168.8 ms avg latency, 261.0 ms max latency.
773985 records sent, 154797.0 records/sec (151.17 MB/sec), 139.5 ms avg latency, 227.0 ms max latency.
713175 records sent, 142635.0 records/sec (139.29 MB/sec), 177.4 ms avg latency, 261.0 ms max latency.
750780 records sent, 150156.0 records/sec (146.64 MB/sec), 162.7 ms avg latency, 241.0 ms max latency.
751770 records sent, 150354.0 records/sec (146.83 MB/sec), 106.9 ms avg latency, 242.0 ms max latency.
745020 records sent, 149004.0 records/sec (145.51 MB/sec), 196.9 ms avg latency, 250.0 ms max latency.
723870 records sent, 144774.0 records/sec (141.38 MB/sec), 208.1 ms avg latency, 249.0 ms max latency.
25000000 records sent, 143641.8 records/sec (140.28 MB/sec), 172.89 ms avg latency, 1677.00 ms max latency, 183 ms 50th, 232 ms 95th, 340 ms 99th, 1300 ms 99.9th.


$ bin/kafka-consumer-perf-test.sh --topic sandbox --messages 25000000 --bootstrap-server localhost:9092 --timeout 500000000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2025-07-08 07:19:04:323, 2025-07-08 07:22:01:966, 24414.0625, 137.4333, 25000000, 140731.6922, 3427, 174216, 140.1367, 143500.0230
PR (0657c7a)
$ bin/kafka-producer-perf-test.sh --topic sandbox --num-records 25000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092
116866 records sent, 23373.2 records/sec (22.83 MB/sec), 1072.6 ms avg latency, 1556.0 ms max latency.
488235 records sent, 97647.0 records/sec (95.36 MB/sec), 328.8 ms avg latency, 791.0 ms max latency.
688785 records sent, 137757.0 records/sec (134.53 MB/sec), 219.5 ms avg latency, 302.0 ms max latency.
743235 records sent, 148647.0 records/sec (145.16 MB/sec), 179.4 ms avg latency, 263.0 ms max latency.
743430 records sent, 148686.0 records/sec (145.20 MB/sec), 204.0 ms avg latency, 228.0 ms max latency.
768465 records sent, 153693.0 records/sec (150.09 MB/sec), 157.6 ms avg latency, 213.0 ms max latency.
722580 records sent, 144516.0 records/sec (141.13 MB/sec), 192.8 ms avg latency, 284.0 ms max latency.
741660 records sent, 148332.0 records/sec (144.86 MB/sec), 178.0 ms avg latency, 248.0 ms max latency.
725550 records sent, 145110.0 records/sec (141.71 MB/sec), 188.2 ms avg latency, 243.0 ms max latency.
744300 records sent, 148860.0 records/sec (145.37 MB/sec), 134.0 ms avg latency, 224.0 ms max latency.
803625 records sent, 160725.0 records/sec (156.96 MB/sec), 22.9 ms avg latency, 164.0 ms max latency.
771810 records sent, 154362.0 records/sec (150.74 MB/sec), 61.7 ms avg latency, 168.0 ms max latency.
743160 records sent, 148632.0 records/sec (145.15 MB/sec), 160.6 ms avg latency, 235.0 ms max latency.
760860 records sent, 152172.0 records/sec (148.61 MB/sec), 174.4 ms avg latency, 254.0 ms max latency.
756060 records sent, 151212.0 records/sec (147.67 MB/sec), 101.9 ms avg latency, 212.0 ms max latency.
731730 records sent, 146346.0 records/sec (142.92 MB/sec), 178.6 ms avg latency, 225.0 ms max latency.
748035 records sent, 149607.0 records/sec (146.10 MB/sec), 174.3 ms avg latency, 246.0 ms max latency.
775995 records sent, 155199.0 records/sec (151.56 MB/sec), 126.1 ms avg latency, 211.0 ms max latency.
744525 records sent, 148905.0 records/sec (145.42 MB/sec), 172.6 ms avg latency, 214.0 ms max latency.
736050 records sent, 147210.0 records/sec (143.76 MB/sec), 110.8 ms avg latency, 223.0 ms max latency.
780660 records sent, 156132.0 records/sec (152.47 MB/sec), 148.7 ms avg latency, 202.0 ms max latency.
739290 records sent, 147858.0 records/sec (144.39 MB/sec), 159.6 ms avg latency, 242.0 ms max latency.
766755 records sent, 153351.0 records/sec (149.76 MB/sec), 181.4 ms avg latency, 226.0 ms max latency.
757380 records sent, 151476.0 records/sec (147.93 MB/sec), 177.5 ms avg latency, 227.0 ms max latency.
735285 records sent, 147057.0 records/sec (143.61 MB/sec), 194.1 ms avg latency, 248.0 ms max latency.
751005 records sent, 150201.0 records/sec (146.68 MB/sec), 120.3 ms avg latency, 227.0 ms max latency.
734895 records sent, 146979.0 records/sec (143.53 MB/sec), 199.7 ms avg latency, 266.0 ms max latency.
744480 records sent, 148896.0 records/sec (145.41 MB/sec), 152.3 ms avg latency, 237.0 ms max latency.
760110 records sent, 152022.0 records/sec (148.46 MB/sec), 76.6 ms avg latency, 211.0 ms max latency.
768435 records sent, 153687.0 records/sec (150.08 MB/sec), 177.6 ms avg latency, 238.0 ms max latency.
757635 records sent, 151527.0 records/sec (147.98 MB/sec), 162.1 ms avg latency, 223.0 ms max latency.
750330 records sent, 150066.0 records/sec (146.55 MB/sec), 140.7 ms avg latency, 217.0 ms max latency.
783540 records sent, 156708.0 records/sec (153.04 MB/sec), 139.9 ms avg latency, 215.0 ms max latency.
732015 records sent, 146403.0 records/sec (142.97 MB/sec), 194.6 ms avg latency, 271.0 ms max latency.
25000000 records sent, 144843.6 records/sec (141.45 MB/sec), 162.61 ms avg latency, 1556.00 ms max latency, 171 ms 50th, 230 ms 95th, 345 ms 99th, 1215 ms 99.9th.


$ bin/kafka-consumer-perf-test.sh --topic sandbox --messages 25000000 --bootstrap-server localhost:9092 --timeout 500000000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2025-07-08 07:00:20:093, 2025-07-08 07:03:16:054, 24414.0625, 138.7470, 25000000, 142076.9375, 3423, 172538, 141.4996, 144895.6172

Contributor
@junrao left a comment

@Forest0923 : Thanks for the perf results. LGTM

@junrao merged commit ea7b145 into apache:trunk on Jul 8, 2025
23 checks passed
Member
@chia7712 left a comment

@Forest0923 thanks for this patch, and sorry for the delayed review. Two small comments remain; please take a look.

* @param runnable the runnable to be executed within the lock context
* @throws NullPointerException if either {@code lock} or {@code runnable} is null
*/
public static void inLock(Lock lock, Runnable runnable) {
Member

Excuse me, have we considered replacing inLock by inLockThrows? Keeping both varieties seems a bit verbose to me.

} finally {
lock.unlock();
}
inLockThrows(() ->
Member

inLockThrows(() -> inRemapWriteLockThrows(this::safeForceUnmap));

@@ -47,7 +48,10 @@ private enum SearchResultType {

private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);

protected final ReentrantLock lock = new ReentrantLock();
// Serializes all index operations that mutate internal state
Contributor

This is a minor thing. It would be useful to add a comment on why the reader doesn't need to take this lock.

@Forest0923
Contributor Author

@junrao @chia7712 I've created a follow-up PR. Could you please take a look when you have a moment?
#20131

junrao pushed a commit that referenced this pull request Jul 15, 2025
) (#20131)

This PR performs a refactoring of LockUtils and improves inline
comments, as a follow-up to #19961.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
dalaoqi pushed a commit to dalaoqi/kafka that referenced this pull request Jul 22, 2025
…390) (apache#20131)

This PR performs a refactoring of LockUtils and improves inline
comments, as a follow-up to apache#19961.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
Labels: ci-approved, core (Kafka Broker), storage (Pull requests that target the storage module)