Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.utils.IoUtils;

@Testcontainers
public class TestS3RestSigner {
Expand All @@ -92,6 +91,7 @@ public class TestS3RestSigner {
MinioUtil.createContainer(MinioUtil.LATEST_TAG, CREDENTIALS_PROVIDER.resolveCredentials());

private static Server httpServer;
private static S3V4RestSignerClient signerClient;
private static ValidatingSigner validatingSigner;
private S3Client s3;

Expand All @@ -103,19 +103,18 @@ public static void beforeClass() throws Exception {
httpServer = initHttpServer();
}

validatingSigner =
new ValidatingSigner(
ImmutableS3V4RestSignerClient.builder()
.properties(
ImmutableMap.of(
RESTCatalogProperties.SIGNER_URI,
httpServer.getURI().toString(),
RESTCatalogProperties.SIGNER_ENDPOINT,
S3SignerServlet.S3_SIGNER_ENDPOINT,
OAuth2Properties.CREDENTIAL,
"catalog:12345"))
.build(),
new CustomAwsS3V4Signer());
signerClient =
ImmutableS3V4RestSignerClient.builder()
.properties(
ImmutableMap.of(
RESTCatalogProperties.SIGNER_URI,
httpServer.getURI().toString(),
RESTCatalogProperties.SIGNER_ENDPOINT,
S3SignerServlet.S3_SIGNER_ENDPOINT,
OAuth2Properties.CREDENTIAL,
"catalog:12345"))
.build();
validatingSigner = new ValidatingSigner(signerClient, new CustomAwsS3V4Signer());
}

@AfterAll
Expand Down Expand Up @@ -149,10 +148,9 @@ public static void afterClass() throws Exception {
httpServer.stop();
}

IoUtils.closeQuietlyV2(S3V4RestSignerClient.authManager, null);
IoUtils.closeQuietlyV2(S3V4RestSignerClient.httpClient, null);
S3V4RestSignerClient.authManager = null;
S3V4RestSignerClient.httpClient = null;
if (null != signerClient) {
signerClient.close();
}
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,11 @@ public abstract class S3V4RestSignerClient

private static final String SCOPE = "sign";

@SuppressWarnings({"immutables:incompat", "VisibilityModifier"})
@VisibleForTesting
static volatile AuthManager authManager;
@SuppressWarnings("immutables:incompat")
private volatile AuthManager authManager;

@SuppressWarnings({"immutables:incompat", "VisibilityModifier"})
@VisibleForTesting
static volatile RESTClient httpClient;
@SuppressWarnings("immutables:incompat")
private volatile RESTClient httpClient;

public abstract Map<String, String> properties();

Expand All @@ -113,6 +111,19 @@ public Supplier<Map<String, String>> requestPropertiesSupplier() {
return Collections::emptyMap;
}

/**
* Supplies the {@link RESTClient} used to contact the signer service. Each signer instance owns
* its own client so that per-catalog configuration (custom headers, timeouts, ...) is never
* shared across catalogs. The client is built without a base URI because each request is sent to
* a fully resolved {@link #endpoint()}. Exposed as a settable default so tests can inject a mock
* client.
*/
@Value.Default
Supplier<RESTClient> httpClientSupplier() {
return () ->
HTTPClient.builder(properties()).withHeaders(RESTUtil.configHeaders(properties())).build();
}

@Value.Lazy
public String baseSignerUri() {
// TODO remove in 1.12.0
Expand Down Expand Up @@ -175,28 +186,25 @@ boolean keepTokenRefreshed() {
OAuth2Properties.TOKEN_REFRESH_ENABLED_DEFAULT);
}

private AuthManager authManager() {
@VisibleForTesting
AuthManager authManager() {
if (null == authManager) {
synchronized (S3V4RestSignerClient.class) {
synchronized (this) {
if (null == authManager) {
authManager = AuthManagers.loadAuthManager("s3-signer", properties());
this.authManager = AuthManagers.loadAuthManager("s3-signer", properties());
}
}
}

return authManager;
}

private RESTClient httpClient() {
@VisibleForTesting
RESTClient httpClient() {
if (null == httpClient) {
synchronized (S3V4RestSignerClient.class) {
synchronized (this) {
if (null == httpClient) {
// Don't include a base URI because this client may be used for contacting different
// catalogs.
httpClient =
HTTPClient.builder(properties())
.withHeaders(RESTUtil.configHeaders(properties()))
.build();
this.httpClient = httpClientSupplier().get();
}
}
}
Expand Down Expand Up @@ -357,7 +365,12 @@ public SdkHttpFullRequest sign(
}

@Override
public void close() throws Exception {}
public void close() {
IoUtils.closeQuietlyV2(httpClient, null);
IoUtils.closeQuietlyV2(authManager, null);
this.httpClient = null;
this.authManager = null;
}

/**
* Only add body for DeleteObjectsRequest. Refer to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,81 +27,38 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.RESTCatalogProperties;
import org.apache.iceberg.rest.RESTClient;
import org.apache.iceberg.rest.auth.AuthManager;
import org.apache.iceberg.rest.auth.AuthProperties;
import org.apache.iceberg.rest.auth.AuthSession;
import org.apache.iceberg.rest.auth.OAuth2Manager;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import software.amazon.awssdk.utils.IoUtils;

class TestS3V4RestSignerClient {

@BeforeAll
static void beforeAll() {
S3V4RestSignerClient.authManager = null;
S3V4RestSignerClient.httpClient = Mockito.mock(RESTClient.class);
when(S3V4RestSignerClient.httpClient.withAuthSession(Mockito.any()))
.thenReturn(S3V4RestSignerClient.httpClient);
when(S3V4RestSignerClient.httpClient.postForm(
Mockito.anyString(),
Mockito.eq(
Map.of(
"grant_type",
"client_credentials",
"client_id",
"user",
"client_secret",
"12345",
"scope",
"sign")),
Mockito.eq(OAuthTokenResponse.class),
Mockito.anyMap(),
Mockito.any()))
.thenReturn(
OAuthTokenResponse.builder().withToken("token").withTokenType("Bearer").build());
when(S3V4RestSignerClient.httpClient.postForm(
Mockito.anyString(),
Mockito.eq(
Map.of(
"grant_type",
"client_credentials",
"client_id",
"user",
"client_secret",
"12345",
"scope",
"custom")),
Mockito.eq(OAuthTokenResponse.class),
Mockito.anyMap(),
Mockito.any()))
.thenReturn(
OAuthTokenResponse.builder().withToken("token").withTokenType("Bearer").build());
}

@AfterAll
static void afterAll() {
S3V4RestSignerClient.httpClient = null;
}

@AfterEach
void afterEach() {
IoUtils.closeQuietlyV2(S3V4RestSignerClient.authManager, null);
S3V4RestSignerClient.authManager = null;
}
private static final Map<String, String> SIGNER_PROPERTIES =
Map.of(
RESTCatalogProperties.SIGNER_URI,
"https://signer.com",
RESTCatalogProperties.SIGNER_ENDPOINT,
"v1/sign/s3");

@ParameterizedTest
@MethodSource("validOAuth2Properties")
void authSessionOAuth2(Map<String, String> properties, String expectedScope, String expectedToken)
throws Exception {
void authSessionOAuth2(
Map<String, String> properties, String expectedScope, String expectedToken) {
RESTClient mockHttpClient = mockHttpClient();
try (S3V4RestSignerClient client =
ImmutableS3V4RestSignerClient.builder().properties(properties).build();
ImmutableS3V4RestSignerClient.builder()
.properties(properties)
.httpClientSupplier(() -> mockHttpClient)
.build();
AuthSession authSession = client.authSession()) {
assertThat(client.optionalOAuthParams()).containsEntry(OAuth2Properties.SCOPE, expectedScope);
if (expectedToken == null) {
Expand Down Expand Up @@ -189,8 +146,7 @@ public static Stream<Arguments> validOAuth2Properties() {
@ParameterizedTest
@MethodSource("legacySignerProperties")
void legacySignerProperties(
Map<String, String> properties, String expectedBaseSignerUri, String expectedEndpoint)
throws Exception {
Map<String, String> properties, String expectedBaseSignerUri, String expectedEndpoint) {
try (S3V4RestSignerClient client =
ImmutableS3V4RestSignerClient.builder().properties(properties).build()) {
assertThat(client.baseSignerUri()).isEqualTo(expectedBaseSignerUri);
Expand Down Expand Up @@ -244,4 +200,104 @@ public static Stream<Arguments> legacySignerProperties() {
"https://catalog.com",
"https://catalog.com/" + S3V4RestSignerClient.S3_SIGNER_DEFAULT_ENDPOINT));
}

@Test
void httpClientNotSharedAcrossInstances() {
RESTClient firstHttpClient = Mockito.mock(RESTClient.class);
RESTClient secondHttpClient = Mockito.mock(RESTClient.class);
try (S3V4RestSignerClient first =
ImmutableS3V4RestSignerClient.builder()
.properties(SIGNER_PROPERTIES)
.httpClientSupplier(() -> firstHttpClient)
.build();
S3V4RestSignerClient second =
ImmutableS3V4RestSignerClient.builder()
.properties(SIGNER_PROPERTIES)
.httpClientSupplier(() -> secondHttpClient)
.build()) {
// each signer must use its own client, not the first one initialized in the JVM
assertThat(first.httpClient()).isSameAs(firstHttpClient);
assertThat(second.httpClient()).isSameAs(secondHttpClient);
assertThat(first.httpClient()).isNotSameAs(second.httpClient());
}
}

@Test
void authManagerNotSharedAcrossInstances() {
Map<String, String> oauth2Properties =
Map.of(
RESTCatalogProperties.SIGNER_URI,
"https://signer.com",
RESTCatalogProperties.SIGNER_ENDPOINT,
"v1/sign/s3",
AuthProperties.AUTH_TYPE,
AuthProperties.AUTH_TYPE_OAUTH2,
OAuth2Properties.TOKEN,
"token");
try (S3V4RestSignerClient noAuth =
ImmutableS3V4RestSignerClient.builder().properties(SIGNER_PROPERTIES).build();
S3V4RestSignerClient oauth2 =
ImmutableS3V4RestSignerClient.builder().properties(oauth2Properties).build()) {
AuthManager noAuthManager = noAuth.authManager();
AuthManager oauth2Manager = oauth2.authManager();
// a catalog configured for OAuth2 must not leak its auth manager into a catalog that
// configured no auth (and vice versa)
assertThat(noAuthManager).isNotSameAs(oauth2Manager);
assertThat(oauth2Manager).isInstanceOf(OAuth2Manager.class);
assertThat(noAuthManager).isNotInstanceOf(OAuth2Manager.class);
}
}

@Test
void closeReleasesHttpClient() throws Exception {
RESTClient mockHttpClient = Mockito.mock(RESTClient.class);
S3V4RestSignerClient client =
ImmutableS3V4RestSignerClient.builder()
.properties(SIGNER_PROPERTIES)
.httpClientSupplier(() -> mockHttpClient)
.build();
assertThat(client.httpClient()).isSameAs(mockHttpClient);
client.close();
Mockito.verify(mockHttpClient).close();
}

private static RESTClient mockHttpClient() {
RESTClient mock = Mockito.mock(RESTClient.class);
when(mock.withAuthSession(Mockito.any())).thenReturn(mock);
when(mock.postForm(
Mockito.anyString(),
Mockito.eq(
Map.of(
"grant_type",
"client_credentials",
"client_id",
"user",
"client_secret",
"12345",
"scope",
"sign")),
Mockito.eq(OAuthTokenResponse.class),
Mockito.anyMap(),
Mockito.any()))
.thenReturn(
OAuthTokenResponse.builder().withToken("token").withTokenType("Bearer").build());
when(mock.postForm(
Mockito.anyString(),
Mockito.eq(
Map.of(
"grant_type",
"client_credentials",
"client_id",
"user",
"client_secret",
"12345",
"scope",
"custom")),
Mockito.eq(OAuthTokenResponse.class),
Mockito.anyMap(),
Mockito.any()))
.thenReturn(
OAuthTokenResponse.builder().withToken("token").withTokenType("Bearer").build());
return mock;
}
}