Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -758,15 +758,18 @@ public boolean containClusterName(String clusterName) {

@Override
public int getMinPipelineExecutorSize() {
ConnectContext context = ConnectContext.get();
if (context == null) {
return 1;
}
String clusterName = "";
try {
clusterName = ConnectContext.get().getCloudCluster(false);
clusterName = context.getCloudCluster(false);
} catch (ComputeGroupException e) {
LOG.warn("failed to get cluster name", e);
return 1;
}
if (ConnectContext.get() != null
&& Strings.isNullOrEmpty(clusterName)) {
if (Strings.isNullOrEmpty(clusterName)) {
return 1;
}
List<Backend> currentBackends = getBackendsByClusterName(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,45 +215,64 @@ public void startSplit(int numBackends) {
Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
AtomicInteger numFinishedPartitions = new AtomicInteger(0);
// Capture ConnectContext from the current thread to pass to async threads
ConnectContext parentConnectContext = ConnectContext.get();
CompletableFuture.runAsync(() -> {
for (HivePartition partition : prunedPartitions) {
if (batchException.get() != null || splitAssignment.isStop()) {
break;
}
try {
splittersOnFlight.acquire();
CompletableFuture.runAsync(() -> {
try {
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(
cache, Collections.singletonList(partition), allFiles, bindBrokerName,
numBackends, true);
if (allFiles.size() > numSplitsPerPartition.get()) {
numSplitsPerPartition.set(allFiles.size());
}
if (splitAssignment.needMoreSplit()) {
splitAssignment.addToQueue(allFiles);
}
} catch (Exception e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
splittersOnFlight.release();
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
// Set ConnectContext for the outer async thread to avoid NPE when accessing session variables.
// Must be cleaned up in finally block to prevent ThreadLocal leaks in pooled threads.
if (parentConnectContext != null) {
parentConnectContext.setThreadLocalInfo();
}
try {
for (HivePartition partition : prunedPartitions) {
if (batchException.get() != null || splitAssignment.isStop()) {
break;
}
try {
splittersOnFlight.acquire();
CompletableFuture.runAsync(() -> {
// Set ConnectContext for the inner async thread
if (parentConnectContext != null) {
parentConnectContext.setThreadLocalInfo();
}
if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
splitAssignment.finishSchedule();
try {
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(
cache, Collections.singletonList(partition), allFiles, bindBrokerName,
numBackends, true);
if (allFiles.size() > numSplitsPerPartition.get()) {
numSplitsPerPartition.set(allFiles.size());
}
if (splitAssignment.needMoreSplit()) {
splitAssignment.addToQueue(allFiles);
}
} catch (Exception e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
// Clean up ThreadLocal to prevent leaks in pooled threads
ConnectContext.remove();
splittersOnFlight.release();
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
splitAssignment.finishSchedule();
}
}
}
}, scheduleExecutor);
} catch (Exception e) {
// When submitting a task, an exception will be thrown if the task pool(scheduleExecutor) is full
batchException.set(new UserException(e.getMessage(), e));
break;
}, scheduleExecutor);
} catch (Exception e) {
// When submitting a task, an exception will be thrown if the task pool(scheduleExecutor)
// is full
batchException.set(new UserException(e.getMessage(), e));
break;
}
}
}
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
} finally {
// Clean up ThreadLocal to prevent leaks in pooled threads
ConnectContext.remove();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,23 @@ public void testGetMinPipelineExecutorSizeWithNoClusterInContext() {
}
}

// Regression test for the NPE fix: when no ConnectContext exists at all
// (e.g. in an async thread that never had one propagated), the method must
// degrade gracefully instead of dereferencing a null context.
@Test
public void testGetMinPipelineExecutorSizeWithNullConnectContext() {
    infoService = new CloudSystemInfoService();

    // Detach any ConnectContext from this thread and confirm it is gone.
    ConnectContext.remove();
    Assert.assertNull(ConnectContext.get());

    // With no context available, the fallback value of 1 is expected
    // rather than a NullPointerException.
    Assert.assertEquals(1, infoService.getMinPipelineExecutorSize());
}

@Test
public void testGetMinPipelineExecutorSizeWithMixedValidInvalidBackends() {
infoService = new CloudSystemInfoService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import org.junit.Assert;
Expand All @@ -32,6 +33,12 @@
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class HiveScanNodeTest {
private static final long MB = 1024L * 1024L;
Expand Down Expand Up @@ -85,4 +92,172 @@ public void testDetermineTargetFileSplitSizeKeepsInitialSize() throws Exception
long target = (long) method.invoke(node, caches, false);
Assert.assertEquals(32 * MB, target);
}

/**
 * Verifies that a ConnectContext captured on the caller thread can be attached
 * to an async worker thread and is removed again once the task finishes.
 * This mirrors the fix in HiveScanNode.startSplit(), where the ThreadLocal
 * context was previously unavailable to async threads and caused an NPE.
 */
@Test
public void testConnectContextPropagationInAsyncThread() throws Exception {
    // A mocked context stands in for the real session context.
    ConnectContext mockCtx = Mockito.mock(ConnectContext.class);
    mockCtx.setThreadLocalInfo();

    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch done = new CountDownLatch(1);
    AtomicReference<ConnectContext> seenInAsync = new AtomicReference<>();
    AtomicReference<ConnectContext> seenAfterCleanup = new AtomicReference<>();

    try {
        // Same propagate-then-clean-up shape as HiveScanNode.startSplit().
        CompletableFuture.runAsync(() -> {
            if (mockCtx != null) {
                mockCtx.setThreadLocalInfo();
            }
            try {
                // Record what the worker thread observes while running.
                seenInAsync.set(ConnectContext.get());
            } finally {
                // The fix always removes the ThreadLocal so pooled threads
                // do not leak a stale context.
                ConnectContext.remove();
                seenAfterCleanup.set(ConnectContext.get());
                done.countDown();
            }
        }, pool).get(5, TimeUnit.SECONDS);

        Assert.assertTrue("Async task should complete", done.await(5, TimeUnit.SECONDS));
        Assert.assertNotNull("ConnectContext should be available in async thread", seenInAsync.get());
        Assert.assertNull("ConnectContext should be cleaned up after task", seenAfterCleanup.get());
    } finally {
        pool.shutdown();
        ConnectContext.remove();
    }
}

/**
 * Verifies ConnectContext propagation and cleanup across two levels of
 * CompletableFuture.runAsync(), matching the nested async structure of
 * HiveScanNode.startSplit(): an outer scheduling task that spawns an inner
 * per-partition task.
 */
@Test
public void testNestedAsyncConnectContextPropagation() throws Exception {
    ConnectContext mockCtx = Mockito.mock(ConnectContext.class);

    ExecutorService pool = Executors.newFixedThreadPool(2);
    CountDownLatch outerDone = new CountDownLatch(1);
    CountDownLatch innerDone = new CountDownLatch(1);
    AtomicReference<ConnectContext> outerSeen = new AtomicReference<>();
    AtomicReference<ConnectContext> innerSeen = new AtomicReference<>();
    AtomicReference<ConnectContext> outerAfter = new AtomicReference<>();
    AtomicReference<ConnectContext> innerAfter = new AtomicReference<>();

    try {
        CompletableFuture.runAsync(() -> {
            // Outer level: attach the parent context to this worker.
            if (mockCtx != null) {
                mockCtx.setThreadLocalInfo();
            }
            try {
                outerSeen.set(ConnectContext.get());

                // Inner level: the per-partition task repeats the pattern.
                CompletableFuture.runAsync(() -> {
                    if (mockCtx != null) {
                        mockCtx.setThreadLocalInfo();
                    }
                    try {
                        innerSeen.set(ConnectContext.get());
                    } finally {
                        ConnectContext.remove();
                        innerAfter.set(ConnectContext.get());
                        innerDone.countDown();
                    }
                }, pool).join();
            } finally {
                ConnectContext.remove();
                outerAfter.set(ConnectContext.get());
                outerDone.countDown();
            }
        }, pool).get(5, TimeUnit.SECONDS);

        Assert.assertTrue("Outer task should complete", outerDone.await(5, TimeUnit.SECONDS));
        Assert.assertTrue("Inner task should complete", innerDone.await(5, TimeUnit.SECONDS));

        Assert.assertNotNull("ConnectContext should be available in outer async thread",
                outerSeen.get());
        Assert.assertNotNull("ConnectContext should be available in inner async thread",
                innerSeen.get());

        Assert.assertNull("Outer thread should clean up ConnectContext", outerAfter.get());
        Assert.assertNull("Inner thread should clean up ConnectContext", innerAfter.get());
    } finally {
        pool.shutdown();
        ConnectContext.remove();
    }
}

/**
 * Verifies that the ThreadLocal ConnectContext is removed even when the async
 * task body throws, so a pooled thread never carries a stale context forward.
 */
@Test
public void testConnectContextCleanupOnException() throws Exception {
    ConnectContext mockCtx = Mockito.mock(ConnectContext.class);

    ExecutorService pool = Executors.newSingleThreadExecutor();
    AtomicReference<ConnectContext> afterFailure = new AtomicReference<>();
    CountDownLatch done = new CountDownLatch(1);

    try {
        CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
            if (mockCtx != null) {
                mockCtx.setThreadLocalInfo();
            }
            try {
                // Force a failure mid-task; cleanup must still run.
                throw new RuntimeException("Simulated exception");
            } finally {
                ConnectContext.remove();
                afterFailure.set(ConnectContext.get());
                done.countDown();
            }
        }, pool);

        try {
            task.get(5, TimeUnit.SECONDS);
            Assert.fail("Should have thrown an exception");
        } catch (Exception e) {
            // Expected: the simulated failure surfaces from get().
        }

        Assert.assertTrue("Task should complete", done.await(5, TimeUnit.SECONDS));
        Assert.assertNull("ConnectContext should be cleaned up after exception", afterFailure.get());
    } finally {
        pool.shutdown();
        ConnectContext.remove();
    }
}
}