From 7d596a123add35b6deec7c52e44de056fcaba52d Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Fri, 22 May 2026 15:07:52 +0700 Subject: [PATCH] AWS: Isolate S3 remote signer auth and HTTP state per catalog Co-Authored-By: Claude Opus 4.7 (1M context) --- .../aws/s3/signer/TestS3RestSigner.java | 34 ++-- .../aws/s3/signer/S3V4RestSignerClient.java | 49 +++-- .../s3/signer/TestS3V4RestSignerClient.java | 178 ++++++++++++------ 3 files changed, 164 insertions(+), 97 deletions(-) diff --git a/aws/src/integration/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java b/aws/src/integration/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java index 4e5ed3d91870..d1c24b65fbed 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/signer/TestS3RestSigner.java @@ -75,7 +75,6 @@ import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.UploadPartRequest; -import software.amazon.awssdk.utils.IoUtils; @Testcontainers public class TestS3RestSigner { @@ -92,6 +91,7 @@ public class TestS3RestSigner { MinioUtil.createContainer(MinioUtil.LATEST_TAG, CREDENTIALS_PROVIDER.resolveCredentials()); private static Server httpServer; + private static S3V4RestSignerClient signerClient; private static ValidatingSigner validatingSigner; private S3Client s3; @@ -103,19 +103,18 @@ public static void beforeClass() throws Exception { httpServer = initHttpServer(); } - validatingSigner = - new ValidatingSigner( - ImmutableS3V4RestSignerClient.builder() - .properties( - ImmutableMap.of( - RESTCatalogProperties.SIGNER_URI, - httpServer.getURI().toString(), - RESTCatalogProperties.SIGNER_ENDPOINT, - S3SignerServlet.S3_SIGNER_ENDPOINT, - OAuth2Properties.CREDENTIAL, - "catalog:12345")) - .build(), - new CustomAwsS3V4Signer()); + signerClient = + ImmutableS3V4RestSignerClient.builder() + .properties( + ImmutableMap.of( + RESTCatalogProperties.SIGNER_URI, + httpServer.getURI().toString(), + RESTCatalogProperties.SIGNER_ENDPOINT, + S3SignerServlet.S3_SIGNER_ENDPOINT, + OAuth2Properties.CREDENTIAL, + "catalog:12345")) + .build(); + validatingSigner = new ValidatingSigner(signerClient, new CustomAwsS3V4Signer()); } @AfterAll @@ -149,10 +148,9 @@ public static void afterClass() throws Exception { httpServer.stop(); } - IoUtils.closeQuietlyV2(S3V4RestSignerClient.authManager, null); - IoUtils.closeQuietlyV2(S3V4RestSignerClient.httpClient, null); - S3V4RestSignerClient.authManager = null; - S3V4RestSignerClient.httpClient = null; + if (null != signerClient) { + signerClient.close(); + } } @BeforeEach diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java index 7a463abd3d2d..849d2f446d25 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/signer/S3V4RestSignerClient.java @@ -98,13 +98,11 @@ public abstract class S3V4RestSignerClient private static final String SCOPE = "sign"; - @SuppressWarnings({"immutables:incompat", "VisibilityModifier"}) - @VisibleForTesting - static volatile AuthManager authManager; + @SuppressWarnings("immutables:incompat") + private volatile AuthManager authManager; - @SuppressWarnings({"immutables:incompat", "VisibilityModifier"}) - @VisibleForTesting - static volatile RESTClient httpClient; + @SuppressWarnings("immutables:incompat") + private volatile RESTClient httpClient; public abstract Map properties(); @@ -113,6 +111,19 @@ public Supplier> requestPropertiesSupplier() { return Collections::emptyMap; } + /** + * Supplies the {@link RESTClient} used to contact the signer service. Each signer instance owns + * its own client so that per-catalog configuration (custom headers, timeouts, ...) is never + * shared across catalogs. The client is built without a base URI because each request is sent to + * a fully resolved {@link #endpoint()}. Exposed as a settable default so tests can inject a mock + * client. + */ + @Value.Default + Supplier httpClientSupplier() { + return () -> + HTTPClient.builder(properties()).withHeaders(RESTUtil.configHeaders(properties())).build(); + } + @Value.Lazy public String baseSignerUri() { // TODO remove in 1.12.0 @@ -175,11 +186,12 @@ boolean keepTokenRefreshed() { OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT); } - private AuthManager authManager() { + @VisibleForTesting + AuthManager authManager() { if (null == authManager) { - synchronized (S3V4RestSignerClient.class) { + synchronized (this) { if (null == authManager) { - authManager = AuthManagers.loadAuthManager("s3-signer", properties()); + this.authManager = AuthManagers.loadAuthManager("s3-signer", properties()); } } } @@ -187,16 +199,12 @@ private AuthManager authManager() { return authManager; } - private RESTClient httpClient() { + @VisibleForTesting + RESTClient httpClient() { if (null == httpClient) { - synchronized (S3V4RestSignerClient.class) { + synchronized (this) { if (null == httpClient) { - // Don't include a base URI because this client may be used for contacting different - // catalogs. - httpClient = - HTTPClient.builder(properties()) - .withHeaders(RESTUtil.configHeaders(properties())) - .build(); + this.httpClient = httpClientSupplier().get(); } } } @@ -357,7 +365,12 @@ public SdkHttpFullRequest sign( } @Override - public void close() throws Exception {} + public void close() { + IoUtils.closeQuietlyV2(httpClient, null); + IoUtils.closeQuietlyV2(authManager, null); + this.httpClient = null; + this.authManager = null; + } /** * Only add body for DeleteObjectsRequest. Refer to diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3V4RestSignerClient.java b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3V4RestSignerClient.java index aadbf036b567..91ef43d452ce 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3V4RestSignerClient.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/signer/TestS3V4RestSignerClient.java @@ -27,81 +27,38 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.rest.RESTCatalogProperties; import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.auth.AuthManager; import org.apache.iceberg.rest.auth.AuthProperties; import org.apache.iceberg.rest.auth.AuthSession; +import org.apache.iceberg.rest.auth.OAuth2Manager; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.iceberg.rest.auth.OAuth2Util; import org.apache.iceberg.rest.responses.OAuthTokenResponse; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mockito; -import software.amazon.awssdk.utils.IoUtils; class TestS3V4RestSignerClient { - @BeforeAll - static void beforeAll() { - S3V4RestSignerClient.authManager = null; - S3V4RestSignerClient.httpClient = Mockito.mock(RESTClient.class); - when(S3V4RestSignerClient.httpClient.withAuthSession(Mockito.any())) - .thenReturn(S3V4RestSignerClient.httpClient); - when(S3V4RestSignerClient.httpClient.postForm( - Mockito.anyString(), - Mockito.eq( - Map.of( - "grant_type", - "client_credentials", - "client_id", - "user", - "client_secret", - "12345", - "scope", - "sign")), - Mockito.eq(OAuthTokenResponse.class), - Mockito.anyMap(), - Mockito.any())) - .thenReturn( - OAuthTokenResponse.builder().withToken("token").withTokenType("Bearer").build()); - when(S3V4RestSignerClient.httpClient.postForm( - Mockito.anyString(), - Mockito.eq( - Map.of( - "grant_type", - "client_credentials", - "client_id", - "user", - "client_secret", - "12345", - "scope", - "custom")), - Mockito.eq(OAuthTokenResponse.class), - Mockito.anyMap(), - Mockito.any())) - .thenReturn( - OAuthTokenResponse.builder().withToken("token").withTokenType("Bearer").build()); - } - - @AfterAll - static void afterAll() { - S3V4RestSignerClient.httpClient = null; - } - - @AfterEach - void afterEach() { - IoUtils.closeQuietlyV2(S3V4RestSignerClient.authManager, null); - S3V4RestSignerClient.authManager = null; - } + private static final Map SIGNER_PROPERTIES = + Map.of( + RESTCatalogProperties.SIGNER_URI, + "https://signer.com", + RESTCatalogProperties.SIGNER_ENDPOINT, + "v1/sign/s3"); @ParameterizedTest @MethodSource("validOAuth2Properties") - void authSessionOAuth2(Map properties, String expectedScope, String expectedToken) - throws Exception { + void authSessionOAuth2( + Map properties, String expectedScope, String expectedToken) { + RESTClient mockHttpClient = mockHttpClient(); try (S3V4RestSignerClient client = - ImmutableS3V4RestSignerClient.builder().properties(properties).build(); + ImmutableS3V4RestSignerClient.builder() + .properties(properties) + .httpClientSupplier(() -> mockHttpClient) + .build(); AuthSession authSession = client.authSession()) { assertThat(client.optionalOAuthParams()).containsEntry(OAuth2Properties.SCOPE, expectedScope); if (expectedToken == null) { @@ -189,8 +146,7 @@ public static Stream validOAuth2Properties() { @ParameterizedTest @MethodSource("legacySignerProperties") void legacySignerProperties( - Map properties, String expectedBaseSignerUri, String expectedEndpoint) - throws Exception { + Map properties, String expectedBaseSignerUri, String expectedEndpoint) { try (S3V4RestSignerClient client = ImmutableS3V4RestSignerClient.builder().properties(properties).build()) { assertThat(client.baseSignerUri()).isEqualTo(expectedBaseSignerUri); @@ -244,4 +200,104 @@ public static Stream legacySignerProperties() { "https://catalog.com", "https://catalog.com/" + S3V4RestSignerClient.S3_SIGNER_DEFAULT_ENDPOINT)); } + + @Test + void httpClientNotSharedAcrossInstances() { + RESTClient firstHttpClient = Mockito.mock(RESTClient.class); + RESTClient secondHttpClient = Mockito.mock(RESTClient.class); + try (S3V4RestSignerClient first = + ImmutableS3V4RestSignerClient.builder() + .properties(SIGNER_PROPERTIES) + .httpClientSupplier(() -> firstHttpClient) + .build(); + S3V4RestSignerClient second = + ImmutableS3V4RestSignerClient.builder() + .properties(SIGNER_PROPERTIES) + .httpClientSupplier(() -> secondHttpClient) + .build()) { + // each signer must use its own client, not the first one initialized in the JVM + assertThat(first.httpClient()).isSameAs(firstHttpClient); + assertThat(second.httpClient()).isSameAs(secondHttpClient); + assertThat(first.httpClient()).isNotSameAs(second.httpClient()); + } + } + + @Test + void authManagerNotSharedAcrossInstances() { + Map oauth2Properties = + Map.of( + RESTCatalogProperties.SIGNER_URI, + "https://signer.com", + RESTCatalogProperties.SIGNER_ENDPOINT, + "v1/sign/s3", + AuthProperties.AUTH_TYPE, + AuthProperties.AUTH_TYPE_OAUTH2, + OAuth2Properties.TOKEN, + "token"); + try (S3V4RestSignerClient noAuth = + ImmutableS3V4RestSignerClient.builder().properties(SIGNER_PROPERTIES).build(); + S3V4RestSignerClient oauth2 = + ImmutableS3V4RestSignerClient.builder().properties(oauth2Properties).build()) { + AuthManager noAuthManager = noAuth.authManager(); + AuthManager oauth2Manager = oauth2.authManager(); + // a catalog configured for OAuth2 must not leak its auth manager into a catalog that + // configured no auth (and vice versa) + assertThat(noAuthManager).isNotSameAs(oauth2Manager); + assertThat(oauth2Manager).isInstanceOf(OAuth2Manager.class); + assertThat(noAuthManager).isNotInstanceOf(OAuth2Manager.class); + } + } + + @Test + void closeReleasesHttpClient() throws Exception { + RESTClient mockHttpClient = Mockito.mock(RESTClient.class); + S3V4RestSignerClient client = + ImmutableS3V4RestSignerClient.builder() + .properties(SIGNER_PROPERTIES) + .httpClientSupplier(() -> mockHttpClient) + .build(); + assertThat(client.httpClient()).isSameAs(mockHttpClient); + client.close(); + Mockito.verify(mockHttpClient).close(); + } + + private static RESTClient mockHttpClient() { + RESTClient mock = Mockito.mock(RESTClient.class); + when(mock.withAuthSession(Mockito.any())).thenReturn(mock); + when(mock.postForm( + Mockito.anyString(), + Mockito.eq( + Map.of( + "grant_type", + "client_credentials", + "client_id", + "user", + "client_secret", + "12345", + "scope", + "sign")), + Mockito.eq(OAuthTokenResponse.class), + Mockito.anyMap(), + Mockito.any())) + .thenReturn( + OAuthTokenResponse.builder().withToken("token").withTokenType("Bearer").build()); + when(mock.postForm( + Mockito.anyString(), + Mockito.eq( + Map.of( + "grant_type", + "client_credentials", + "client_id", + "user", + "client_secret", + "12345", + "scope", + "custom")), + Mockito.eq(OAuthTokenResponse.class), + Mockito.anyMap(), + Mockito.any())) + .thenReturn( + OAuthTokenResponse.builder().withToken("token").withTokenType("Bearer").build()); + return mock; + } }