diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java b/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java index 2312aeb41..133d52c83 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java @@ -1,8 +1,7 @@ package io.whitefox.api.deltasharing; import static io.whitefox.DeltaTestUtils.*; -import static io.whitefox.IcebergTestUtils.icebergTableWithHadoopCatalog; -import static io.whitefox.IcebergTestUtils.s3IcebergTableWithAwsGlueCatalog; +import static io.whitefox.IcebergTestUtils.*; import io.whitefox.AwsGlueTestConfig; import io.whitefox.S3TestConfig; @@ -73,6 +72,78 @@ public static StorageManager createStorageManager() { 0L))); } + public static final ParquetProtocol localIcebergTable1Protocol = + ParquetProtocol.ofMinReaderVersion(1); + public static final ParquetProtocol s3IcebergTable1Protocol = + ParquetProtocol.ofMinReaderVersion(1); + public static final ParquetMetadata localIcebergTable1Metadata = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() + .id("3369848726892806393") + .name(Optional.of("metastore.test_db.icebergtable1")) + .format(new Format()) + .schemaString( + "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}") + .partitionColumns(List.of()) + .version(Optional.of(1L)) + .configuration(Optional.of(Map.of("write.parquet.compression-codec", "zstd"))) + .build()) + .build(); + + public static final Set localIcebergTableFilesToBeSigned = Set.of( + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}") + .version(1L) + .timestamp(1705667209813L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + 
.partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}") + .version(1L) + .timestamp(1705667209813L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}") + .version(1L) + .timestamp(1705667209813L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(418L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}") + .version(1L) + .timestamp(1705667209813L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}") + .version(1L) + .timestamp(1705667209813L))); + + public static final ParquetMetadata s3IcebergTable1Metadata = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() + .id("7819530050735196523") + .name(Optional.of("metastore.test_glue_db.icebergtable1")) + .format(new Format()) + .schemaString( + "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}") + .partitionColumns(List.of()) + .version(Optional.of(1L)) + .configuration(Optional.of(Map.of("write.parquet.compression-codec", "zstd"))) + .build()) + .build(); + public static final ParquetMetadata deltaTable1Metadata = ParquetMetadata.builder() .metadata(ParquetMetadata.Metadata.builder() .id("56d48189-cdbc-44f2-9b0e-2bded4c79ed7") @@ -98,6 +169,7 @@ public static StorageManager createStorageManager() { .configuration(Optional.of(Map.of())) .build()) .build(); + public static final ParquetMetadata deltaTableWithHistory1Metadata = ParquetMetadata.builder() 
.metadata(ParquetMetadata.Metadata.builder() .id("56d48189-cdbc-44f2-9b0e-2bded4c79ed7") @@ -110,6 +182,7 @@ public static StorageManager createStorageManager() { .configuration(Optional.of(Map.of())) .build()) .build(); + public static final ParquetProtocol deltaTable1Protocol = ParquetProtocol.ofMinReaderVersion(1); public static final ParquetProtocol s3DeltaTable1Protocol = ParquetProtocol.ofMinReaderVersion(1); @@ -196,6 +269,48 @@ public static StorageManager createStorageManager() { .build()) .build()); + public static final Set s3IcebergTable1FilesWithoutPresignedUrl = + Set.of( + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}") + .version(1L) + .timestamp(1705948389052L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}") + .version(1L) + .timestamp(1705948389052L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}") + .version(1L) + .timestamp(1705948389052L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(418L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}") + .version(1L) + .timestamp(1705948389052L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}") + .version(1L) + 
.timestamp(1705948389052L))); public static final Set s3DeltaTable1FilesWithoutPresignedUrl = Set.of( new FileObjectWithoutPresignedUrl() diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectFileWithoutPresignedUrl.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectFileWithoutPresignedUrl.java index 7cd55a5d6..7aa666a91 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectFileWithoutPresignedUrl.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectFileWithoutPresignedUrl.java @@ -6,6 +6,10 @@ import java.util.Map; import java.util.Objects; +/** + * This class is test-only and it's needed because we can't know in advance pre-signed urls, in this way we can + * easily run assertions on FileObjects that contain pre-signed urls ignoring the url. + */ public class FileObjectFileWithoutPresignedUrl { private Map partitionValues = new HashMap<>(); diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectWithoutPresignedUrl.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectWithoutPresignedUrl.java index 560f0a558..7706a0f1f 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectWithoutPresignedUrl.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectWithoutPresignedUrl.java @@ -3,6 +3,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Objects; +/** + * This class is test-only and it's needed because we can't know in advance pre-signed urls, in this way we can + * easily run assertions on FileObjects that contain pre-signed urls ignoring the url. 
+ */ public class FileObjectWithoutPresignedUrl { private FileObjectFileWithoutPresignedUrl _file; diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java index 1c0f3341d..78320ab07 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java @@ -149,7 +149,7 @@ public void icebergTableMetadata() throws IOException { @DisabledOnOs(OS.WINDOWS) @Test - public void queryTableCurrentVersion() throws IOException { + public void queryDeltaTableCurrentVersion() throws IOException { var responseBodyLines = given() .when() .filter(deltaFilter) @@ -192,7 +192,7 @@ public void queryTableCurrentVersion() throws IOException { @DisabledOnOs(OS.WINDOWS) @Test - public void queryTableByVersion() throws IOException { + public void queryDeltaTableByVersion() throws IOException { var responseBodyLines = given() .when() .filter(deltaFilter) @@ -232,4 +232,133 @@ public void queryTableByVersion() throws IOException { assertEquals(7, responseBodyLines.length); assertEquals(s3DeltaTable1FilesWithoutPresignedUrl, files); } + + @DisabledOnOs(OS.WINDOWS) + @Test + public void queryIcebergTableCurrentVersion() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "s3share", + "s3schema", + "s3IcebergTable1") + .then() + .statusCode(200) + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + s3IcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + s3IcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], 
ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(s3IcebergTable1FilesWithoutPresignedUrl, files); + } + + @DisabledOnOs(OS.WINDOWS) + @Test + public void queryIcebergTableByVersion() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{\"version\": 7819530050735196523}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "s3share", + "s3schema", + "s3IcebergTable1") + .then() + .statusCode(200) + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + s3IcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + s3IcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(s3IcebergTable1FilesWithoutPresignedUrl, files); + } + + @DisabledOnOs(OS.WINDOWS) + @Test + public void queryIcebergTableByTs() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{\"timestamp\": \"2024-02-02T12:00:00Z\"}") + .header(new Header("Content-Type", "application/json")) + .post( + 
"delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "s3share", + "s3schema", + "s3IcebergTable1") + .then() + .statusCode(200) + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + s3IcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + s3IcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(s3IcebergTable1FilesWithoutPresignedUrl, files); + } } diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java index b0c9ea4e1..6cb531d92 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java @@ -215,8 +215,14 @@ public void deltaTableMetadata() throws IOException { objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); } + /** + * This test is disabled because we statically generate iceberg tables and then test against them. + * Unfortunately iceberg table metadata contains the files full path, not a relative one, therefore if we generate it + * on someone laptop it will not be able to find it during CI or on another person laptop. + * This code is still tested against s3 so we're still covered. 
+ */ @Test - @DisabledOnOs(OS.WINDOWS) + @Disabled public void icebergTableVersion() { given() .when() @@ -231,8 +237,14 @@ public void icebergTableVersion() { .header("Delta-Table-Version", "1"); } + /** + * This test is disabled because we statically generate iceberg tables and then test against them. + * Unfortunately iceberg table metadata contains each file's full path, not a relative one, therefore if we generate it + * on someone's laptop it will not be able to find it during CI or on another person's laptop. + * This code is still tested against s3 so we're still covered. + */ @Test - @DisabledOnOs(OS.WINDOWS) + @Disabled public void icebergTableMetadata() throws IOException { var responseBodyLines = given() .when() @@ -552,4 +564,148 @@ public void queryTableByTs() throws IOException { .collect(Collectors.toSet())); assertEquals(7, responseBodyLines.length); } + + /** + * This test is disabled because we statically generate iceberg tables and then test against them. + * Unfortunately iceberg table metadata contains each file's full path, not a relative one, therefore if we generate it + * on someone's laptop it will not be able to find it during CI or on another person's laptop. + * This code is still tested against s3 so we're still covered. 
+ */ + @Disabled + @Test + public void queryIcebergTableCurrentVersion() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "name", + "default", + "icebergtable1") + .then() + .statusCode(200) + .header("Delta-Table-Version", "1") + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + localIcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + localIcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(localIcebergTableFilesToBeSigned, files); + } + + /** + * This test is disabled because we statically generate iceberg tables and then test against them. + * Unfortunately iceberg table metadata contains each file's full path, not a relative one, therefore if we generate it + * on someone's laptop it will not be able to find it during CI or on another person's laptop. + * This code is still tested against s3 so we're still covered. 
+ */ + @Disabled + @Test + public void queryIcebergTableByVersion() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{\"version\": 3369848726892806393}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "name", + "default", + "icebergtable1") + .then() + .statusCode(200) + .header("Delta-Table-Version", "1") + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + localIcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + localIcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(localIcebergTableFilesToBeSigned, files); + } + + @Disabled + @Test + public void queryIcebergTableByTs() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{\"timestamp\": \"2024-02-02T12:00:00Z\"}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "name", + "default", + "icebergtable1") + .then() + .statusCode(200) + .header("Delta-Table-Version", "1") + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + localIcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + localIcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = 
Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(localIcebergTableFilesToBeSigned, files); + } } diff --git a/server/core/src/main/java/io/whitefox/core/FileStats.java b/server/core/src/main/java/io/whitefox/core/FileStats.java index 2c5b648ae..5be1b1200 100644 --- a/server/core/src/main/java/io/whitefox/core/FileStats.java +++ b/server/core/src/main/java/io/whitefox/core/FileStats.java @@ -6,42 +6,42 @@ public class FileStats { // {"numRecords":1,"minValues":{"id":0},"maxValues":{"id":0},"nullCount":{"id":0}} @JsonProperty("numRecords") - String numRecords; + Long numRecords; @JsonProperty("minValues") - Map minValues; + Map minValues; @JsonProperty("maxValues") - Map maxValues; + Map maxValues; @JsonProperty("nullCount") - Map nullCount; + Map nullCount; public FileStats() { super(); } - public String getNumRecords() { + public Long getNumRecords() { return numRecords; } - public Map getMinValues() { + public Map getMinValues() { return minValues; } - public Map getMaxValues() { + public Map getMaxValues() { return maxValues; } - public Map getNullCount() { + public Map getNullCount() { return nullCount; } public FileStats( - String numRecords, - Map minValues, - Map maxValues, - Map nullCount) { + Long numRecords, + Map minValues, + Map maxValues, + Map nullCount) { this.numRecords = numRecords; this.minValues = minValues; this.maxValues = maxValues; diff --git a/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java b/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java new file mode 100644 index 000000000..95bf204f4 --- /dev/null +++ 
b/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java @@ -0,0 +1,45 @@ +package io.whitefox.core; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectWriter; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Conversions; + +public class IcebergFileStatsBuilder { + + private final ObjectWriter objectWriter; + + public IcebergFileStatsBuilder(ObjectWriter objectWriter) { + this.objectWriter = objectWriter; + } + + public String buildStats( + Schema schema, + Long numRecords, + Map minValues, + Map maxValues, + Map nullCount) + throws IcebergFileStatsBuilderException { + try { + return objectWriter.writeValueAsString(new FileStats( + numRecords, + buildValuesMap(minValues, schema), + buildValuesMap(maxValues, schema), + nullCount.entrySet().stream() + .collect( + Collectors.toMap(e -> schema.findColumnName(e.getKey()), Map.Entry::getValue)))); + } catch (JsonProcessingException e) { + throw new IcebergFileStatsBuilderException(e); + } + } + + private Map buildValuesMap(Map map, Schema schema) { + return map.entrySet().stream() + .collect(Collectors.toMap( + e -> schema.findColumnName(e.getKey()), + e -> Conversions.fromByteBuffer(schema.findType(e.getKey()), e.getValue()))); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilderException.java b/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilderException.java new file mode 100644 index 000000000..997d50233 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilderException.java @@ -0,0 +1,17 @@ +package io.whitefox.core; + +import java.util.Arrays; + +public class IcebergFileStatsBuilderException extends RuntimeException { + private final Exception cause; + + public IcebergFileStatsBuilderException(Exception cause) { + this.cause = cause; + } + + @Override + public 
String getMessage() { + return "Building of Iceberg file statistics failed due to: " + cause.getMessage() + + "\n Stack trace: " + Arrays.toString(cause.getStackTrace()); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/IcebergPartitionValuesBuilder.java b/server/core/src/main/java/io/whitefox/core/IcebergPartitionValuesBuilder.java new file mode 100644 index 000000000..e1032b1c7 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/IcebergPartitionValuesBuilder.java @@ -0,0 +1,22 @@ +package io.whitefox.core; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Types; + +public class IcebergPartitionValuesBuilder { + + public Map buildPartitionValues( + List partitionFields, StructLike partitionValues) { + var map = new HashMap(); + for (int i = 0; i < partitionFields.size(); i++) { + Types.NestedField field = partitionFields.get(i); + map.put( + field.name(), + partitionValues.get(i, field.type().typeId().javaClass()).toString()); + } + return map; + } +} diff --git a/server/core/src/main/java/io/whitefox/core/PredicateUtils.java b/server/core/src/main/java/io/whitefox/core/PredicateUtils.java index c1945b971..cd59f26c0 100644 --- a/server/core/src/main/java/io/whitefox/core/PredicateUtils.java +++ b/server/core/src/main/java/io/whitefox/core/PredicateUtils.java @@ -113,11 +113,11 @@ public static EvalContext createEvalContext(AddFile file) throws PredicateParsin try { var fileStats = objectMapper.readValue(statsString, FileStats.class); - var maxValues = fileStats.maxValues; + var maxValues = fileStats.getMaxValues(); var mappedMinMaxPairs = new java.util.HashMap>(); fileStats.getMinValues().forEach((minK, minV) -> { - String maxV = maxValues.get(minK); - Pair minMaxPair = Pair.of(minV, maxV); + String maxV = String.valueOf(maxValues.get(minK)); + Pair minMaxPair = Pair.of(String.valueOf(minV), maxV); mappedMinMaxPairs.put(minK, minMaxPair); }); 
return new EvalContext(partitionValues, mappedMinMaxPairs); diff --git a/server/core/src/main/java/io/whitefox/core/StorageProperties.java b/server/core/src/main/java/io/whitefox/core/StorageProperties.java index 8628753b7..7891af321 100644 --- a/server/core/src/main/java/io/whitefox/core/StorageProperties.java +++ b/server/core/src/main/java/io/whitefox/core/StorageProperties.java @@ -1,9 +1,14 @@ package io.whitefox.core; import io.whitefox.annotations.SkipCoverageGenerated; +import java.util.List; import java.util.Objects; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; public interface StorageProperties { + void validateTypeAndUri(StorageType storageType, String uri); + public static class S3Properties implements StorageProperties { private final AwsCredentials credentials; @@ -35,5 +40,39 @@ public String toString() { public AwsCredentials credentials() { return credentials; } + + private static final List s3Prefixes = List.of("s3://", "s3a://", "s3n://"); + + @Override + public void validateTypeAndUri(StorageType storageType, String uri) { + + if (storageType != StorageType.S3) { + throw new IllegalArgumentException( + String.format("Invalid storage type %s for S3Properties", storageType)); + } + if (s3Prefixes.stream().noneMatch(pre -> StringUtils.startsWith(uri, pre))) { + throw new IllegalArgumentException(String.format( + "s3 uri must start with any of %s but %s was provided", + String.join(", ", s3Prefixes), uri)); + } + } + } + + public static class LocalProperties implements StorageProperties { + @Override + public void validateTypeAndUri(StorageType storageType, String uri) { + if (storageType != StorageType.LOCAL) { + throw new IllegalArgumentException( + String.format("Invalid storage type %s for LocalProprties", storageType)); + } + if (!StringUtils.startsWith(uri, "file://")) { + throw new IllegalArgumentException( + String.format("local uri must start with file:// but %s was provided", uri)); + } + } + 
+ public Configuration hadoopConf() { + return new Configuration(); + } } } diff --git a/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java b/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java index b7cda3d8f..b4fa50688 100644 --- a/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java +++ b/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java @@ -40,7 +40,7 @@ public boolean equals(Object o) { TableFileToBeSigned that = (TableFileToBeSigned) o; return size == that.size && version == that.version - && timestamp == that.timestamp + && Objects.equals(timestamp, that.timestamp) && Objects.equals(url, that.url) && Objects.equals(stats, that.stats) && Objects.equals(partitionValues, that.partitionValues); diff --git a/server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java b/server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java new file mode 100644 index 000000000..a7760097e --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java @@ -0,0 +1,8 @@ +package io.whitefox.core.services; + +import io.whitefox.core.Storage; +import org.apache.iceberg.io.FileIO; + +public interface FileIOFactory { + FileIO newFileIO(Storage storage); +} diff --git a/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java b/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java new file mode 100644 index 000000000..961779bf6 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java @@ -0,0 +1,66 @@ +package io.whitefox.core.services; + +import io.whitefox.core.AwsCredentials; +import io.whitefox.core.Storage; +import io.whitefox.core.StorageProperties; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.aws.AwsClientProperties; +import org.apache.iceberg.aws.s3.S3FileIOProperties; +import 
org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.ResolvingFileIO; + +public class FileIOFactoryImpl implements FileIOFactory { + + @Override + public FileIO newFileIO(Storage storage) { + storage.properties().validateTypeAndUri(storage.type(), storage.uri()); + var propsAndConfs = buildPropsFromStorage(storage); + String className = resolveClassNameFromStorage(storage); + return CatalogUtil.loadFileIO(className, propsAndConfs.props(), propsAndConfs.conf()); + } + + private static String resolveClassNameFromStorage(Storage storage) { + String className = null; + try (ResolvingFileIO rfi = new ResolvingFileIO()) { + className = rfi.ioClass(storage.uri()).getCanonicalName(); + } + return className; + } + + private PropsAndConf buildPropsFromStorage(Storage storage) { + switch (storage.type()) { + case S3: + if (storage.properties() instanceof StorageProperties.S3Properties) { + AwsCredentials credentials = + ((StorageProperties.S3Properties) storage.properties()).credentials(); + if (credentials instanceof AwsCredentials.SimpleAwsCredentials simpleAwsCredentials) { + return new PropsAndConf( + Map.of( + S3FileIOProperties.ACCESS_KEY_ID, simpleAwsCredentials.awsAccessKeyId(), + S3FileIOProperties.SECRET_ACCESS_KEY, simpleAwsCredentials.awsSecretAccessKey(), + AwsClientProperties.CLIENT_REGION, simpleAwsCredentials.region()), + null); + } else { + throw new RuntimeException("Missing aws credentials"); // TODO better message + } + } else { + throw new RuntimeException("Missing s3 properties"); // TODO better message + } + case LOCAL: + if (storage.properties() instanceof StorageProperties.LocalProperties) { + return new PropsAndConf( + Map.of(), ((StorageProperties.LocalProperties) storage.properties()).hadoopConf()); + } else { + throw new RuntimeException("Missing local properties"); // TODO better message + } + + default: + throw new IllegalArgumentException( + String.format("Unsupported storage type: [%s]", storage.type())); + } + } + + private record 
PropsAndConf(Map props, Configuration conf) {} +} diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java index fb160b527..caa633c90 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java @@ -1,14 +1,11 @@ package io.whitefox.core.services; -import io.whitefox.core.Metadata; -import io.whitefox.core.ReadTableRequest; -import io.whitefox.core.ReadTableResultToBeSigned; -import io.whitefox.core.TableSchema; +import io.whitefox.core.*; import io.whitefox.core.services.capabilities.ResponseFormat; import java.sql.Timestamp; import java.util.Optional; import java.util.stream.Collectors; -import org.apache.commons.lang3.NotImplementedException; +import java.util.stream.StreamSupport; import org.apache.iceberg.PartitionField; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -18,19 +15,53 @@ public class IcebergSharedTable implements InternalSharedTable { private final Table icebergTable; private final TableSchemaConverter tableSchemaConverter; + private final SharedTable tableDetails; + private final FileIOFactory fileIOFactory; + private final IcebergFileStatsBuilder icebergFileStatsBuilder; + private final IcebergPartitionValuesBuilder icebergPartitionValuesBuilder; - private IcebergSharedTable(Table icebergTable, TableSchemaConverter tableSchemaConverter) { + private IcebergSharedTable( + Table icebergTable, + TableSchemaConverter tableSchemaConverter, + SharedTable tableDetails, + FileIOFactory fileIOFactory, + IcebergFileStatsBuilder icebergFileStatsBuilder, + IcebergPartitionValuesBuilder icebergPartitionValuesBuilder) { this.icebergTable = icebergTable; this.tableSchemaConverter = tableSchemaConverter; + this.tableDetails = tableDetails; + this.fileIOFactory = fileIOFactory; + this.icebergFileStatsBuilder = 
icebergFileStatsBuilder;
+    this.icebergPartitionValuesBuilder = icebergPartitionValuesBuilder;
   }
 
+  /**
+   * Builds a shared table with an explicit schema converter; file access goes through a new
+   * {@link FileIOFactoryImpl}.
+   */
   public static IcebergSharedTable of(
-      Table icebergTable, TableSchemaConverter tableSchemaConverter) {
-    return new IcebergSharedTable(icebergTable, tableSchemaConverter);
+      Table icebergTable,
+      SharedTable tableDetails,
+      TableSchemaConverter tableSchemaConverter,
+      IcebergFileStatsBuilder icebergFileStatsBuilder,
+      IcebergPartitionValuesBuilder icebergPartitionValuesBuilder) {
+    return new IcebergSharedTable(
+        icebergTable,
+        tableSchemaConverter,
+        tableDetails,
+        new FileIOFactoryImpl(),
+        icebergFileStatsBuilder,
+        icebergPartitionValuesBuilder);
   }
 
-  public static IcebergSharedTable of(Table icebergTable) {
-    return new IcebergSharedTable(icebergTable, new TableSchemaConverter());
+  /**
+   * Builds a shared table with a default {@link TableSchemaConverter}; file access goes through a
+   * new {@link FileIOFactoryImpl}.
+   */
+  public static IcebergSharedTable of(
+      Table icebergTable,
+      SharedTable tableDetails,
+      IcebergFileStatsBuilder icebergFileStatsBuilder,
+      IcebergPartitionValuesBuilder icebergPartitionValuesBuilder) {
+    return new IcebergSharedTable(
+        icebergTable,
+        new TableSchemaConverter(),
+        tableDetails,
+        new FileIOFactoryImpl(),
+        icebergFileStatsBuilder,
+        icebergPartitionValuesBuilder);
   }
 
   public Optional getMetadata(Optional startingTimestamp) {
@@ -78,6 +109,39 @@ public Optional getTableVersion(Optional startingTimestamp) {
 
+  /**
+   * Answers a table query with the protocol, the metadata of the selected snapshot and one
+   * {@link TableFileToBeSigned} per data file reported by {@code snapshot.addedDataFiles(...)}.
+   * The snapshot sequence number is used as the version of every file and of the result.
+   *
+   * <p>NOTE(review): {@code addedDataFiles} looks like it lists only the files added by the
+   * selected snapshot, not every live file of the table at that snapshot - confirm this is the
+   * intended result set.
+   *
+   * <p>NOTE(review): the selected snapshot is dereferenced without a null check; per the Iceberg
+   * Table API it can presumably be {@code null} (table without snapshots, unknown id) - TODO
+   * confirm and decide how to report that case.
+   *
+   * @param readTableRequest current-version, as-of-timestamp or explicit-version request
+   * @return the files to be signed together with protocol, metadata and version
+   * @throws IllegalArgumentException if the request is of an unknown type
+   */
   @Override
   public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) {
-    throw new NotImplementedException();
+    Snapshot snapshot = resolveSnapshot(readTableRequest);
+    // The storage can be S3 or local, so this is a generic FileIO rather than an S3 one.
+    try (var fileIO =
+        fileIOFactory.newFileIO(tableDetails.internalTable().provider().storage())) {
+      return new ReadTableResultToBeSigned(
+          new Protocol(Optional.of(1)),
+          getMetadataFromSnapshot(snapshot),
+          StreamSupport.stream(snapshot.addedDataFiles(fileIO).spliterator(), false)
+              .map(dataFile -> new TableFileToBeSigned(
+                  dataFile.path().toString(),
+                  dataFile.fileSizeInBytes(),
+                  snapshot.sequenceNumber(),
+                  Optional.of(snapshot.timestampMillis()),
+                  icebergFileStatsBuilder.buildStats(
+                      icebergTable.schema(),
+                      dataFile.recordCount(),
+                      dataFile.lowerBounds(),
+                      dataFile.upperBounds(),
+                      dataFile.nullValueCounts()),
+                  icebergPartitionValuesBuilder.buildPartitionValues(
+                      icebergTable.spec().partitionType().fields(), dataFile.partition())))
+              .collect(Collectors.toList()),
+          snapshot.sequenceNumber());
+    }
   }
+
+  /**
+   * Picks the snapshot a request refers to: the current one, the one valid at a timestamp, or the
+   * one whose id equals the requested version.
+   *
+   * <p>NOTE(review): the requested version is passed to {@code Table.snapshot(long)} while results
+   * report {@code sequenceNumber()} as version - confirm both use the same numbering.
+   */
+  private Snapshot resolveSnapshot(ReadTableRequest readTableRequest) {
+    if (readTableRequest instanceof ReadTableRequest.ReadTableCurrentVersion) {
+      return icebergTable.currentSnapshot();
+    }
+    if (readTableRequest instanceof ReadTableRequest.ReadTableAsOfTimestamp asOf) {
+      return icebergTable.snapshot(
+          SnapshotUtil.snapshotIdAsOfTime(icebergTable, asOf.timestamp()));
+    }
+    if (readTableRequest instanceof ReadTableRequest.ReadTableVersion atVersion) {
+      return icebergTable.snapshot(atVersion.version());
+    }
+    throw new IllegalArgumentException("Unknown ReadTableRequest type: " + readTableRequest);
+  }
 }
diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java b/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java
index 640c13547..d84e870d1 100644
--- a/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java
+++ b/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java
@@ -1,9 +1,7 @@
 package io.whitefox.core.services;
 
-import io.whitefox.core.InternalTable;
-import io.whitefox.core.Metastore;
-import io.whitefox.core.MetastoreType;
-import io.whitefox.core.SharedTable;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.whitefox.core.*;
 import org.apache.iceberg.catalog.TableIdentifier;
 
 public class IcebergTableLoader implements TableLoader {
@@ -18,16 +16,25 @@ public IcebergTableLoader(IcebergCatalogHandler icebergCatalogHandler) {
   public IcebergSharedTable loadTable(SharedTable sharedTable) {
     if (sharedTable.internalTable().properties() instanceof InternalTable.IcebergTableProperties) {
       var metastore =
getMetastore(sharedTable.internalTable());
+      // Resolved once and reused by both catalog branches below.
+      var storage = sharedTable.internalTable().provider().storage();
       var tableId = getTableIdentifier(sharedTable.internalTable());
       if (metastore.type() == MetastoreType.GLUE) {
-        return IcebergSharedTable.of(icebergCatalogHandler.loadTableWithGlueCatalog(
-            metastore, sharedTable.internalTable().provider().storage(), tableId));
+        return IcebergSharedTable.of(
+            icebergCatalogHandler.loadTableWithGlueCatalog(
+                metastore, storage, tableId),
+            sharedTable,
+            new IcebergFileStatsBuilder(new ObjectMapper().writer()),
+            new IcebergPartitionValuesBuilder());
       } else if (metastore.type() == MetastoreType.HADOOP) {
-        return IcebergSharedTable.of(icebergCatalogHandler.loadTableWithHadoopCatalog(
-            metastore, sharedTable.internalTable().provider().storage(), tableId));
+        return IcebergSharedTable.of(
+            icebergCatalogHandler.loadTableWithHadoopCatalog(
+                metastore, storage, tableId),
+            sharedTable,
+            new IcebergFileStatsBuilder(new ObjectMapper().writer()),
+            new IcebergPartitionValuesBuilder());
       } else {
-        throw new RuntimeException(String.format(
-            "Metastore type: [%s] not compatible with Iceberg table", metastore.type()));
+        // Only GLUE and HADOOP metastores are handled for Iceberg tables.
+        throw new RuntimeException(
+            String.format("Unsupported metastore type: [%s]", metastore.type()));
       }
     } else {
       throw new IllegalArgumentException(
diff --git a/server/core/src/test/java/io/whitefox/core/StoragePropertiesTest.java b/server/core/src/test/java/io/whitefox/core/StoragePropertiesTest.java
new file mode 100644
index 000000000..82fe45287
--- /dev/null
+++ b/server/core/src/test/java/io/whitefox/core/StoragePropertiesTest.java
@@ -0,0 +1,33 @@
+package io.whitefox.core;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class StoragePropertiesTest {
+  private final StorageProperties.S3Properties s3props =
+      new StorageProperties.S3Properties(new
AwsCredentials.SimpleAwsCredentials("a", "b", "c"));
+
+  // S3 properties accept the s3, s3n and s3a URI schemes.
+  @Test
+  void testValidS3Properties() {
+    Assertions.assertDoesNotThrow(
+        () -> s3props.validateTypeAndUri(StorageType.S3, "s3://pippo-bucket"));
+    Assertions.assertDoesNotThrow(
+        () -> s3props.validateTypeAndUri(StorageType.S3, "s3n://pippo-bucket"));
+    Assertions.assertDoesNotThrow(
+        () -> s3props.validateTypeAndUri(StorageType.S3, "s3a://pippo-bucket"));
+  }
+
+  // A file:// URI is rejected for S3 storage.
+  @Test
+  void failIfWrongUri() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> s3props.validateTypeAndUri(StorageType.S3, "file:///pippo-bucket"));
+  }
+
+  // S3 properties are rejected when the storage type is LOCAL.
+  @Test
+  void failIfType() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> s3props.validateTypeAndUri(StorageType.LOCAL, "file:///pippo-bucket"));
+  }
+}
diff --git a/server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java b/server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java
new file mode 100644
index 000000000..957f2a796
--- /dev/null
+++ b/server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java
@@ -0,0 +1,62 @@
+package io.whitefox.core.services;
+
+import io.whitefox.core.*;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+/** Checks which {@code FileIO} implementation {@link FileIOFactoryImpl} builds per storage. */
+public class FileIOFactoryImplTest {
+  private final FileIOFactoryImpl factory = new FileIOFactoryImpl();
+
+  /** An S3 storage with an {@code s3a://} URI yields an {@link S3FileIO}. */
+  @Test
+  void buildS3FileIO() {
+    var storage = new Storage(
+        "name",
+        Optional.empty(),
+        new Principal("Mr. Fox"),
+        StorageType.S3,
+        Optional.empty(),
+        "s3a://my-bucket",
+        0L,
+        new Principal("Mr. Fox"),
+        0L,
+        new Principal("Mr. Fox"),
+        new StorageProperties.S3Properties(
+            new AwsCredentials.SimpleAwsCredentials("a", "b", "us-east1")));
+    try (var fileIO = factory.newFileIO(storage)) {
+      Assertions.assertInstanceOf(S3FileIO.class, fileIO);
+    }
+  }
+
+  /**
+   * A local storage with a {@code file://} URI yields a {@link HadoopFileIO} that can write to a
+   * local file.
+   */
+  @Test
+  @DisabledOnOs(OS.WINDOWS)
+  void buildLocalFileIO() throws IOException {
+    var storage = new Storage(
+        "name",
+        Optional.empty(),
+        new Principal("Mr. Fox"),
+        StorageType.LOCAL,
+        Optional.empty(),
+        "file:///tmp",
+        0L,
+        new Principal("Mr. Fox"),
+        0L,
+        new Principal("Mr. Fox"),
+        new StorageProperties.LocalProperties());
+    // Created up front so it can be removed again once the test is done.
+    var tempFile = Files.createTempFile("", "");
+    try (var fileIO = factory.newFileIO(storage)) {
+      Assertions.assertInstanceOf(HadoopFileIO.class, fileIO);
+      Assertions.assertDoesNotThrow(() -> {
+        var of = fileIO.newOutputFile(tempFile.toString());
+        try (var f = of.createOrOverwrite()) {
+          f.write(1);
+        }
+      });
+    } finally {
+      Files.deleteIfExists(tempFile);
+    }
+  }
+}
diff --git a/server/core/src/testFixtures/java/io/whitefox/TestUtils.java b/server/core/src/testFixtures/java/io/whitefox/TestUtils.java
index 1603f382e..fe458cba6 100644
--- a/server/core/src/testFixtures/java/io/whitefox/TestUtils.java
+++ b/server/core/src/testFixtures/java/io/whitefox/TestUtils.java
@@ -56,7 +56,7 @@ public static Storage getS3Storage(Principal principal, S3TestConfig s3TestConfi
         principal,
         StorageType.S3,
         Optional.empty(),
-        "uri",
+        "s3://uri",
         0L,
         principal,
         0L,