From eb422c5b50f1c7c9a28cf833832514c8d160ab03 Mon Sep 17 00:00:00 2001
From: Calvin Kirs
Date: Tue, 10 Feb 2026 16:46:56 +0800
Subject: [PATCH] [Fix](Hive) Hive queries can hit an NPE when running in
 batch mode.

---
 .../cloud/system/CloudSystemInfoService.java  |   9 +-
 .../datasource/hive/source/HiveScanNode.java  |  89 +++++---
 .../system/CloudSystemInfoServiceTest.java    |  17 ++
 .../hive/source/HiveScanNodeTest.java         | 175 ++++++++++++++++++
 4 files changed, 252 insertions(+), 38 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 24a09364ad1040..26cb383eb5fcd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -758,15 +758,18 @@ public boolean containClusterName(String clusterName) {
 
     @Override
     public int getMinPipelineExecutorSize() {
+        ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            return 1;
+        }
         String clusterName = "";
         try {
-            clusterName = ConnectContext.get().getCloudCluster(false);
+            clusterName = context.getCloudCluster(false);
         } catch (ComputeGroupException e) {
             LOG.warn("failed to get cluster name", e);
             return 1;
         }
-        if (ConnectContext.get() != null
-                && Strings.isNullOrEmpty(clusterName)) {
+        if (Strings.isNullOrEmpty(clusterName)) {
             return 1;
         }
         List<Backend> currentBackends = getBackendsByClusterName(clusterName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 2e8f3735f6b248..3b80ec7c4c6f0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -215,45 +215,64 @@ public void startSplit(int numBackends) {
         Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
         String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
         AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+        // Capture ConnectContext from the current thread to pass to async threads
+        ConnectContext parentConnectContext = ConnectContext.get();
         CompletableFuture.runAsync(() -> {
-            for (HivePartition partition : prunedPartitions) {
-                if (batchException.get() != null || splitAssignment.isStop()) {
-                    break;
-                }
-                try {
-                    splittersOnFlight.acquire();
-                    CompletableFuture.runAsync(() -> {
-                        try {
-                            List<Split> allFiles = Lists.newArrayList();
-                            getFileSplitByPartitions(
-                                    cache, Collections.singletonList(partition), allFiles, bindBrokerName,
-                                    numBackends, true);
-                            if (allFiles.size() > numSplitsPerPartition.get()) {
-                                numSplitsPerPartition.set(allFiles.size());
-                            }
-                            if (splitAssignment.needMoreSplit()) {
-                                splitAssignment.addToQueue(allFiles);
-                            }
-                        } catch (Exception e) {
-                            batchException.set(new UserException(e.getMessage(), e));
-                        } finally {
-                            splittersOnFlight.release();
-                            if (batchException.get() != null) {
-                                splitAssignment.setException(batchException.get());
-                            }
-                            if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
-                                splitAssignment.finishSchedule();
-                            }
-                        }
-                    }, scheduleExecutor);
-                } catch (Exception e) {
-                    // When submitting a task, an exception will be thrown if the task pool (scheduleExecutor) is full
-                    batchException.set(new UserException(e.getMessage(), e));
-                    break;
-                }
-            }
-            if (batchException.get() != null) {
-                splitAssignment.setException(batchException.get());
-            }
+            // Set ConnectContext for the outer async thread to avoid an NPE when accessing session variables.
+            // Must be cleaned up in the finally block to prevent ThreadLocal leaks in pooled threads.
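+            // Note: tasks run via CompletableFuture.runAsync() on a pooled executor do not inherit
+            // the submitting thread's ThreadLocals, so without this explicit rebinding
+            // ConnectContext.get() would return null inside the task.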
+            if (parentConnectContext != null) {
+                parentConnectContext.setThreadLocalInfo();
+            }
+            try {
+                for (HivePartition partition : prunedPartitions) {
+                    if (batchException.get() != null || splitAssignment.isStop()) {
+                        break;
+                    }
+                    try {
+                        splittersOnFlight.acquire();
+                        CompletableFuture.runAsync(() -> {
+                            // Set ConnectContext for the inner async thread
+                            if (parentConnectContext != null) {
+                                parentConnectContext.setThreadLocalInfo();
+                            }
+                            try {
+                                List<Split> allFiles = Lists.newArrayList();
+                                getFileSplitByPartitions(
+                                        cache, Collections.singletonList(partition), allFiles, bindBrokerName,
+                                        numBackends, true);
+                                if (allFiles.size() > numSplitsPerPartition.get()) {
+                                    numSplitsPerPartition.set(allFiles.size());
+                                }
+                                if (splitAssignment.needMoreSplit()) {
+                                    splitAssignment.addToQueue(allFiles);
+                                }
+                            } catch (Exception e) {
+                                batchException.set(new UserException(e.getMessage(), e));
+                            } finally {
+                                // Clean up the ThreadLocal to prevent leaks in pooled threads
+                                ConnectContext.remove();
+                                splittersOnFlight.release();
+                                if (batchException.get() != null) {
+                                    splitAssignment.setException(batchException.get());
+                                }
+                                if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
+                                    splitAssignment.finishSchedule();
+                                }
+                            }
+                        }, scheduleExecutor);
+                    } catch (Exception e) {
+                        // When submitting a task, an exception will be thrown if the task pool (scheduleExecutor)
+                        // is full
+                        batchException.set(new UserException(e.getMessage(), e));
+                        break;
+                    }
+                }
+                if (batchException.get() != null) {
+                    splitAssignment.setException(batchException.get());
+                }
+            } finally {
+                // Clean up the ThreadLocal to prevent leaks in pooled threads
+                ConnectContext.remove();
+            }
         });
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
index 85d14585f21589..702e38d3c97867 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
@@ -572,6 +572,23 @@ public void testGetMinPipelineExecutorSizeWithNoClusterInContext() {
         }
     }
 
+    // Test for error handling when ConnectContext.get() returns null (no ConnectContext at all).
+    // This exercises the fix for the NPE in async threads where ConnectContext is not propagated.
+    @Test
+    public void testGetMinPipelineExecutorSizeWithNullConnectContext() {
+        infoService = new CloudSystemInfoService();
+
+        // Ensure ConnectContext is null (not set in the ThreadLocal)
+        ConnectContext.remove();
+
+        // Verify ConnectContext.get() returns null
+        Assert.assertNull(ConnectContext.get());
+
+        // Should return 1 instead of throwing an NPE when ConnectContext.get() returns null
+        int result = infoService.getMinPipelineExecutorSize();
+        Assert.assertEquals(1, result);
+    }
+
     @Test
     public void testGetMinPipelineExecutorSizeWithMixedValidInvalidBackends() {
         infoService = new CloudSystemInfoService();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
index 727ff9390032ab..bc028c248cc13a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
@@ -23,6 +23,7 @@
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 
 import org.junit.Assert;
@@ -32,6 +33,12 @@
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class HiveScanNodeTest {
     private static final long MB = 1024L * 1024L;
@@ -85,4 +92,172 @@ public void testDetermineTargetFileSplitSizeKeepsInitialSize() throws Exception
         long target = (long) method.invoke(node, caches, false);
         Assert.assertEquals(32 * MB, target);
     }
+
+    /**
+     * Test that ConnectContext is properly propagated to async threads and cleaned up afterward.
+     * This tests the fix for the NPE issue where async threads in HiveScanNode.startSplit()
+     * could not access ConnectContext because it is stored in a ThreadLocal.
+     */
+    @Test
+    public void testConnectContextPropagationInAsyncThread() throws Exception {
+        // Create a mock ConnectContext. Let setThreadLocalInfo() call the real method so it
+        // actually binds the mock into the ThreadLocal; mocked void methods are no-ops by default.
+        ConnectContext parentContext = Mockito.mock(ConnectContext.class);
+        Mockito.doCallRealMethod().when(parentContext).setThreadLocalInfo();
+
+        // Set it in the current thread
+        parentContext.setThreadLocalInfo();
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<ConnectContext> asyncContextRef = new AtomicReference<>();
+        AtomicReference<ConnectContext> afterCleanupRef = new AtomicReference<>();
+
+        try {
+            // Simulate the pattern used in HiveScanNode.startSplit()
+            CompletableFuture.runAsync(() -> {
+                // Propagate ConnectContext to the async thread
+                if (parentContext != null) {
+                    parentContext.setThreadLocalInfo();
+                }
+                try {
+                    // Capture the context as seen by the async thread
+                    asyncContextRef.set(ConnectContext.get());
+                } finally {
+                    // Clean up the ThreadLocal to prevent leaks (as the fix does)
+                    ConnectContext.remove();
+                    afterCleanupRef.set(ConnectContext.get());
+                    latch.countDown();
+                }
+            }, executor).get(5, TimeUnit.SECONDS);
+
+            // Wait for the async task to complete
+            Assert.assertTrue("Async task should complete", latch.await(5, TimeUnit.SECONDS));
+
+            // Verify ConnectContext was available in the async thread
+            Assert.assertNotNull("ConnectContext should be available in async thread", asyncContextRef.get());
+
+            // Verify ConnectContext was cleaned up after task completion
+            Assert.assertNull("ConnectContext should be cleaned up after task", afterCleanupRef.get());
+        } finally {
+            executor.shutdown();
+            ConnectContext.remove();
+        }
+    }
+
+    /**
+     * Test that nested async threads also properly propagate and clean up ConnectContext.
+     * This simulates the nested CompletableFuture.runAsync() pattern in HiveScanNode.startSplit().
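+     * Both the outer and the inner task may run on different pooled threads, so each level must
+     * rebind the captured context and remove it again in its own finally block.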
+ */ + @Test + public void testNestedAsyncConnectContextPropagation() throws Exception { + // Create a mock ConnectContext + ConnectContext parentContext = Mockito.mock(ConnectContext.class); + + ExecutorService executor = Executors.newFixedThreadPool(2); + CountDownLatch outerLatch = new CountDownLatch(1); + CountDownLatch innerLatch = new CountDownLatch(1); + AtomicReference outerContextRef = new AtomicReference<>(); + AtomicReference innerContextRef = new AtomicReference<>(); + AtomicReference outerAfterCleanupRef = new AtomicReference<>(); + AtomicReference innerAfterCleanupRef = new AtomicReference<>(); + + try { + // Simulate the nested async pattern in HiveScanNode.startSplit() + CompletableFuture.runAsync(() -> { + // Outer async thread: propagate ConnectContext + if (parentContext != null) { + parentContext.setThreadLocalInfo(); + } + try { + outerContextRef.set(ConnectContext.get()); + + // Inner async thread (like the partition processing in startSplit) + CompletableFuture.runAsync(() -> { + // Inner async thread: propagate ConnectContext + if (parentContext != null) { + parentContext.setThreadLocalInfo(); + } + try { + innerContextRef.set(ConnectContext.get()); + } finally { + // Clean up inner thread's ThreadLocal + ConnectContext.remove(); + innerAfterCleanupRef.set(ConnectContext.get()); + innerLatch.countDown(); + } + }, executor).join(); + + } finally { + // Clean up outer thread's ThreadLocal + ConnectContext.remove(); + outerAfterCleanupRef.set(ConnectContext.get()); + outerLatch.countDown(); + } + }, executor).get(5, TimeUnit.SECONDS); + + // Wait for all tasks to complete + Assert.assertTrue("Outer task should complete", outerLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue("Inner task should complete", innerLatch.await(5, TimeUnit.SECONDS)); + + // Verify ConnectContext was available in both async threads + Assert.assertNotNull("ConnectContext should be available in outer async thread", + outerContextRef.get()); + Assert.assertNotNull("ConnectContext should be available in inner async thread", + innerContextRef.get()); + + // Verify both threads cleaned up their ThreadLocal + Assert.assertNull("Outer thread should clean up ConnectContext", outerAfterCleanupRef.get()); + Assert.assertNull("Inner thread should clean up ConnectContext", innerAfterCleanupRef.get()); + + } finally { + executor.shutdown(); + ConnectContext.remove(); + } + } + + /** + * Test that ThreadLocal cleanup works correctly even when exception occurs. 
+     */
+    @Test
+    public void testConnectContextCleanupOnException() throws Exception {
+        ConnectContext parentContext = Mockito.mock(ConnectContext.class);
+        Mockito.doCallRealMethod().when(parentContext).setThreadLocalInfo();
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        AtomicReference<ConnectContext> afterExceptionRef = new AtomicReference<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        try {
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                if (parentContext != null) {
+                    parentContext.setThreadLocalInfo();
+                }
+                try {
+                    // Simulate an exception during processing
+                    throw new RuntimeException("Simulated exception");
+                } finally {
+                    // The ThreadLocal should still be cleaned up even on the exception path
+                    ConnectContext.remove();
+                    afterExceptionRef.set(ConnectContext.get());
+                    latch.countDown();
+                }
+            }, executor);
+
+            // The future should complete exceptionally
+            try {
+                future.get(5, TimeUnit.SECONDS);
+                Assert.fail("Should have thrown an exception");
+            } catch (Exception e) {
+                // Expected: the simulated RuntimeException surfaces as an ExecutionException
+            }
+
+            Assert.assertTrue("Task should complete", latch.await(5, TimeUnit.SECONDS));
+
+            // Verify the ThreadLocal was cleaned up even after the exception
+            Assert.assertNull("ConnectContext should be cleaned up after exception", afterExceptionRef.get());
+        } finally {
+            executor.shutdown();
+            ConnectContext.remove();
+        }
+    }
 }