Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,40 +83,49 @@ public int hashCode() {
public static class RowCountCacheLoader extends BasicAsyncCacheLoader<RowCountKey, Optional<Long>> {
@Override
protected Optional<Long> doLoad(RowCountKey rowCountKey) {
try {
TableIf table = StatisticsUtil.findTable(rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId);
return Optional.of(table.fetchRowCount());
} catch (Exception e) {
String message = String.format("Failed to get table row count with catalogId %s, dbId %s, tableId %s. "
+ "Reason %s",
rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId, e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug(message, e);
} else {
LOG.warn(message);
}
return loadRowCount(rowCountKey, false);
}
}

// Return Optional.empty() will cache this empty value in memory,
// so we can't try to load the row count until the cache expire.
// Throw an exception here will cause too much stack log in fe.out.
// So we return null when exception happen.
// Null may raise NPE in caller, but that is expected.
// We catch that NPE and return a default value -1 without keep the value in cache,
// so we can trigger the load function to fetch row count again next time in this exception case.
return null;
/**
 * Look up the table identified by {@code rowCountKey} and fetch its row count.
 *
 * @param rowCountKey carries the catalog id, db id and table id used for the lookup
 * @param fillMetaCache forwarded unchanged to {@code ExternalTable.fetchRowCountWithMetaCache}
 * @return the fetched row count wrapped in an Optional, or {@code null} when any exception
 *         is raised while looking up the table or fetching the count
 */
static Optional<Long> loadRowCount(RowCountKey rowCountKey, boolean fillMetaCache) {
    try {
        ExternalTable externalTable = (ExternalTable) StatisticsUtil.findTable(
                rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId);
        long fetched = externalTable.fetchRowCountWithMetaCache(fillMetaCache);
        return Optional.of(fetched);
    } catch (Exception e) {
        String failure = String.format("Failed to get table row count with catalogId %s, dbId %s, tableId %s. "
                + "Reason %s",
                rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId, e.getMessage());
        // The stack trace is only attached at debug level; otherwise a one-line warning is emitted.
        if (!LOG.isDebugEnabled()) {
            LOG.warn(failure);
        } else {
            LOG.debug(failure, e);
        }

        // Why null instead of Optional.empty() or an exception:
        //  - Optional.empty() would be cached, so the row count could not be reloaded
        //    until the cache entry expires.
        //  - Throwing would flood fe.out with stack traces.
        // A null result may surface as an NPE in the caller, which is expected: the caller
        // catches it and returns the default value -1 without caching anything, so the next
        // request triggers the load function again.
        return null;
    }
}

/**
* Get the cached row count for the given table. Returns -1 if the row count has not been loaded yet
* or the table does not exist. The cache is loaded asynchronously.
* @param fillMetaCache whether loading the row count may fill external metadata caches
* @return Cached row count or -1 if not exist
*/
public long getCachedRowCount(long catalogId, long dbId, long tableId) {
public long getCachedRowCount(long catalogId, long dbId, long tableId, boolean fillMetaCache) {
RowCountKey key = new RowCountKey(catalogId, dbId, tableId);
try {
CompletableFuture<Optional<Long>> f = rowCountCache.get(key);
CompletableFuture<Optional<Long>> f = fillMetaCache
? rowCountCache.get(key, (rowCountKey, executor) -> CompletableFuture.supplyAsync(
() -> loadRowCount(rowCountKey, true), executor))
: rowCountCache.get(key);
// Get row count synchronously by default.
if (ConnectContext.get() == null
|| ConnectContext.get().getSessionVariable().fetchHiveRowCountSync) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ public long getRowCount() {
return TableIf.UNKNOWN_ROW_COUNT;
}
// All external table should get external row count from cache.
return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id);
return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache()
.getCachedRowCount(catalog.getId(), dbId, id, true);
}

@Override
Expand All @@ -260,7 +261,8 @@ public long getCachedRowCount() {
}
// getExtMetaCacheMgr().getRowCountCache().getCachedRowCount() is an asynchronous non-blocking operation.
// For tables that are not in the cache, it will load asynchronously and return -1.
return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id);
return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache()
.getCachedRowCount(catalog.getId(), dbId, id, false);
}

@Override
Expand All @@ -272,6 +274,13 @@ public long fetchRowCount() {
return UNKNOWN_ROW_COUNT;
}

/**
* Fetch row count, and allow the load path to fill external metadata cache if supported.
* <p>
* This implementation ignores {@code fillMetaCache} and simply delegates to
* {@link #fetchRowCount()}.
*
* @param fillMetaCache whether the load path may fill the external metadata cache; unused here
* @return the value returned by {@link #fetchRowCount()}
*/
public long fetchRowCountWithMetaCache(boolean fillMetaCache) {
return fetchRowCount();
}

@Override
public long getAvgRowLength() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public long getCachedRowCount() {
return -1;
}
return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache()
.getCachedRowCount(catalog.getId(), dbId, id);
.getCachedRowCount(catalog.getId(), dbId, id, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,13 +788,22 @@ private List<FieldSchema> getSchemaFromRemoteTable() {

/**
* Fetch the row count via {@code fetchRowCountInternal}, passing {@code false} for
* {@code fillMetaCache}.
*
* @return the value returned by {@code fetchRowCountInternal(false)}
*/
@Override
public long fetchRowCount() {
return fetchRowCountInternal(false);
}

/**
* Fetch the row count via {@code fetchRowCountInternal}, forwarding the caller's
* {@code fillMetaCache} choice unchanged.
*
* @param fillMetaCache passed through to {@code fetchRowCountInternal}
* @return the value returned by {@code fetchRowCountInternal(fillMetaCache)}
*/
@Override
public long fetchRowCountWithMetaCache(boolean fillMetaCache) {
return fetchRowCountInternal(fillMetaCache);
}

/**
 * Shared implementation behind {@code fetchRowCount()} and
 * {@code fetchRowCountWithMetaCache(boolean)}.
 *
 * The row count is first read from the external source. Only when that yields
 * {@code UNKNOWN_ROW_COUNT} and the table's {@code dlaType} is {@code HIVE} is it
 * estimated from the file list. The estimate is computed exactly once.
 *
 * @param fillMetaCache forwarded to {@code getRowCountFromFileList}
 * @return the row count from the external source, or the file-list estimate when the
 *         fallback applies
 */
private long fetchRowCountInternal(boolean fillMetaCache) {
    makeSureInitialized();
    // Get row count from hive metastore property.
    long rowCount = getRowCountFromExternalSource();
    // Only hive table supports estimate row count by listing file.
    if (rowCount == UNKNOWN_ROW_COUNT && dlaType.equals(DLAType.HIVE)) {
        LOG.info("Will estimate row count for table {} from file list.", name);
        rowCount = getRowCountFromFileList(fillMetaCache);
    }
    return rowCount;
}
Expand Down Expand Up @@ -949,7 +958,7 @@ public void gsonPostProcess() throws IOException {
@Override
public List<Long> getChunkSizes() {
HiveExternalMetaCache.HivePartitionValues partitionValues = getAllPartitionValues();
List<HiveExternalMetaCache.FileCacheValue> filesByPartitions = getFilesForPartitions(partitionValues, 0);
List<HiveExternalMetaCache.FileCacheValue> filesByPartitions = getFilesForPartitions(partitionValues, 0, false);
List<Long> result = Lists.newArrayList();
for (HiveExternalMetaCache.FileCacheValue files : filesByPartitions) {
for (HiveExternalMetaCache.HiveFileStatus file : files.getFiles()) {
Expand Down Expand Up @@ -1047,7 +1056,7 @@ public boolean isPartitionColumnAllowNull() {
/**
* Estimate hive table row count : totalFileSize/estimatedRowSize
*/
private long getRowCountFromFileList() {
private long getRowCountFromFileList(boolean fillMetaCache) {
if (!GlobalVariable.enable_get_row_count_from_file_list) {
return UNKNOWN_ROW_COUNT;
}
Expand All @@ -1061,7 +1070,7 @@ private long getRowCountFromFileList() {
// Get files for all partitions.
int samplePartitionSize = Config.hive_stats_partition_sample_size;
List<HiveExternalMetaCache.FileCacheValue> filesByPartitions =
getFilesForPartitions(partitionValues, samplePartitionSize);
getFilesForPartitions(partitionValues, samplePartitionSize, fillMetaCache);
LOG.info("Number of files selected for hive table {} is {}", name, filesByPartitions.size());
long totalSize = 0;
// Calculate the total file size.
Expand Down Expand Up @@ -1127,7 +1136,7 @@ private HiveExternalMetaCache.HivePartitionValues getAllPartitionValues() {
// Get all files related to given partition values
// If sampleSize > 0, randomly choose part of partitions of the whole table.
private List<HiveExternalMetaCache.FileCacheValue> getFilesForPartitions(
HiveExternalMetaCache.HivePartitionValues partitionValues, int sampleSize) {
HiveExternalMetaCache.HivePartitionValues partitionValues, int sampleSize, boolean fillMetaCache) {
if (isView()) {
return Lists.newArrayList();
}
Expand All @@ -1152,9 +1161,10 @@ private List<HiveExternalMetaCache.FileCacheValue> getFilesForPartitions(
for (PartitionItem item : partitionItems) {
partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList());
}
// get partitions without cache, so that it will not invalid the cache when executing
// non query request such as `show table status`
hivePartitions = cache.getAllPartitionsWithoutCache(this, partitionValuesList);
// Non-query requests such as `show table status` should not fill heavy metadata caches.
hivePartitions = fillMetaCache
? cache.getAllPartitionsWithCache(this, partitionValuesList)
: cache.getAllPartitionsWithoutCache(this, partitionValuesList);
LOG.info("Partition list size for hive partition table {} is {}", name, hivePartitions.size());
} else {
hivePartitions.add(new HivePartition(getOrBuildNameMapping(), true,
Expand All @@ -1167,7 +1177,7 @@ private List<HiveExternalMetaCache.FileCacheValue> getFilesForPartitions(
LOG.debug("Chosen partition for table {}. [{}]", name, partition.toString());
}
}
return cache.getFilesByPartitions(hivePartitions, false, true, new FileSystemDirectoryLister(), null);
return cache.getFilesByPartitions(hivePartitions, fillMetaCache, true, new FileSystemDirectoryLister(), null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.util.concurrent.MoreExecutors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import java.util.Optional;
Expand All @@ -31,6 +34,52 @@
import java.util.concurrent.atomic.AtomicReference;

public class ExternalRowCountCacheTest {
// Two keys that share a table id but differ in catalog id and db id must compare equal
// and produce the same hash code.
@Test
public void testRowCountKeyUsesTableIdAsCacheIdentity() {
    ExternalRowCountCache.RowCountKey original = new ExternalRowCountCache.RowCountKey(1, 2, 3);
    ExternalRowCountCache.RowCountKey sameTableId = new ExternalRowCountCache.RowCountKey(2, 3, 3);

    Assertions.assertEquals(original, sameTableId);

    int originalHash = original.hashCode();
    int sameTableIdHash = sameTableId.hashCode();
    Assertions.assertEquals(originalHash, sameTableIdHash);
}

// loadRowCount must hand its fillMetaCache argument to the table unchanged: the mocked table
// answers 100 for true and 200 for false, so the returned value reveals which flag arrived.
@Test
public void testLoadRowCountPassesFillMetaCacheToTable() {
    ExternalTable mockedTable = Mockito.mock(ExternalTable.class);
    Mockito.when(mockedTable.fetchRowCountWithMetaCache(true)).thenReturn(100L);
    Mockito.when(mockedTable.fetchRowCountWithMetaCache(false)).thenReturn(200L);

    try (MockedStatic<StatisticsUtil> statisticsUtil = Mockito.mockStatic(StatisticsUtil.class)) {
        statisticsUtil.when(() -> StatisticsUtil.findTable(1, 2, 3)).thenReturn(mockedTable);

        ExternalRowCountCache.RowCountKey rowCountKey = new ExternalRowCountCache.RowCountKey(1, 2, 3);

        Optional<Long> filling = ExternalRowCountCache.loadRowCount(rowCountKey, true);
        Assertions.assertEquals(100L, filling.get());

        Optional<Long> notFilling = ExternalRowCountCache.loadRowCount(rowCountKey, false);
        Assertions.assertEquals(200L, notFilling.get());
    }

    // Each flag value must have reached the table exactly once.
    Mockito.verify(mockedTable).fetchRowCountWithMetaCache(true);
    Mockito.verify(mockedTable).fetchRowCountWithMetaCache(false);
}

// getCachedRowCount must propagate fillMetaCache down to the table. Two different table ids
// are used so each call performs its own load, and a direct executor makes the load complete
// on the calling thread.
@Test
public void testGetCachedRowCountPassesFillMetaCacheToLoader() {
    ExternalTable mockedTable = Mockito.mock(ExternalTable.class);
    Mockito.when(mockedTable.fetchRowCountWithMetaCache(true)).thenReturn(100L);
    Mockito.when(mockedTable.fetchRowCountWithMetaCache(false)).thenReturn(200L);

    try (MockedStatic<StatisticsUtil> statisticsUtil = Mockito.mockStatic(StatisticsUtil.class)) {
        statisticsUtil.when(() -> StatisticsUtil.findTable(1, 2, 3)).thenReturn(mockedTable);
        statisticsUtil.when(() -> StatisticsUtil.findTable(1, 2, 4)).thenReturn(mockedTable);

        ExternalRowCountCache rowCountCache =
                new ExternalRowCountCache(MoreExecutors.newDirectExecutorService());

        long withFill = rowCountCache.getCachedRowCount(1, 2, 3, true);
        Assertions.assertEquals(100L, withFill);

        long withoutFill = rowCountCache.getCachedRowCount(1, 2, 4, false);
        Assertions.assertEquals(200L, withoutFill);
    }

    // Each flag value must have reached the table exactly once.
    Mockito.verify(mockedTable).fetchRowCountWithMetaCache(true);
    Mockito.verify(mockedTable).fetchRowCountWithMetaCache(false);
}

@Test
public void testLoadWithException() throws Exception {
ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool(
Expand All @@ -50,7 +99,7 @@ public void testLoadWithException() throws Exception {
})) {

ExternalRowCountCache cache = new ExternalRowCountCache(executor);
long cachedRowCount = cache.getCachedRowCount(1, 1, 1);
long cachedRowCount = cache.getCachedRowCount(1, 1, 1, false);
Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount);
for (int i = 0; i < 60; i++) {
if (counter.get() == 1) {
Expand All @@ -66,16 +115,16 @@ public void testLoadWithException() throws Exception {
return Optional.of(100L);
}).when(loaderRef.get()).doLoad(Mockito.any());

cache.getCachedRowCount(1, 1, 1);
cache.getCachedRowCount(1, 1, 1, false);
for (int i = 0; i < 60; i++) {
cachedRowCount = cache.getCachedRowCount(1, 1, 1);
cachedRowCount = cache.getCachedRowCount(1, 1, 1, false);
if (cachedRowCount != TableIf.UNKNOWN_ROW_COUNT) {
Assertions.assertEquals(100, cachedRowCount);
break;
}
Thread.sleep(1000);
}
cachedRowCount = cache.getCachedRowCount(1, 1, 1);
cachedRowCount = cache.getCachedRowCount(1, 1, 1, false);
Assertions.assertEquals(100, cachedRowCount);
Assertions.assertEquals(2, counter.get());

Expand All @@ -90,10 +139,10 @@ public void testLoadWithException() throws Exception {
return Optional.of(100L);
}).when(loaderRef.get()).doLoad(Mockito.any());

cachedRowCount = cache.getCachedRowCount(2, 2, 2);
cachedRowCount = cache.getCachedRowCount(2, 2, 2, false);
Assertions.assertEquals(100, cachedRowCount);
Thread.sleep(1000);
cachedRowCount = cache.getCachedRowCount(2, 2, 2);
cachedRowCount = cache.getCachedRowCount(2, 2, 2, false);
Assertions.assertEquals(100, cachedRowCount);
for (int i = 0; i < 60; i++) {
if (counter.get() == 3) {
Expand Down
Loading
Loading