From 4f80c17ef38a3e2a4ab20bb0547e3a8bdd1dcf65 Mon Sep 17 00:00:00 2001 From: Asish Kumar Date: Wed, 13 May 2026 14:47:56 +0530 Subject: [PATCH] [core] Coalesce concurrent CachingCatalog table loads --- .../apache/paimon/catalog/CachingCatalog.java | 40 +++++++++++++----- .../paimon/catalog/CachingCatalogTest.java | 41 +++++++++++++++++++ 2 files changed, 70 insertions(+), 11 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index aad5d16a11e0..c779db7d20bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -226,11 +226,6 @@ public void alterTable( @Override public Table getTable(Identifier identifier) throws TableNotExistException { - Table table = tableCache.getIfPresent(identifier); - if (table != null) { - return table; - } - // For system table, do not cache it directly. Instead, cache the origin table and then wrap // it to generate the system table. if (identifier.isSystemTable()) { @@ -241,7 +236,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException { identifier.getBranchName(), null); Table originTable = getTable(originIdentifier); - table = + Table table = SystemTableLoader.load( checkNotNull(identifier.getSystemTableName()), (FileStoreTable) originTable); @@ -251,12 +246,21 @@ public Table getTable(Identifier identifier) throws TableNotExistException { return table; } - table = wrapped.getTable(identifier); - putTableCache(identifier, table); - return table; + try { + return tableCache.get(identifier, this::loadTable); + } catch (TableLoadingException e) { + throw e.tableNotExistException(); + } } - private void putTableCache(Identifier identifier, Table table) { + private Table loadTable(Identifier identifier) { + Table table; + try { + table = wrapped.getTable(identifier); + } catch (TableNotExistException e) { + throw new TableLoadingException(e); + } + if (table instanceof FileStoreTable) { FileStoreTable storeTable = (FileStoreTable) table; storeTable.setSnapshotCache( @@ -283,7 +287,21 @@ private void putTableCache(Identifier identifier, Table table) { } } - tableCache.put(identifier, table); + return table; + } + + private static class TableLoadingException extends RuntimeException { + + private final TableNotExistException tableNotExistException; + + private TableLoadingException(TableNotExistException cause) { + super(cause); + this.tableNotExistException = cause; + } + + private TableNotExistException tableNotExistException() { + return tableNotExistException; + } } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index 153f2af81d98..53928aef66bf 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -56,8 +56,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -175,6 +177,45 @@ public void testInvalidateBranchIfBaseTableIsDropped() throws Exception { .hasMessage("Table db.tbl$branch_b1 does not exist."); } + @Test + public void testConcurrentGetTableLoadsTableOnce() throws Exception { + Catalog wrapped = Mockito.mock(Catalog.class); + CachingCatalog catalog = new CachingCatalog(wrapped, new Options()); + Identifier tableIdent = new Identifier("db", "tbl"); + Table table = Mockito.mock(Table.class); + AtomicInteger loadCount = new AtomicInteger(); + CountDownLatch loadStarted = new CountDownLatch(1); + CountDownLatch releaseLoad = new CountDownLatch(1); + when(wrapped.getTable(tableIdent)) + .thenAnswer( + invocation -> { + loadCount.incrementAndGet(); + loadStarted.countDown(); + assertThat(releaseLoad.await(10, TimeUnit.SECONDS)).isTrue(); + return table; + }); + + int numThreads = 8; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + try { + List> futures = new ArrayList<>(); + for (int i = 0; i < numThreads; i++) { + futures.add(executor.submit(() -> catalog.getTable(tableIdent))); + } + + assertThat(loadStarted.await(10, TimeUnit.SECONDS)).isTrue(); + releaseLoad.countDown(); + + for (Future future : futures) { + assertThat(future.get(10, TimeUnit.SECONDS)).isSameAs(table); + } + assertThat(loadCount).hasValue(1); + } finally { + releaseLoad.countDown(); + executor.shutdownNow(); + } + } + @Test public void testTableExpiresAfterInterval() throws Exception { TestableCachingCatalog catalog =