Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,6 @@ public void alterTable(

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
Table table = tableCache.getIfPresent(identifier);
if (table != null) {
return table;
}

// For system table, do not cache it directly. Instead, cache the origin table and then wrap
// it to generate the system table.
if (identifier.isSystemTable()) {
Expand All @@ -241,7 +236,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
identifier.getBranchName(),
null);
Table originTable = getTable(originIdentifier);
table =
Table table =
SystemTableLoader.load(
checkNotNull(identifier.getSystemTableName()),
(FileStoreTable) originTable);
Expand All @@ -251,12 +246,21 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
return table;
}

table = wrapped.getTable(identifier);
putTableCache(identifier, table);
return table;
try {
return tableCache.get(identifier, this::loadTable);
} catch (TableLoadingException e) {
throw e.tableNotExistException();
}
}

private void putTableCache(Identifier identifier, Table table) {
private Table loadTable(Identifier identifier) {
Table table;
try {
table = wrapped.getTable(identifier);
} catch (TableNotExistException e) {
throw new TableLoadingException(e);
}

if (table instanceof FileStoreTable) {
FileStoreTable storeTable = (FileStoreTable) table;
storeTable.setSnapshotCache(
Expand All @@ -283,7 +287,21 @@ private void putTableCache(Identifier identifier, Table table) {
}
}

tableCache.put(identifier, table);
return table;
}

private static class TableLoadingException extends RuntimeException {

private final TableNotExistException tableNotExistException;

private TableLoadingException(TableNotExistException cause) {
super(cause);
this.tableNotExistException = cause;
}

private TableNotExistException tableNotExistException() {
return tableNotExistException;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -175,6 +177,45 @@ public void testInvalidateBranchIfBaseTableIsDropped() throws Exception {
.hasMessage("Table db.tbl$branch_b1 does not exist.");
}

@Test
public void testConcurrentGetTableLoadsTableOnce() throws Exception {
Catalog wrapped = Mockito.mock(Catalog.class);
CachingCatalog catalog = new CachingCatalog(wrapped, new Options());
Identifier tableIdent = new Identifier("db", "tbl");
Table table = Mockito.mock(Table.class);
AtomicInteger loadCount = new AtomicInteger();
CountDownLatch loadStarted = new CountDownLatch(1);
CountDownLatch releaseLoad = new CountDownLatch(1);
when(wrapped.getTable(tableIdent))
.thenAnswer(
invocation -> {
loadCount.incrementAndGet();
loadStarted.countDown();
assertThat(releaseLoad.await(10, TimeUnit.SECONDS)).isTrue();
return table;
});

int numThreads = 8;
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
try {
List<Future<Table>> futures = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
futures.add(executor.submit(() -> catalog.getTable(tableIdent)));
}

assertThat(loadStarted.await(10, TimeUnit.SECONDS)).isTrue();
releaseLoad.countDown();

for (Future<Table> future : futures) {
assertThat(future.get(10, TimeUnit.SECONDS)).isSameAs(table);
}
assertThat(loadCount).hasValue(1);
} finally {
releaseLoad.countDown();
executor.shutdownNow();
}
}

@Test
public void testTableExpiresAfterInterval() throws Exception {
TestableCachingCatalog catalog =
Expand Down