Skip to content

Commit

Permalink
Core: Add FileIO cache for REST catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed May 1, 2023
1 parent 2b8850b commit 06a4b5b
Showing 1 changed file with 39 additions and 26 deletions.
65 changes: 39 additions & 26 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog
private ResourcePaths paths = null;
private SnapshotMode snapshotMode = null;
private Object conf = null;
private FileIO io = null;
private Cache<TableIdentifier, FileIO> fileIOCache = null;
private MetricsReporter reporter = null;
private boolean reportingViaRestEnabled;

Expand Down Expand Up @@ -191,7 +191,7 @@ public void initialize(String name, Map<String, String> unresolved) {
client, tokenRefreshExecutor(), token, expiresAtMillis(mergedProps), catalogAuth);
}

this.io = newFileIO(SessionContext.createEmpty(), mergedProps);
this.fileIOCache = newFileIOCache();

this.snapshotMode =
SnapshotMode.valueOf(
Expand Down Expand Up @@ -300,15 +300,13 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
response = loadInternal(context, identifier, snapshotMode);
loadedIdent = identifier;
metadataType = null;

} catch (NoSuchTableException original) {
metadataType = MetadataTableType.from(identifier.name());
if (metadataType != null) {
// attempt to load a metadata table using the identifier's namespace as the base table
TableIdentifier baseIdent = TableIdentifier.of(identifier.namespace().levels());
loadedIdent = TableIdentifier.of(identifier.namespace().levels());
try {
response = loadInternal(context, baseIdent, snapshotMode);
loadedIdent = baseIdent;
response = loadInternal(context, loadedIdent, snapshotMode);
} catch (NoSuchTableException ignored) {
// the base table does not exist
throw original;
Expand All @@ -319,6 +317,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
}
}

final TableIdentifier finalIdentifier = loadedIdent;
AuthSession session = tableSession(response.config(), session(context));
TableMetadata tableMetadata;

Expand All @@ -329,7 +328,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
.setPreviousFileLocation(null)
.setSnapshotsSupplier(
() ->
loadInternal(context, identifier, SnapshotMode.ALL)
loadInternal(context, finalIdentifier, SnapshotMode.ALL)
.tableMetadata()
.snapshots())
.discardChanges()
Expand All @@ -341,16 +340,16 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
RESTTableOperations ops =
new RESTTableOperations(
client,
paths.table(loadedIdent),
paths.table(finalIdentifier),
session::headers,
tableFileIO(context, response.config()),
tableFileIO(finalIdentifier, context, response.config()),
tableMetadata);

BaseTable table =
new BaseTable(
ops,
fullTableName(loadedIdent),
metricsReporter(paths.metrics(loadedIdent), session::headers));
fullTableName(finalIdentifier),
metricsReporter(paths.metrics(finalIdentifier), session::headers));
if (metadataType != null) {
return MetadataTableUtils.createMetadataTableInstance(table, metadataType);
}
Expand Down Expand Up @@ -589,7 +588,7 @@ public Table create() {
client,
paths.table(ident),
session::headers,
tableFileIO(context, response.config()),
tableFileIO(ident, context, response.config()),
response.tableMetadata());

return new BaseTable(
Expand All @@ -609,7 +608,7 @@ public Transaction createTransaction() {
client,
paths.table(ident),
session::headers,
tableFileIO(context, response.config()),
tableFileIO(ident, context, response.config()),
RESTTableOperations.UpdateType.CREATE,
createChanges(meta),
meta);
Expand Down Expand Up @@ -660,7 +659,7 @@ public Transaction replaceTransaction() {
client,
paths.table(ident),
session::headers,
tableFileIO(context, response.config()),
tableFileIO(ident, context, response.config()),
RESTTableOperations.UpdateType.REPLACE,
changes.build(),
base);
Expand Down Expand Up @@ -750,24 +749,25 @@ private String fullTableName(TableIdentifier ident) {
return String.format("%s.%s", name(), ident);
}

private FileIO newFileIO(SessionContext context, Map<String, String> properties) {
private FileIO newFileIO(
TableIdentifier identifier, SessionContext context, Map<String, String> properties) {
if (null != ioBuilder) {
return ioBuilder.apply(context, properties);
} else {
String ioImpl =
properties.getOrDefault(CatalogProperties.FILE_IO_IMPL, ResolvingFileIO.class.getName());
return CatalogUtil.loadFileIO(ioImpl, properties, conf);
return fileIOCache.get(
identifier,
ident -> {
String ioImpl =
properties.getOrDefault(
CatalogProperties.FILE_IO_IMPL, ResolvingFileIO.class.getName());
return CatalogUtil.loadFileIO(ioImpl, properties, conf);
});
}
}

private FileIO tableFileIO(SessionContext context, Map<String, String> config) {
if (config.isEmpty() && ioBuilder == null) {
return io; // reuse client and io since config is the same
}

Map<String, String> fullConf = RESTUtil.merge(properties(), config);

return newFileIO(context, fullConf);
private FileIO tableFileIO(
TableIdentifier identifier, SessionContext context, Map<String, String> config) {
return newFileIO(identifier, context, RESTUtil.merge(properties(), config));
}

private AuthSession tableSession(Map<String, String> tableConf, AuthSession parent) {
Expand Down Expand Up @@ -874,4 +874,17 @@ private static Cache<String, AuthSession> newSessionCache(Map<String, String> pr
(RemovalListener<String, AuthSession>) (id, auth, cause) -> auth.stopRefreshing())
.build();
}

private Cache<TableIdentifier, FileIO> newFileIOCache() {
return Caffeine.newBuilder()
.weakKeys()
.removalListener(
(RemovalListener<TableIdentifier, FileIO>)
(identifier, fileIO, cause) -> {
if (null != fileIO) {
fileIO.close();
}
})
.build();
}
}

0 comments on commit 06a4b5b

Please sign in to comment.