Skip to content

Commit

Permalink
Core: Add REST catalog table session cache (#8920)
Browse files Browse the repository at this point in the history
The purpose of caching auth session for tables is mainly so that we can
stop refreshing the session when it isn't used anymore
  • Loading branch information
nastra committed Dec 10, 2023
1 parent 7d06af3 commit 4090a88
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 13 deletions.
56 changes: 44 additions & 12 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.EnvironmentUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.ThreadPools;
import org.apache.iceberg.view.BaseView;
Expand Down Expand Up @@ -123,6 +124,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private final Function<Map<String, String>, RESTClient> clientBuilder;
private final BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder;
private Cache<String, AuthSession> sessions = null;
private Cache<String, AuthSession> tableSessions = null;
private Cache<TableOperations, FileIO> fileIOCloser;
private AuthSession catalogAuth = null;
private boolean keepTokenRefreshed = true;
Expand Down Expand Up @@ -197,6 +199,7 @@ public void initialize(String name, Map<String, String> unresolved) {
Map<String, String> baseHeaders = configHeaders(mergedProps);

this.sessions = newSessionCache(mergedProps);
this.tableSessions = newSessionCache(mergedProps);
this.keepTokenRefreshed =
PropertyUtil.propertyAsBoolean(
mergedProps,
Expand Down Expand Up @@ -242,7 +245,15 @@ private AuthSession session(SessionContext context) {
AuthSession session =
sessions.get(
context.sessionId(),
id -> newSession(context.credentials(), context.properties(), catalogAuth));
id -> {
Pair<String, Supplier<AuthSession>> newSession =
newSession(context.credentials(), context.properties(), catalogAuth);
if (null != newSession) {
return newSession.second().get();
}

return null;
});

return session != null ? session : catalogAuth;
}
Expand Down Expand Up @@ -859,7 +870,12 @@ private FileIO tableFileIO(SessionContext context, Map<String, String> config) {
}

private AuthSession tableSession(Map<String, String> tableConf, AuthSession parent) {
AuthSession session = newSession(tableConf, tableConf, parent);
Pair<String, Supplier<AuthSession>> newSession = newSession(tableConf, tableConf, parent);
if (null == newSession) {
return parent;
}

AuthSession session = tableSessions.get(newSession.first(), id -> newSession.second().get());

return session != null ? session : parent;
}
Expand Down Expand Up @@ -889,30 +905,46 @@ private static ConfigResponse fetchConfig(
return configResponse;
}

private AuthSession newSession(
private Pair<String, Supplier<AuthSession>> newSession(
Map<String, String> credentials, Map<String, String> properties, AuthSession parent) {
if (credentials != null) {
// use the bearer token without exchanging
if (credentials.containsKey(OAuth2Properties.TOKEN)) {
return AuthSession.fromAccessToken(
client,
tokenRefreshExecutor(),
return Pair.of(
credentials.get(OAuth2Properties.TOKEN),
expiresAtMillis(properties),
parent);
() ->
AuthSession.fromAccessToken(
client,
tokenRefreshExecutor(),
credentials.get(OAuth2Properties.TOKEN),
expiresAtMillis(properties),
parent));
}

if (credentials.containsKey(OAuth2Properties.CREDENTIAL)) {
// fetch a token using the client credentials flow
return AuthSession.fromCredential(
client, tokenRefreshExecutor(), credentials.get(OAuth2Properties.CREDENTIAL), parent);
return Pair.of(
credentials.get(OAuth2Properties.CREDENTIAL),
() ->
AuthSession.fromCredential(
client,
tokenRefreshExecutor(),
credentials.get(OAuth2Properties.CREDENTIAL),
parent));
}

for (String tokenType : TOKEN_PREFERENCE_ORDER) {
if (credentials.containsKey(tokenType)) {
// exchange the token for an access token using the token exchange flow
return AuthSession.fromTokenExchange(
client, tokenRefreshExecutor(), credentials.get(tokenType), tokenType, parent);
return Pair.of(
credentials.get(tokenType),
() ->
AuthSession.fromTokenExchange(
client,
tokenRefreshExecutor(),
credentials.get(tokenType),
tokenType,
parent));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ public void testTableAuth(
// if the table returned a bearer token, there will be no token request
if (!tableConfig.containsKey("token")) {
// client credentials or token exchange to get a table token
Mockito.verify(adapter, times(2))
Mockito.verify(adapter, times(1))
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
Expand Down

0 comments on commit 4090a88

Please sign in to comment.