diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java index 352f733f8e7a2c..437b89e580bf59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java @@ -83,40 +83,49 @@ public int hashCode() { public static class RowCountCacheLoader extends BasicAsyncCacheLoader> { @Override protected Optional doLoad(RowCountKey rowCountKey) { - try { - TableIf table = StatisticsUtil.findTable(rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId); - return Optional.of(table.fetchRowCount()); - } catch (Exception e) { - String message = String.format("Failed to get table row count with catalogId %s, dbId %s, tableId %s. " - + "Reason %s", - rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId, e.getMessage()); - if (LOG.isDebugEnabled()) { - LOG.debug(message, e); - } else { - LOG.warn(message); - } + return loadRowCount(rowCountKey, false); + } + } - // Return Optional.empty() will cache this empty value in memory, - // so we can't try to load the row count until the cache expire. - // Throw an exception here will cause too much stack log in fe.out. - // So we return null when exception happen. - // Null may raise NPE in caller, but that is expected. - // We catch that NPE and return a default value -1 without keep the value in cache, - // so we can trigger the load function to fetch row count again next time in this exception case. - return null; + static Optional loadRowCount(RowCountKey rowCountKey, boolean fillMetaCache) { + try { + ExternalTable table = (ExternalTable) StatisticsUtil.findTable( + rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId); + return Optional.of(table.fetchRowCountWithMetaCache(fillMetaCache)); + } catch (Exception e) { + String message = String.format("Failed to get table row count with catalogId %s, dbId %s, tableId %s. " + + "Reason %s", + rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId, e.getMessage()); + if (LOG.isDebugEnabled()) { + LOG.debug(message, e); + } else { + LOG.warn(message); } + + // Return Optional.empty() will cache this empty value in memory, + // so we can't try to load the row count until the cache expire. + // Throw an exception here will cause too much stack log in fe.out. + // So we return null when exception happen. + // Null may raise NPE in caller, but that is expected. + // We catch that NPE and return a default value -1 without keep the value in cache, + // so we can trigger the load function to fetch row count again next time in this exception case. + return null; } } /** * Get cached row count for the given table. Return -1 if cached not loaded or table not exists. * Cached will be loaded async. + * @param fillMetaCache whether loading the row count may fill external metadata caches * @return Cached row count or -1 if not exist */ - public long getCachedRowCount(long catalogId, long dbId, long tableId) { + public long getCachedRowCount(long catalogId, long dbId, long tableId, boolean fillMetaCache) { RowCountKey key = new RowCountKey(catalogId, dbId, tableId); try { - CompletableFuture> f = rowCountCache.get(key); + CompletableFuture> f = fillMetaCache + ? rowCountCache.get(key, (rowCountKey, executor) -> CompletableFuture.supplyAsync( + () -> loadRowCount(rowCountKey, true), executor)) + : rowCountCache.get(key); // Get row count synchronously by default. if (ConnectContext.get() == null || ConnectContext.get().getSessionVariable().fetchHiveRowCountSync) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 0ac7993c27dedc..24de55133be39c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -246,7 +246,8 @@ public long getRowCount() { return TableIf.UNKNOWN_ROW_COUNT; } // All external table should get external row count from cache. - return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id); + return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache() + .getCachedRowCount(catalog.getId(), dbId, id, true); } @Override @@ -260,7 +261,8 @@ public long getCachedRowCount() { } // getExtMetaCacheMgr().getRowCountCache().getCachedRowCount() is an asynchronous non-blocking operation. // For tables that are not in the cache, it will load asynchronously and return -1. - return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id); + return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache() + .getCachedRowCount(catalog.getId(), dbId, id, false); } @Override @@ -272,6 +274,13 @@ public long fetchRowCount() { return UNKNOWN_ROW_COUNT; } + /** + * Fetch row count, and allow the load path to fill external metadata cache if supported. + */ + public long fetchRowCountWithMetaCache(boolean fillMetaCache) { + return fetchRowCount(); + } + @Override public long getAvgRowLength() { return 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/PluginDrivenExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/PluginDrivenExternalTable.java index 16018e562473a1..00717f1c89aa07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/PluginDrivenExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/PluginDrivenExternalTable.java @@ -125,7 +125,7 @@ public long getCachedRowCount() { return -1; } return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache() - .getCachedRowCount(catalog.getId(), dbId, id); + .getCachedRowCount(catalog.getId(), dbId, id, false); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index b9b9d5f8415cb0..03dc966fd773ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -788,13 +788,22 @@ private List getSchemaFromRemoteTable() { @Override public long fetchRowCount() { + return fetchRowCountInternal(false); + } + + @Override + public long fetchRowCountWithMetaCache(boolean fillMetaCache) { + return fetchRowCountInternal(fillMetaCache); + } + + private long fetchRowCountInternal(boolean fillMetaCache) { makeSureInitialized(); // Get row count from hive metastore property. long rowCount = getRowCountFromExternalSource(); // Only hive table supports estimate row count by listing file. if (rowCount == UNKNOWN_ROW_COUNT && dlaType.equals(DLAType.HIVE)) { LOG.info("Will estimate row count for table {} from file list.", name); - rowCount = getRowCountFromFileList(); + rowCount = getRowCountFromFileList(fillMetaCache); } return rowCount; } @@ -949,7 +958,7 @@ public void gsonPostProcess() throws IOException { @Override public List getChunkSizes() { HiveExternalMetaCache.HivePartitionValues partitionValues = getAllPartitionValues(); - List filesByPartitions = getFilesForPartitions(partitionValues, 0); + List filesByPartitions = getFilesForPartitions(partitionValues, 0, false); List result = Lists.newArrayList(); for (HiveExternalMetaCache.FileCacheValue files : filesByPartitions) { for (HiveExternalMetaCache.HiveFileStatus file : files.getFiles()) { @@ -1047,7 +1056,7 @@ public boolean isPartitionColumnAllowNull() { /** * Estimate hive table row count : totalFileSize/estimatedRowSize */ - private long getRowCountFromFileList() { + private long getRowCountFromFileList(boolean fillMetaCache) { if (!GlobalVariable.enable_get_row_count_from_file_list) { return UNKNOWN_ROW_COUNT; } @@ -1061,7 +1070,7 @@ private long getRowCountFromFileList() { // Get files for all partitions. int samplePartitionSize = Config.hive_stats_partition_sample_size; List filesByPartitions = - getFilesForPartitions(partitionValues, samplePartitionSize); + getFilesForPartitions(partitionValues, samplePartitionSize, fillMetaCache); LOG.info("Number of files selected for hive table {} is {}", name, filesByPartitions.size()); long totalSize = 0; // Calculate the total file size. @@ -1127,7 +1136,7 @@ private HiveExternalMetaCache.HivePartitionValues getAllPartitionValues() { // Get all files related to given partition values // If sampleSize > 0, randomly choose part of partitions of the whole table. private List getFilesForPartitions( - HiveExternalMetaCache.HivePartitionValues partitionValues, int sampleSize) { + HiveExternalMetaCache.HivePartitionValues partitionValues, int sampleSize, boolean fillMetaCache) { if (isView()) { return Lists.newArrayList(); } @@ -1152,9 +1161,10 @@ private List getFilesForPartitions( for (PartitionItem item : partitionItems) { partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList()); } - // get partitions without cache, so that it will not invalid the cache when executing - // non query request such as `show table status` - hivePartitions = cache.getAllPartitionsWithoutCache(this, partitionValuesList); + // Non-query requests such as `show table status` should not fill heavy metadata caches. + hivePartitions = fillMetaCache + ? cache.getAllPartitionsWithCache(this, partitionValuesList) + : cache.getAllPartitionsWithoutCache(this, partitionValuesList); LOG.info("Partition list size for hive partition table {} is {}", name, hivePartitions.size()); } else { hivePartitions.add(new HivePartition(getOrBuildNameMapping(), true, @@ -1167,7 +1177,7 @@ private List getFilesForPartitions( LOG.debug("Chosen partition for table {}. [{}]", name, partition.toString()); } } - return cache.getFilesByPartitions(hivePartitions, false, true, new FileSystemDirectoryLister(), null); + return cache.getFilesByPartitions(hivePartitions, fillMetaCache, true, new FileSystemDirectoryLister(), null); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java index b9fba093475fb3..e0dd6eeff60aa3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java @@ -19,10 +19,13 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.statistics.util.StatisticsUtil; +import com.google.common.util.concurrent.MoreExecutors; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; import org.mockito.Mockito; import java.util.Optional; @@ -31,6 +34,52 @@ import java.util.concurrent.atomic.AtomicReference; public class ExternalRowCountCacheTest { + @Test + public void testRowCountKeyUsesTableIdAsCacheIdentity() { + ExternalRowCountCache.RowCountKey key1 = new ExternalRowCountCache.RowCountKey(1, 2, 3); + ExternalRowCountCache.RowCountKey key2 = new ExternalRowCountCache.RowCountKey(2, 3, 3); + + Assertions.assertEquals(key1, key2); + Assertions.assertEquals(key1.hashCode(), key2.hashCode()); + } + + @Test + public void testLoadRowCountPassesFillMetaCacheToTable() { + ExternalTable table = Mockito.mock(ExternalTable.class); + Mockito.when(table.fetchRowCountWithMetaCache(true)).thenReturn(100L); + Mockito.when(table.fetchRowCountWithMetaCache(false)).thenReturn(200L); + + try (MockedStatic mockedStatisticsUtil = Mockito.mockStatic(StatisticsUtil.class)) { + mockedStatisticsUtil.when(() -> StatisticsUtil.findTable(1, 2, 3)).thenReturn(table); + + ExternalRowCountCache.RowCountKey key = new ExternalRowCountCache.RowCountKey(1, 2, 3); + Assertions.assertEquals(100L, ExternalRowCountCache.loadRowCount(key, true).get()); + Assertions.assertEquals(200L, ExternalRowCountCache.loadRowCount(key, false).get()); + } + + Mockito.verify(table).fetchRowCountWithMetaCache(true); + Mockito.verify(table).fetchRowCountWithMetaCache(false); + } + + @Test + public void testGetCachedRowCountPassesFillMetaCacheToLoader() { + ExternalTable table = Mockito.mock(ExternalTable.class); + Mockito.when(table.fetchRowCountWithMetaCache(true)).thenReturn(100L); + Mockito.when(table.fetchRowCountWithMetaCache(false)).thenReturn(200L); + + try (MockedStatic mockedStatisticsUtil = Mockito.mockStatic(StatisticsUtil.class)) { + mockedStatisticsUtil.when(() -> StatisticsUtil.findTable(1, 2, 3)).thenReturn(table); + mockedStatisticsUtil.when(() -> StatisticsUtil.findTable(1, 2, 4)).thenReturn(table); + + ExternalRowCountCache cache = new ExternalRowCountCache(MoreExecutors.newDirectExecutorService()); + Assertions.assertEquals(100L, cache.getCachedRowCount(1, 2, 3, true)); + Assertions.assertEquals(200L, cache.getCachedRowCount(1, 2, 4, false)); + } + + Mockito.verify(table).fetchRowCountWithMetaCache(true); + Mockito.verify(table).fetchRowCountWithMetaCache(false); + } + @Test public void testLoadWithException() throws Exception { ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool( @@ -50,7 +99,7 @@ public void testLoadWithException() throws Exception { })) { ExternalRowCountCache cache = new ExternalRowCountCache(executor); - long cachedRowCount = cache.getCachedRowCount(1, 1, 1); + long cachedRowCount = cache.getCachedRowCount(1, 1, 1, false); Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); for (int i = 0; i < 60; i++) { if (counter.get() == 1) { @@ -66,16 +115,16 @@ public void testLoadWithException() throws Exception { return Optional.of(100L); }).when(loaderRef.get()).doLoad(Mockito.any()); - cache.getCachedRowCount(1, 1, 1); + cache.getCachedRowCount(1, 1, 1, false); for (int i = 0; i < 60; i++) { - cachedRowCount = cache.getCachedRowCount(1, 1, 1); + cachedRowCount = cache.getCachedRowCount(1, 1, 1, false); if (cachedRowCount != TableIf.UNKNOWN_ROW_COUNT) { Assertions.assertEquals(100, cachedRowCount); break; } Thread.sleep(1000); } - cachedRowCount = cache.getCachedRowCount(1, 1, 1); + cachedRowCount = cache.getCachedRowCount(1, 1, 1, false); Assertions.assertEquals(100, cachedRowCount); Assertions.assertEquals(2, counter.get()); @@ -90,10 +139,10 @@ public void testLoadWithException() throws Exception { return Optional.of(100L); }).when(loaderRef.get()).doLoad(Mockito.any()); - cachedRowCount = cache.getCachedRowCount(2, 2, 2); + cachedRowCount = cache.getCachedRowCount(2, 2, 2, false); Assertions.assertEquals(100, cachedRowCount); Thread.sleep(1000); - cachedRowCount = cache.getCachedRowCount(2, 2, 2); + cachedRowCount = cache.getCachedRowCount(2, 2, 2, false); Assertions.assertEquals(100, cachedRowCount); for (int i = 0; i < 60; i++) { if (counter.get() == 3) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSExternalTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSExternalTableTest.java index a3d61fd9753544..2cefa4821dadae 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSExternalTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HMSExternalTableTest.java @@ -17,17 +17,34 @@ package org.apache.doris.datasource.hive; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Type; import org.apache.doris.common.UserException; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.datasource.ExternalMetaCacheMgr; +import org.apache.doris.fs.FileSystemDirectoryLister; import org.apache.doris.thrift.TFileFormatType; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; import org.mockito.Mockito; +import java.util.Collections; +import java.util.List; +import java.util.Map; + /** * Test class for HMSExternalTable, focusing on view-related functionality @@ -178,6 +195,74 @@ public void testGetFileFormatType_MapreduceLzoTextInputFormat_ReturnsText() thro "com.hadoop.mapreduce.LzoTextInputFormat table should also resolve to FORMAT_TEXT for reading"); } + @Test + public void testFetchRowCountFillsMetaCacheOnlyWhenRequested() throws Exception { + long catalogId = 100L; + String localDbName = "test_db"; + String partitionValue = "2026-05-21"; + String inputFormat = "org.apache.hadoop.mapred.TextInputFormat"; + String partitionLocation = "file:///tmp/doris_hms_row_count_cache/dt=2026-05-21"; + + HMSExternalCatalog catalog = Mockito.mock(HMSExternalCatalog.class); + HMSExternalDatabase db = Mockito.mock(HMSExternalDatabase.class); + Mockito.when(catalog.getId()).thenReturn(catalogId); + Mockito.when(catalog.getName()).thenReturn("test_catalog"); + Mockito.when(catalog.getProperties()).thenReturn(ImmutableMap.of()); + Mockito.when(db.getFullName()).thenReturn(localDbName); + + Table remoteTable = buildRemoteTableWithInputFormat(inputFormat); + remoteTable.setParameters(ImmutableMap.of()); + TestHMSExternalTableForMetaCache table = new TestHMSExternalTableForMetaCache( + catalog, db, remoteTable, partitionValue); + Deencapsulation.setField(table, "dlaType", HMSExternalTable.DLAType.HIVE); + + List partitions = Collections.singletonList(new HivePartition( + null, false, inputFormat, partitionLocation, Collections.singletonList(partitionValue), + Collections.emptyMap())); + HiveExternalMetaCache.FileCacheValue fileCacheValue = new HiveExternalMetaCache.FileCacheValue(); + HiveExternalMetaCache.HiveFileStatus status = new HiveExternalMetaCache.HiveFileStatus(); + status.setLength(128L); + fileCacheValue.getFiles().add(status); + List files = Collections.singletonList(fileCacheValue); + + HiveExternalMetaCache hiveCache = Mockito.mock(HiveExternalMetaCache.class); + Mockito.when(hiveCache.getAllPartitionsWithCache(Mockito.eq(table), Mockito.anyList())) + .thenReturn(partitions); + Mockito.when(hiveCache.getAllPartitionsWithoutCache(Mockito.eq(table), Mockito.anyList())) + .thenReturn(partitions); + Mockito.when(hiveCache.getFilesByPartitions(Mockito.eq(partitions), Mockito.eq(true), Mockito.eq(true), + Mockito.any(FileSystemDirectoryLister.class), Mockito.isNull())) + .thenReturn(files); + Mockito.when(hiveCache.getFilesByPartitions(Mockito.eq(partitions), Mockito.eq(false), Mockito.eq(true), + Mockito.any(FileSystemDirectoryLister.class), Mockito.isNull())) + .thenReturn(files); + + Env env = Mockito.mock(Env.class); + ExternalMetaCacheMgr extMetaCacheMgr = Mockito.mock(ExternalMetaCacheMgr.class); + Mockito.when(env.getExtMetaCacheMgr()).thenReturn(extMetaCacheMgr); + Mockito.when(extMetaCacheMgr.hive(catalogId)).thenReturn(hiveCache); + + try (MockedStatic mockedEnv = Mockito.mockStatic(Env.class)) { + mockedEnv.when(Env::getCurrentEnv).thenReturn(env); + + Assertions.assertEquals(32L, table.fetchRowCountWithMetaCache(true)); + Mockito.verify(hiveCache).getAllPartitionsWithCache(Mockito.eq(table), Mockito.anyList()); + Mockito.verify(hiveCache, Mockito.never()) + .getAllPartitionsWithoutCache(Mockito.eq(table), Mockito.anyList()); + Mockito.verify(hiveCache).getFilesByPartitions(Mockito.eq(partitions), Mockito.eq(true), + Mockito.eq(true), Mockito.any(FileSystemDirectoryLister.class), Mockito.isNull()); + + Mockito.clearInvocations(hiveCache); + + Assertions.assertEquals(32L, table.fetchRowCount()); + Mockito.verify(hiveCache).getAllPartitionsWithoutCache(Mockito.eq(table), Mockito.anyList()); + Mockito.verify(hiveCache, Mockito.never()) + .getAllPartitionsWithCache(Mockito.eq(table), Mockito.anyList()); + Mockito.verify(hiveCache).getFilesByPartitions(Mockito.eq(partitions), Mockito.eq(false), + Mockito.eq(true), Mockito.any(FileSystemDirectoryLister.class), Mockito.isNull()); + } + } + /** * Variant that exposes a pre-built remote table for getFileFormatType tests. */ @@ -201,6 +286,61 @@ protected synchronized void makeSureInitialized() { } } + private static class TestHMSExternalTableForMetaCache extends TestHMSExternalTableWithRemote { + private final Column dataColumn = new Column("c1", Type.INT); + private final Column partitionColumn = new Column("dt", Type.VARCHAR); + private final HiveExternalMetaCache.HivePartitionValues partitionValues; + + public TestHMSExternalTableForMetaCache(HMSExternalCatalog catalog, HMSExternalDatabase db, + Table remoteTable, String partitionValue) throws Exception { + super(catalog, db, remoteTable); + PartitionKey partitionKey = PartitionKey.createListPartitionKeyWithTypes( + Lists.newArrayList(new org.apache.doris.analysis.PartitionValue(partitionValue)), + Lists.newArrayList(Type.VARCHAR), + true); + PartitionItem partitionItem = new ListPartitionItem(Lists.newArrayList(partitionKey)); + long partitionId = 1L; + Map idToPartitionItem = ImmutableMap.of(partitionId, partitionItem); + this.partitionValues = new HiveExternalMetaCache.HivePartitionValues( + idToPartitionItem, + HashBiMap.create(ImmutableMap.of("dt=" + partitionValue, partitionId)), + ImmutableMap.of(partitionId, Collections.singletonList(partitionValue))); + } + + @Override + public List getFullSchema() { + return Lists.newArrayList(dataColumn, partitionColumn); + } + + @Override + public boolean isView() { + return false; + } + + @Override + public List getPartitionColumnTypes(java.util.Optional + snapshot) { + return Collections.singletonList(Type.VARCHAR); + } + + @Override + public List getPartitionColumns() { + return Collections.singletonList(partitionColumn); + } + + @Override + public List getPartitionColumns(java.util.Optional + snapshot) { + return Collections.singletonList(partitionColumn); + } + + @Override + public HiveExternalMetaCache.HivePartitionValues getHivePartitionValues( + java.util.Optional snapshot) { + return partitionValues; + } + } + /** * Test implementation of HMSExternalTable that allows setting view texts * Uses parent's getViewText() implementation for actual testing