diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java b/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java index 777cc5523..cd18041e3 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java @@ -1,7 +1,10 @@ package io.whitefox.api.deltasharing; import static io.whitefox.DeltaTestUtils.*; +import static io.whitefox.IcebergTestUtils.icebergTableWithHadoopCatalog; +import static io.whitefox.IcebergTestUtils.s3IcebergTableWithAwsGlueCatalog; +import io.whitefox.AwsGlueTestConfig; import io.whitefox.S3TestConfig; import io.whitefox.api.deltasharing.model.FileObjectFileWithoutPresignedUrl; import io.whitefox.api.deltasharing.model.FileObjectWithoutPresignedUrl; @@ -27,8 +30,17 @@ public static InternalTable s3DeltaTableWithHistory1(S3TestConfig s3TestConfig) return s3DeltaTable("delta-table-with-history", s3TestConfig); } + public static InternalTable s3IcebergTable1( + S3TestConfig s3TestConfig, AwsGlueTestConfig awsGlueTestConfig) { + return s3IcebergTableWithAwsGlueCatalog( + s3TestConfig, awsGlueTestConfig, "test_glue_db", "icebergtable1"); + } + public static final InternalTable deltaTable1 = deltaTable("delta-table"); + public static final InternalTable icebergtable1 = + icebergTableWithHadoopCatalog("test_db", "icebergtable1"); + public static final String deltaTable1Path = deltaTableUri("delta-table"); public static final String deltaTableWithHistory1Path = deltaTableUri("delta-table-with-history"); @@ -46,7 +58,8 @@ public static StorageManager createStorageManager() { List.of( new SharedTable("table1", "default", "name", deltaTable1), new SharedTable( - "table-with-history", "default", "name", deltaTableWithHistory1)), + "table-with-history", "default", "name", deltaTableWithHistory1), + new SharedTable("icebergtable1", "default", "name", icebergtable1)), "name")), testPrincipal, 0L))); diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java index 01dbfdc04..22d11d591 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java @@ -9,6 +9,7 @@ import io.quarkus.test.junit.QuarkusMock; import io.quarkus.test.junit.QuarkusTest; import io.restassured.http.Header; +import io.whitefox.AwsGlueTestConfig; import io.whitefox.S3TestConfig; import io.whitefox.api.OpenApiValidatorUtils; import io.whitefox.api.deltasharing.SampleTables; @@ -51,14 +52,18 @@ public static void setup() { private final S3TestConfig s3TestConfig; + private final AwsGlueTestConfig awsGlueTestConfig; + @Inject - public DeltaSharesApiImplAwsTest(ObjectMapper objectMapper, S3TestConfig s3TestConfig) { + public DeltaSharesApiImplAwsTest( + ObjectMapper objectMapper, S3TestConfig s3TestConfig, AwsGlueTestConfig awsGlueTestConfig) { this.objectMapper = objectMapper; this.s3TestConfig = s3TestConfig; + this.awsGlueTestConfig = awsGlueTestConfig; } @BeforeEach - public void updateStorageManagerWithS3DeltaTables() { + public void updateStorageManagerWithS3Tables() { storageManager.createShare(new Share( "s3share", "key", @@ -72,12 +77,51 @@ public void updateStorageManagerWithS3DeltaTables() { "s3table-with-history", "s3schema", "s3share", - s3DeltaTableWithHistory1(s3TestConfig))), + s3DeltaTableWithHistory1(s3TestConfig)), + new SharedTable( + "s3IcebergTable1", + "s3schema", + "s3share", + s3IcebergTable1(s3TestConfig, awsGlueTestConfig))), "s3share")), new Principal("Mr fox"), 0L)); } + @Test + @DisabledOnOs(OS.WINDOWS) + public void icebergTableMetadata() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .get( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/metadata", + "s3share", + "s3schema", + "s3IcebergTable1") + .then() + .statusCode(200) + .extract() + .asString() + .split("\n"); + assertEquals(2, responseBodyLines.length); + assertEquals( + new ProtocolObject().protocol(new ProtocolObjectProtocol().minReaderVersion(1)), + objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + assertEquals( + new MetadataObject() + .metaData(new MetadataObjectMetaData() + .id("7819530050735196523") + .name("metastore.test_glue_db.icebergtable1") + .format(new FormatObject().provider("parquet")) + .schemaString( + "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}") + .partitionColumns(List.of()) + .version(1L) + ._configuration(Map.of("write.parquet.compression-codec", "zstd"))), + objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + } + @DisabledOnOs(OS.WINDOWS) @Test public void queryTableCurrentVersion() throws IOException { diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java index 6a89f0f32..389da74bf 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java @@ -149,8 +149,10 @@ public void listTables() { .get("delta-api/v1/shares/{share}/schemas/{schema}/tables", "name", "default") .then() .statusCode(200) - .body("items", hasSize(2)) - .body("items[0].name", either(is("table1")).or(is("table-with-history"))) + .body("items", hasSize(3)) + .body( + "items[0].name", + either(is("table1")).or(is("table-with-history")).or(is("icebergtable1"))) .body("items[0].schema", is("default")) .body("items[0].share", is("name")) .body("nextPageToken", is(nullValue())); @@ -172,7 +174,7 @@ public void tableMetadataNotFound() { @Test @DisabledOnOs(OS.WINDOWS) - public void tableMetadata() throws IOException { + public void deltaTableMetadata() throws IOException { var responseBodyLines = given() .when() .filter(deltaFilter) @@ -204,6 +206,40 @@ public void tableMetadata() throws IOException { objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); } + @Test + @DisabledOnOs(OS.WINDOWS) + public void icebergTableMetadata() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .get( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/metadata", + "name", + "default", + "icebergtable1") + .then() + .statusCode(200) + .extract() + .asString() + .split("\n"); + assertEquals(2, responseBodyLines.length); + assertEquals( + new ProtocolObject().protocol(new ProtocolObjectProtocol().minReaderVersion(1)), + objectMapper.reader().readValue(responseBodyLines[0], ProtocolObject.class)); + assertEquals( + new MetadataObject() + .metaData(new MetadataObjectMetaData() + .id("3369848726892806393") + .name("metastore.test_db.icebergtable1") + .format(new FormatObject().provider("parquet")) + .schemaString( + "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}") + .partitionColumns(List.of()) + .version(1L) + ._configuration(Map.of("write.parquet.compression-codec", "zstd"))), + objectMapper.reader().readValue(responseBodyLines[1], MetadataObject.class)); + } + @Test public void listAllTables() { given() @@ -212,8 +248,10 @@ public void listAllTables() { .get("delta-api/v1/shares/{share}/all-tables", "name") .then() .statusCode(200) - .body("items", hasSize(2)) - .body("items[0].name", either(is("table1")).or(is("table-with-history"))) + .body("items", hasSize(3)) + .body( + "items[0].name", + either(is("table1")).or(is("table-with-history")).or(is("icebergtable1"))) .body("items[0].schema", is("default")) .body("items[0].share", is("name")) .body("nextPageToken", is(nullValue())); diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java index 76c515b10..ac6225762 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java @@ -3,8 +3,15 @@ import io.whitefox.core.Metadata; import io.whitefox.core.ReadTableRequest; import io.whitefox.core.ReadTableResultToBeSigned; +import io.whitefox.core.TableSchema; +import java.sql.Timestamp; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.lang3.NotImplementedException; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; public class IcebergSharedTable implements InternalSharedTable { @@ -27,12 +34,44 @@ public static IcebergSharedTable of(Table icebergTable) { } public Optional getMetadata(Optional startingTimestamp) { - throw new NotImplementedException(); + return getSnapshot(startingTimestamp).map(this::getMetadataFromSnapshot); + } + + private Metadata getMetadataFromSnapshot(Snapshot snapshot) { + return new Metadata( + String.valueOf(snapshot.snapshotId()), + Optional.of(icebergTable.name()), + Optional.empty(), + Metadata.Format.PARQUET, + new TableSchema(tableSchemaConverter.convertIcebergSchemaToWhitefox( + icebergTable.schema().asStruct())), + icebergTable.spec().fields().stream() + .map(PartitionField::name) + .collect(Collectors.toList()), + icebergTable.properties(), + Optional.of(snapshot.sequenceNumber()), + Optional.empty(), // size is fine to be empty + Optional.empty() // numFiles is ok to be empty here too + ); + } + + private Optional getSnapshot(Optional startingTimestamp) { + return startingTimestamp + .map(this::getTimestamp) + .map(Timestamp::getTime) + .map(icebergTable::snapshot) + .or(() -> Optional.of(icebergTable.currentSnapshot())); + } + + private Timestamp getTimestamp(String timestamp) { + return new Timestamp(OffsetDateTime.parse(timestamp, DateTimeFormatter.ISO_OFFSET_DATE_TIME) + .toInstant() + .toEpochMilli()); } @Override public Optional getTableVersion(Optional startingTimestamp) { - throw new NotImplementedException(); + return Optional.of(0L); } @Override diff --git a/server/core/src/test/java/io/whitefox/core/services/IcebergAwsSharedTableTest.java b/server/core/src/test/java/io/whitefox/core/services/IcebergAwsSharedTableTest.java new file mode 100644 index 000000000..d78efb1d3 --- /dev/null +++ b/server/core/src/test/java/io/whitefox/core/services/IcebergAwsSharedTableTest.java @@ -0,0 +1,48 @@ +package io.whitefox.core.services; + +import static io.whitefox.IcebergTestUtils.s3IcebergTableWithAwsGlueCatalog; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.wildfly.common.Assert.assertTrue; + +import io.whitefox.AwsGlueTestConfig; +import io.whitefox.S3TestConfig; +import io.whitefox.core.SharedTable; +import java.util.Optional; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +@DisabledOnOs(OS.WINDOWS) +public class IcebergAwsSharedTableTest { + + private final IcebergTableLoader icebergTableLoader = new IcebergTableLoader( + new IcebergCatalogHandler(new AwsGlueConfigBuilder(), new HadoopConfigBuilder())); + private final S3TestConfig s3TestConfig = S3TestConfig.loadFromEnv(); + private final AwsGlueTestConfig awsGlueTestConfig = AwsGlueTestConfig.loadFromEnv(); + + @Test + void getTableMetadata() { + var PTable = new SharedTable( + "icebergtable1", + "default", + "share1", + s3IcebergTableWithAwsGlueCatalog( + s3TestConfig, awsGlueTestConfig, "test_glue_db", "icebergtable1")); + var DTable = icebergTableLoader.loadTable(PTable); + var metadata = DTable.getMetadata(Optional.empty()); + assertTrue(metadata.isPresent()); + assertEquals("7819530050735196523", metadata.get().id()); + } + + @Test + void getUnknownTableMetadata() { + var unknownPTable = new SharedTable( + "notFound", + "default", + "share1", + s3IcebergTableWithAwsGlueCatalog( + s3TestConfig, awsGlueTestConfig, "test_glue_db", "not-found")); + assertThrows(IllegalArgumentException.class, () -> DeltaSharedTable.of(unknownPTable)); + } +} diff --git a/server/core/src/test/java/io/whitefox/core/services/IcebergSharedTableTest.java b/server/core/src/test/java/io/whitefox/core/services/IcebergSharedTableTest.java new file mode 100644 index 000000000..2fdd76a29 --- /dev/null +++ b/server/core/src/test/java/io/whitefox/core/services/IcebergSharedTableTest.java @@ -0,0 +1,39 @@ +package io.whitefox.core.services; + +import static io.whitefox.IcebergTestUtils.icebergTableWithHadoopCatalog; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.wildfly.common.Assert.assertTrue; + +import io.whitefox.core.SharedTable; +import java.util.Optional; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +@DisabledOnOs(OS.WINDOWS) +public class IcebergSharedTableTest { + + private final IcebergTableLoader icebergTableLoader = new IcebergTableLoader( + new IcebergCatalogHandler(new AwsGlueConfigBuilder(), new HadoopConfigBuilder())); + + @Test + void getTableMetadata() { + var PTable = new SharedTable( + "icebergtable1", + "default", + "share1", + icebergTableWithHadoopCatalog("test_db", "icebergtable1")); + var DTable = icebergTableLoader.loadTable(PTable); + var metadata = DTable.getMetadata(Optional.empty()); + assertTrue(metadata.isPresent()); + assertEquals("3369848726892806393", metadata.get().id()); + } + + @Test + void getUnknownTableMetadata() { + var unknownPTable = new SharedTable( + "notFound", "default", "share1", icebergTableWithHadoopCatalog("test_db", "not-found")); + assertThrows(IllegalArgumentException.class, () -> DeltaSharedTable.of(unknownPTable)); + } +}