diff --git a/core/src/main/java/kafka/automq/table/CatalogFactory.java b/core/src/main/java/kafka/automq/table/CatalogFactory.java index 29d5820c99..3706cc3370 100644 --- a/core/src/main/java/kafka/automq/table/CatalogFactory.java +++ b/core/src/main/java/kafka/automq/table/CatalogFactory.java @@ -217,15 +217,16 @@ private Catalog runAs(Supplier func) { } } + // important: use putIfAbsent to let the user override all values directly in catalog configuration private void putDataBucketAsWarehouse(boolean s3a) { - if (bucketURI.endpoint() != null) { - options.put("s3.endpoint", bucketURI.endpoint()); + if (StringUtils.isNotBlank(bucketURI.endpoint())) { + options.putIfAbsent("s3.endpoint", bucketURI.endpoint()); } if (bucketURI.extensionBool(AwsObjectStorage.PATH_STYLE_KEY, false)) { - options.put("s3.path-style-access", "true"); + options.putIfAbsent("s3.path-style-access", "true"); } - options.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); - options.put("warehouse", String.format((s3a ? "s3a" : "s3") + "://%s/iceberg", bucketURI.bucket())); + options.putIfAbsent("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + options.putIfAbsent("warehouse", String.format((s3a ? "s3a" : "s3") + "://%s/iceberg", bucketURI.bucket())); } } diff --git a/core/src/test/java/kafka/automq/table/CatalogFactoryTest.java b/core/src/test/java/kafka/automq/table/CatalogFactoryTest.java index 0d134a4128..9ef3e531b0 100644 --- a/core/src/test/java/kafka/automq/table/CatalogFactoryTest.java +++ b/core/src/test/java/kafka/automq/table/CatalogFactoryTest.java @@ -7,7 +7,10 @@ import com.sun.net.httpserver.HttpServer; +import org.apache.iceberg.aws.s3.S3FileIOProperties; +import org.apache.iceberg.inmemory.InMemoryFileIO; import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.util.SerializableMap; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -22,6 +25,7 @@ import static java.util.stream.Collectors.toMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; @TestInstance(PER_CLASS) @@ -36,51 +40,36 @@ class CatalogFactoryTest { @Test void restPassthroughProperties() throws IOException { - final var catalogBackend = HttpServer.create(new InetSocketAddress("localhost", 0), 16); - final var requests = new CopyOnWriteArrayList(); // normally overkill but makes the test more accurate - catalogBackend.createContext("/").setHandler(ex -> { - try (ex) { - final var method = ex.getRequestMethod(); - requests.add( - method + ' ' + ex.getRequestURI().getPath() + '?' + ex.getRequestURI().getQuery() + - ('\n' + String.join("", ex.getRequestHeaders().getOrDefault("x-custom", List.of()))) + - ('\n' + new String(ex.getRequestBody().readAllBytes(), UTF_8)).strip()); - - if (method.equals("GET") && - ex.getRequestURI().getPath().equals("/v1/config") && - "warehouse=s3%3A%2F%2Fmy_bucket%2Ficeberg".equals(ex.getRequestURI().getRawQuery())) { - final var body = """ - { - "defaults": {}, - "overrides": {} - } - """.getBytes(UTF_8); - ex.getResponseHeaders().add("content-type", "application/json"); - ex.sendResponseHeaders(200, body.length); - ex.getResponseBody().write(body); - return; - } - - // else we just called an unexpected endpoint, issue a HTTP 404 - ex.sendResponseHeaders(404, 0); - } - }); - catalogBackend.start(); - try { - + final var restCatalog = new RestCatalogMock(); + try (final var autoClose = restCatalog) { final var config = new KafkaConfig(merge(requiredKafkaConfigProperties, Map.of( "automq.table.topic.catalog.type", "rest", - "automq.table.topic.catalog.uri", "http://localhost:" + catalogBackend.getAddress().getPort(), + "automq.table.topic.catalog.uri", restCatalog.base(), "automq.table.topic.catalog.header.x-custom", "my-x", // Apache Polaris needs a tenant header for ex - "automq.table.topic.catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO", // automq specific/enforced (not standard catalog passthrough) "s3.data.buckets", "0@s3://my_bucket?region=us-east-1&endpoint=http://localhost:12345&pathStyle=true" ))); final var catalog = new CatalogFactory.Builder(config).build(); assertInstanceOf(RESTCatalog.class, catalog).close(); - assertEquals(List.of("GET /v1/config?warehouse=s3://my_bucket/iceberg\nmy-x"), requests); + } + assertEquals(List.of("GET /v1/config?warehouse=s3://my_bucket/iceberg\nmy-x"), restCatalog.requests()); + } + + @Test + void ignoreEmptyS3EndpointForRestCatalog() throws IOException { + FakeS3IO.lastS3FileIOProperties = null; + try (final var restCatalog = new RestCatalogMock()) { + final var config = new KafkaConfig(merge(requiredKafkaConfigProperties, Map.of( + "automq.table.topic.catalog.type", "rest", + "automq.table.topic.catalog.uri", restCatalog.base(), + "automq.table.topic.catalog.io-impl", FakeS3IO.class.getName(), + "s3.data.buckets", "0@s3://my_bucket?region=us-east-1" + ))); + final var catalog = new CatalogFactory.Builder(config).build(); + assertInstanceOf(RESTCatalog.class, catalog).close(); + assertNull(FakeS3IO.lastS3FileIOProperties().endpoint(), "S3FileIO endpoint should be null when not set - not even empty"); } finally { - catalogBackend.stop(0); + FakeS3IO.lastS3FileIOProperties = null; } } @@ -90,4 +79,68 @@ private Map merge(final Map... all) { .flatMap(it -> it.entrySet().stream()) .collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> b)); } + + public static class FakeS3IO extends InMemoryFileIO { + private static S3FileIOProperties lastS3FileIOProperties; + + private static S3FileIOProperties lastS3FileIOProperties() { + return lastS3FileIOProperties; + } + + @Override + public void initialize(final Map properties) { + lastS3FileIOProperties = new S3FileIOProperties(SerializableMap.copyOf(properties)); + super.initialize(properties); + } + } + + private static class RestCatalogMock implements AutoCloseable { + private final List requests = new CopyOnWriteArrayList<>(); // normally overkill but makes the test more accurate + private final HttpServer catalogBackend; + + private RestCatalogMock() throws IOException { + catalogBackend = HttpServer.create(new InetSocketAddress("localhost", 0), 16); + catalogBackend.createContext("/").setHandler(ex -> { + try (ex) { + final var method = ex.getRequestMethod(); + requests.add( + method + ' ' + ex.getRequestURI().getPath() + '?' + ex.getRequestURI().getQuery() + + ('\n' + String.join("", ex.getRequestHeaders().getOrDefault("x-custom", List.of()))) + + ('\n' + new String(ex.getRequestBody().readAllBytes(), UTF_8)).strip()); + + if (method.equals("GET") && + ex.getRequestURI().getPath().equals("/v1/config") && + "warehouse=s3%3A%2F%2Fmy_bucket%2Ficeberg".equals(ex.getRequestURI().getRawQuery())) { + final var body = """ + { + "defaults": {}, + "overrides": {} + } + """.getBytes(UTF_8); + ex.getResponseHeaders().add("content-type", "application/json"); + ex.sendResponseHeaders(200, body.length); + ex.getResponseBody().write(body); + return; + } + + // else we just called an unexpected endpoint, issue a HTTP 404 + ex.sendResponseHeaders(404, 0); + } + }); + catalogBackend.start(); + } + + private String base() { + return "http://localhost:" + catalogBackend.getAddress().getPort(); + } + + private List requests() { + return requests; + } + + @Override + public void close() { + catalogBackend.stop(0); + } + } }