Core: Add session catalog implementation for REST#4830
Conversation
4e48a2d to
b02d046
Compare
|
Looks like the file names are misleading git. |
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Outdated
Show resolved
Hide resolved
|
|
||
| public AuthSession refresh() { | ||
| Map<String, String> request = OAuth2Util.tokenExchangeRequest( | ||
| token, tokenType, ImmutableList.of(RESTCatalogProperties.CATALOG_SCOPE)); |
There was a problem hiding this comment.
The token exchange request uses the current token in the Authorization header and as the subject_token to refresh.
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Outdated
Show resolved
Hide resolved
|
Overall, the logic and sequencing looks sound. The one thing I might suggest is actually creating a separate package |
| import org.apache.iceberg.rest.OAuth2Util; | ||
| import org.apache.iceberg.rest.RESTResponse; | ||
|
|
||
| public class OAuthTokenResponse implements RESTResponse { |
There was a problem hiding this comment.
It looked like this is deserialized using ObjectMapper.readValue() but I didn't see any setting for handling snake case field names, so just wanted to confirm that works...
| String credential = props.get(RESTCatalogProperties.CREDENTIAL); | ||
| if (credential != null && !credential.isEmpty()) { | ||
| String scope = props.getOrDefault(RESTCatalogProperties.SCOPE, RESTCatalogProperties.CATALOG_SCOPE); | ||
| authResponse = fetchClientCredentials(initClient, initHeaders, credential, scope); |
There was a problem hiding this comment.
It might be nice to have the option for a different URL base for auth requests, to support an authorization server separate from the catalog server
5df7ed4 to
35b1107
Compare
2f50a55 to
0ec9bdc
Compare
0ec9bdc to
cbda0a5
Compare
bfd0079 to
1c11698
Compare
1c11698 to
105b70f
Compare
| '-Xep:StrictUnusedVariable:OFF', | ||
| '-Xep:TypeParameterShadowing:OFF', | ||
| '-Xep:TypeParameterUnusedInFormals:OFF', | ||
| '-Xep:DangerousThreadPoolExecutorUsage:OFF', |
There was a problem hiding this comment.
This was needed to add ThreadPools.newScheduledPool. For some reason, errorprone doesn't like it when you create a ScheduledExecutorService.
| // convert expiration interval to milliseconds | ||
| long expiresInMillis = unit.toMillis(expiresIn); | ||
| // how much ahead of time to start the request to allow it to complete | ||
| long refreshWindowMillis = Math.min(expiresInMillis / 10, MAX_REFRESH_WINDOW_MILLIS); |
There was a problem hiding this comment.
The logic here is to start the refresh with 10% of the retry window left, but no more than some maximum (in case expires-in is long, like 10 hours).
| if (authResponse != null) { | ||
| this.catalogAuth = newSession(authResponse, startTimeMillis, catalogAuth); | ||
| } else if (initToken != null) { | ||
| this.catalogAuth = newSession(initToken, expiresInMs(mergedProps), catalogAuth); |
There was a problem hiding this comment.
This feels a little awkward because if there's a token initially, the init doesn't go through the token exchange flow, so we don't actually know when the token is supposed to expire but still scheduling to refresh. It feels like we should either exchange immediately and use the expires response to schedule the interval or assume that a set token has no expiration.
There was a problem hiding this comment.
This is the case where you specify an bearer token directly using "token". In that case, we use a catalog property to determine when to refresh that bearer token. This is the path for a scheduler to get an access token and pass it into a job. It can also pass the expiration interval, or just rely on the default.
Right now, we don't support an initial token exchange. We could go through the same logic as a new session or table load, where we look for different token-exchange types and exchange for an access token. The only trouble there is that I though it would be weird to take the token and exchange it right away. It seems like you should be using client credentials to bootstrap.
|
Thanks for the reviews, @kbendick and @danielcweeks! |
This adds
RESTSessionCatalogthat implementsSessionCatalog. This is the last PR to replace #4575.RESTSessionCatalogimplementsSessionCatalogand adds support for session-based auth handling using OAuth2.For the catalog:
AuthorizationheaderAuthorizationheader.Session context is updated to have a map of credentials. For each session context:
"token"credential), that token is used as an access token in theAuthorizationheader"credential"credential), those credentials are used to fetch a bearer token using OAuth2 client credentials flow, and that token is used for theAuthorizationheaderAuthorizationheader. This exchange uses the catalog's token, if present, as theactor_token.When a table is loaded:
AuthorizationheaderAuthorizationheader.Authorizationheader. This exchange uses the context token or catalog token asactor_token.A scheduled thread pool is used to refresh tokens using the token exchange flow with no actor token. The token exchange uses the token that is being refreshed in the
Authorizationheader.