Affected Version
All long-running (at least ~80 days) Druid clusters that have parallel merging enabled on the Broker and are running JDK 17.
Minor versions that include the patch (https://bugs.openjdk.org/browse/JDK-8351933):
- JDK 17.0.17
- JDK 21.0.8
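A minimal sketch (not from this issue) of a startup check against the patched releases listed above; Runtime.version() and Version.feature()/update() are standard Java APIs, and the version thresholds are taken from the list above:

public class Jdk8330017Check
{
  public static void main(String[] args)
  {
    Runtime.Version v = Runtime.version();
    boolean patched = (v.feature() == 17 && v.update() >= 17)
                      || (v.feature() == 21 && v.update() >= 8);
    if (!patched) {
      // Other JDK lines: consult https://bugs.openjdk.org/browse/JDK-8351933
      System.err.println("JDK " + v + " may be affected by JDK-8330017; verify against the fix versions");
    }
  }
}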
Description
The Broker ForkJoinPool is affected by JDK-8330017. The ctl field's RC (Release Count) is a 16-bit signed value tracking released workers; a masking bug causes RC to decrement on every worker release/re-acquire cycle.
Root Cause:
- RC starts near 0
- Decrements over ~80 days of continuous operation
- Reaches -32768 (the 16-bit minimum), then overflows to +32767 (MAX_CAP); the sketch below shows the wraparound
- Since RC = MAX_CAP, the pool believes all workers are available but refuses to schedule new tasks. Workers sit idle in awaitWork() forever, so every aggregation query on the Broker stalls, e.g. select count(*) from A
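For reference, a minimal self-contained sketch (not part of the reproducing test) of the signed 16-bit wraparound described above: decrementing RC past -32768 lands exactly on MAX_CAP.

public class RcWraparound
{
  public static void main(String[] args)
  {
    short rc = Short.MIN_VALUE; // -32768, reached after ~80 days of decrements
    rc--;                       // one more release/re-acquire cycle
    System.out.println(rc);     // prints 32767, i.e. MAX_CAP (0x7fff)
  }
}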
Reproducing test:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.concurrent;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryTimeoutException;
import org.junit.Assume;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test that demonstrates JDK-8330017: ForkJoinPool stops executing tasks due to
* ctl field Release Count (RC) overflow.
*
* Bug details: https://bugs.openjdk.org/browse/JDK-8330017
*
* The FJP internal ctl field has an RC (Release Count) that tracks worker thread state.
* Due to a bug in JDK 11/17:
* - RC decrements over time as threads are released/re-acquired
* - After ~80 days of continuous operation, RC can reach -32768 (minimum 16-bit value)
* - On next decrement, it overflows to +32767 (MAX_CAP)
* - Pool then thinks it has max threads and stops scheduling tasks
* - Workers sit idle at awaitWork() forever, never picking up new tasks
*
*/
public class ForkJoinPoolRCOverflowTest
{
private static final Logger LOG = new Logger(ForkJoinPoolRCOverflowTest.class);
// ForkJoinPool ctl field bit layout (JDK 11/17):
// Bits 48-63: RC (Release Count) - released workers minus target parallelism
private static final int RC_SHIFT = 48;
private static final long RC_MASK = 0xffffL << RC_SHIFT;
private static final int MAX_CAP = 0x7fff; // 32767
@Test
public void testRCOverflowWithParallelMergeCombiningSequence() throws Exception
{
ForkJoinPool pool = new ForkJoinPool(
4,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
(t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t),
true
);
Field ctlField;
try {
ctlField = ForkJoinPool.class.getDeclaredField("ctl");
ctlField.setAccessible(true);
}
catch (Exception e) {
LOG.warn("Cannot access ForkJoinPool.ctl field, skipping test: %s", e.getMessage());
pool.shutdown();
Assume.assumeTrue("Cannot access ForkJoinPool.ctl field via reflection", false);
return;
}
try {
// First verify the pool works with ParallelMergeCombiningSequence
List<Sequence<IntPair>> normalInput = new ArrayList<>();
normalInput.add(generateSequence(100));
normalInput.add(generateSequence(100));
normalInput.add(generateSequence(100));
normalInput.add(generateSequence(100));
ParallelMergeCombiningSequence<IntPair> normalSequence = new ParallelMergeCombiningSequence<>(
pool,
normalInput,
INT_PAIR_ORDERING,
INT_PAIR_MERGE_FN,
true,
5000,
0,
4,
128,
64,
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
null
);
// Verify normal operation works
Yielder<IntPair> normalYielder = Yielders.each(normalSequence);
int normalCount = 0;
while (!normalYielder.isDone()) {
normalYielder = normalYielder.next(normalYielder.get());
normalCount++;
}
normalYielder.close();
LOG.info("Normal ParallelMergeCombiningSequence processed %d items", normalCount);
assertTrue("Should process items normally", normalCount > 0);
// Wait for pool to be quiescent
assertTrue("Pool should be quiescent", pool.awaitQuiescence(5, TimeUnit.SECONDS));
// Simulate the RC overflow by manipulating the ctl field
long currentCtl = (long) ctlField.get(pool);
long preOverflowRC = extractRC(currentCtl);
LOG.info("Pre-overflow ctl: 0x%016X, RC=%d", currentCtl, preOverflowRC);
// Set RC to MAX_CAP to simulate the overflow condition
long overflowCtl = setRC(currentCtl, MAX_CAP);
ctlField.set(pool, overflowCtl);
// Verify the ctl field was actually modified
long verifyCtl = (long) ctlField.get(pool);
long postOverflowRC = extractRC(verifyCtl);
LOG.info("Post-overflow ctl: 0x%016X, RC=%d", verifyCtl, postOverflowRC);
assertEquals("RC should be set to MAX_CAP (32767)", MAX_CAP, postOverflowRC);
// Now try to use ParallelMergeCombiningSequence with the corrupted pool
List<Sequence<IntPair>> overflowInput = new ArrayList<>();
overflowInput.add(generateSequence(1000));
overflowInput.add(generateSequence(1000));
overflowInput.add(generateSequence(1000));
overflowInput.add(generateSequence(1000));
// Use a short timeout to detect the hang
ParallelMergeCombiningSequence<IntPair> overflowSequence = new ParallelMergeCombiningSequence<>(
pool,
overflowInput,
INT_PAIR_ORDERING,
INT_PAIR_MERGE_FN,
true,
2000, // 2 second timeout - should hit this if bug is present
0,
4,
128,
64,
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
null
);
// With RC at MAX_CAP, the pool should fail to schedule tasks and timeout
try {
Yielder<IntPair> overflowYielder = Yielders.each(overflowSequence);
int overflowCount = 0;
while (!overflowYielder.isDone()) {
overflowYielder = overflowYielder.next(overflowYielder.get());
overflowCount++;
}
overflowYielder.close();
// If we get here, the pool somehow recovered - this can happen if the JDK
// implementation handles the corrupted state gracefully
LOG.info("Overflow sequence processed %d items (pool recovered from corrupted state)", overflowCount);
fail("Expected QueryTimeoutException due to RC overflow, but pool recovered and processed "
+ overflowCount + " items");
}
catch (QueryTimeoutException e) {
// This is the expected behavior when the bug is active:
// Pool thinks it has MAX_CAP workers available but won't schedule new tasks
LOG.info("ParallelMergeCombiningSequence timed out as expected with RC overflow: %s", e.getMessage());
assertTrue("Timeout message should mention timeout",
e.getMessage().contains("timeout") || e.getMessage().contains("complete"));
}
}
finally {
pool.shutdownNow();
pool.awaitTermination(5, TimeUnit.SECONDS);
}
}
private static final Ordering<IntPair> INT_PAIR_ORDERING = Ordering.natural().onResultOf(p -> p.lhs);
private static final BinaryOperator<IntPair> INT_PAIR_MERGE_FN = (lhs, rhs) -> {
if (lhs == null) {
return rhs;
}
if (rhs == null) {
return lhs;
}
return new IntPair(lhs.lhs, lhs.rhs + rhs.rhs);
};
private static class IntPair extends Pair<Integer, Integer>
{
IntPair(Integer lhs, Integer rhs)
{
super(lhs, rhs);
}
}
private static Sequence<IntPair> generateSequence(int size)
{
return new BaseSequence<>(
new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>()
{
@Override
public Iterator<IntPair> make()
{
return new Iterator<IntPair>()
{
int mergeKey = 0;
int rowCounter = 0;
@Override
public boolean hasNext()
{
return rowCounter < size;
}
@Override
public IntPair next()
{
rowCounter++;
mergeKey += ThreadLocalRandom.current().nextInt(1, 3);
return new IntPair(mergeKey, ThreadLocalRandom.current().nextInt(1, 100));
}
};
}
@Override
public void cleanup(Iterator<IntPair> iterFromMake)
{
// nothing to cleanup
}
}
);
}
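// Note: the (short) cast below truncates ctl to its top 16 bits (after the
// 48-bit shift) and sign-extends them on the widening return, so e.g.
// 0x8000 yields -32768, matching the JDK's signed interpretation of RC.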
private static long extractRC(long ctl)
{
return (short) (ctl >> RC_SHIFT);
}
private static long setRC(long ctl, int rc)
{
return (ctl & ~RC_MASK) | (((long) rc & 0xffffL) << RC_SHIFT);
}
}
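For clusters stuck on an affected JDK, a hedged monitoring sketch along the same lines as the test's reflection approach could watch RC drift toward the overflow point. The field name ("ctl") and bit layout assume JDK 11/17 internals as in the test above, the -30000 threshold is an arbitrary illustrative choice, and on JDK 17 this requires --add-opens java.base/java.util.concurrent=ALL-UNNAMED:

import java.lang.reflect.Field;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Druid: samples the pool's internal ctl
// field hourly and warns as RC drifts toward the 16-bit minimum.
public class RcDriftMonitor
{
  public static void watch(ForkJoinPool pool) throws ReflectiveOperationException
  {
    Field ctlField = ForkJoinPool.class.getDeclaredField("ctl");
    ctlField.setAccessible(true);
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
      try {
        long ctl = (long) ctlField.get(pool);
        long rc = (short) (ctl >> 48); // signed RC, same extraction as extractRC above
        if (rc < -30000) {
          System.err.println("ForkJoinPool RC nearing overflow: " + rc);
        }
      }
      catch (IllegalAccessException e) {
        System.err.println("Cannot sample ctl: " + e);
      }
    }, 0, 1, TimeUnit.HOURS);
  }
}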