diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 409615d624ed..bf424f9a1acb 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -60,6 +60,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.FileIOTracker; import org.apache.iceberg.io.StorageCredential; +import org.apache.iceberg.io.SupportsStorageCredentials; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.metrics.MetricsReporters; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; @@ -1175,7 +1176,15 @@ private FileIO newFileIO(SessionContext context, Map properties) private FileIO newFileIO( SessionContext context, Map properties, List storageCredentials) { if (null != ioBuilder) { - return ioBuilder.apply(context, properties); + FileIO fileIO = ioBuilder.apply(context, properties); + if (!storageCredentials.isEmpty() + && fileIO instanceof SupportsStorageCredentials ioWithCredentials) { + ioWithCredentials.setCredentials( + storageCredentials.stream() + .map(c -> StorageCredential.create(c.prefix(), c.config())) + .collect(Collectors.toList())); + } + return fileIO; } else { String ioImpl = properties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL); return CatalogUtil.loadFileIO( diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index d07efdd3a21d..06e08297fd43 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -37,6 +37,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.file.Path; @@ -67,6 +68,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TestCatalogUtil; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -86,6 +88,8 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.inmemory.InMemoryCatalog; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.StorageCredential; +import org.apache.iceberg.io.SupportsStorageCredentials; import org.apache.iceberg.metrics.CommitReport; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -99,6 +103,8 @@ import org.apache.iceberg.rest.auth.AuthSessionUtil; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.credentials.Credential; +import org.apache.iceberg.rest.credentials.ImmutableCredential; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; @@ -3740,6 +3746,71 @@ public void testSequenceNumberConflictThrowsCommitFailed() { .hasMessageContaining("Validation failed, please retry"); } + @Test + public void testIoBuilderReceivesStorageCredentials() { + Credential credential = + ImmutableCredential.builder() + .prefix("s3://test-bucket/") + .putConfig("s3.access-key-id", "test-access-key") + .putConfig("s3.secret-access-key", "test-secret-key") + .build(); + + // Adapter that injects storage credentials into LoadTableResponse + RESTCatalogAdapter adapter = + new RESTCatalogAdapter(backendCatalog) { + @SuppressWarnings("unchecked") + @Override + public T handleRequest( + Route route, + Map vars, + HTTPRequest httpRequest, + Class responseType, + Consumer> responseHeaders) { + T response = + super.handleRequest(route, vars, httpRequest, responseType, responseHeaders); + if (route == Route.LOAD_TABLE && response instanceof LoadTableResponse loadResponse) { + return (T) + LoadTableResponse.builder() + .withTableMetadata(loadResponse.tableMetadata()) + .addAllConfig(loadResponse.config()) + .addCredential(credential) + .build(); + } + return response; + } + }; + + AtomicReference createdFileIO = new AtomicReference<>(); + + try (RESTCatalog catalog = + catalog( + adapter, + clientBuilder -> + new RESTSessionCatalog( + clientBuilder, + (context, config) -> { + TestCatalogUtil.TestFileIOWithStorageCredentials fileIO = + new TestCatalogUtil.TestFileIOWithStorageCredentials(); + createdFileIO.set(fileIO); + return fileIO; + }))) { + catalog.createNamespace(NS); + catalog.createTable(TABLE, SCHEMA); + catalog.loadTable(TABLE); + + assertThat(createdFileIO.get()).isInstanceOf(SupportsStorageCredentials.class); + List creds = + ((SupportsStorageCredentials) createdFileIO.get()).credentials(); + assertThat(creds).hasSize(1); + assertThat(creds.get(0).prefix()).isEqualTo("s3://test-bucket/"); + assertThat(creds.get(0).config()) + .containsEntry("s3.access-key-id", "test-access-key") + .containsEntry("s3.secret-access-key", "test-secret-key"); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private RESTCatalog catalog(RESTCatalogAdapter adapter) { RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);