From 077b86a10cccdafc8d9900ff410bcb2230b0589d Mon Sep 17 00:00:00 2001 From: Marco Scalzo Date: Wed, 31 Jan 2024 16:27:35 +0100 Subject: [PATCH 1/6] wip --- .../api/deltasharing/SampleTables.java | 119 ++++++++++++++- .../server/DeltaSharesApiImplAwsTest.java | 133 ++++++++++++++++- .../server/DeltaSharesApiImplTest.java | 136 +++++++++++++++++- server/core/build.gradle.kts | 2 +- .../io/whitefox/core/TableFileToBeSigned.java | 2 +- .../whitefox/core/services/FileIOFactory.java | 9 ++ .../core/services/FileIOFactoryImpl.java | 49 +++++++ .../core/services/IcebergSharedTable.java | 92 +++++++++++- .../core/services/IcebergTableLoader.java | 17 ++- 9 files changed, 538 insertions(+), 21 deletions(-) create mode 100644 server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java create mode 100644 server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java b/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java index 2312aeb41..654761659 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java @@ -1,8 +1,7 @@ package io.whitefox.api.deltasharing; import static io.whitefox.DeltaTestUtils.*; -import static io.whitefox.IcebergTestUtils.icebergTableWithHadoopCatalog; -import static io.whitefox.IcebergTestUtils.s3IcebergTableWithAwsGlueCatalog; +import static io.whitefox.IcebergTestUtils.*; import io.whitefox.AwsGlueTestConfig; import io.whitefox.S3TestConfig; @@ -73,6 +72,78 @@ public static StorageManager createStorageManager() { 0L))); } + public static final ParquetProtocol localIcebergTable1Protocol = + ParquetProtocol.ofMinReaderVersion(1); + public static final ParquetProtocol s3IcebergTable1Protocol = + ParquetProtocol.ofMinReaderVersion(1); + public static final ParquetMetadata localIcebergTable1Metadata = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() + .id("3369848726892806393") + .name(Optional.of("metastore.test_db.icebergtable1")) + .format(new Format()) + .schemaString( + "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}") + .partitionColumns(List.of()) + .version(Optional.of(1L)) + .configuration(Optional.of(Map.of("write.parquet.compression-codec", "zstd"))) + .build()) + .build(); + + public static final Set localIcebergTableFilesToBeSigned = Set.of( + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"1\":50331648},\"maxValues\":{\"1\":50331648},\"nullCount\":{\"1\":0}}") + .version(1L) + .timestamp(1705667209813L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"1\":67108864},\"maxValues\":{\"1\":67108864},\"nullCount\":{\"1\":0}}") + .version(1L) + .timestamp(1705667209813L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(418L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"1\":0},\"maxValues\":{\"1\":0},\"nullCount\":{\"1\":0}}") + .version(1L) + .timestamp(1705667209813L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"1\":33554432},\"maxValues\":{\"1\":33554432},\"nullCount\":{\"1\":0}}") + .version(1L) + .timestamp(1705667209813L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"1\":16777216},\"maxValues\":{\"1\":16777216},\"nullCount\":{\"1\":0}}") + .version(1L) + .timestamp(1705667209813L))); + + public static final ParquetMetadata s3IcebergTable1Metadata = ParquetMetadata.builder() + .metadata(ParquetMetadata.Metadata.builder() + .id("7819530050735196523") + .name(Optional.of("metastore.test_glue_db.icebergtable1")) + .format(new Format()) + .schemaString( + "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}") + .partitionColumns(List.of()) + .version(Optional.of(1L)) + .configuration(Optional.of(Map.of("write.parquet.compression-codec", "zstd"))) + .build()) + .build(); + public static final ParquetMetadata deltaTable1Metadata = ParquetMetadata.builder() .metadata(ParquetMetadata.Metadata.builder() .id("56d48189-cdbc-44f2-9b0e-2bded4c79ed7") @@ -98,6 +169,7 @@ public static StorageManager createStorageManager() { .configuration(Optional.of(Map.of())) .build()) .build(); + public static final ParquetMetadata deltaTableWithHistory1Metadata = ParquetMetadata.builder() .metadata(ParquetMetadata.Metadata.builder() .id("56d48189-cdbc-44f2-9b0e-2bded4c79ed7") @@ -110,6 +182,7 @@ public static StorageManager createStorageManager() { .configuration(Optional.of(Map.of())) .build()) .build(); + public static final ParquetProtocol deltaTable1Protocol = ParquetProtocol.ofMinReaderVersion(1); public static final ParquetProtocol s3DeltaTable1Protocol = ParquetProtocol.ofMinReaderVersion(1); @@ -196,6 +269,48 @@ public static StorageManager createStorageManager() { .build()) .build()); + public static final Set s3IcebergTable1FilesWithoutPresignedUrl = + Set.of( + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"1\":50331648},\"maxValues\":{\"1\":50331648},\"nullCount\":{\"1\":0}}") + .version(1L) + .timestamp(1705948389052L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"1\":67108864},\"maxValues\":{\"1\":67108864},\"nullCount\":{\"1\":0}}") + .version(1L) + .timestamp(1705948389052L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(418L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"1\":0},\"maxValues\":{\"1\":0},\"nullCount\":{\"1\":0}}") + .version(1L) + .timestamp(1705948389052L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"1\":33554432},\"maxValues\":{\"1\":33554432},\"nullCount\":{\"1\":0}}") + .version(1L) + .timestamp(1705948389052L)), + new FileObjectWithoutPresignedUrl() + ._file(new FileObjectFileWithoutPresignedUrl() + .partitionValues(Map.of()) + .size(419L) + .stats( + "{\"numRecords\":1,\"minValues\":{\"1\":16777216},\"maxValues\":{\"1\":16777216},\"nullCount\":{\"1\":0}}") + .version(1L) + .timestamp(1705948389052L))); public static final Set s3DeltaTable1FilesWithoutPresignedUrl = Set.of( new FileObjectWithoutPresignedUrl() diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java index 1c0f3341d..78320ab07 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplAwsTest.java @@ -149,7 +149,7 @@ public void icebergTableMetadata() throws IOException { @DisabledOnOs(OS.WINDOWS) @Test - public void queryTableCurrentVersion() throws IOException { + public void queryDeltaTableCurrentVersion() throws IOException { var responseBodyLines = given() .when() .filter(deltaFilter) @@ -192,7 +192,7 @@ public void queryTableCurrentVersion() throws IOException { @DisabledOnOs(OS.WINDOWS) @Test - public void queryTableByVersion() throws IOException { + public void queryDeltaTableByVersion() throws IOException { var responseBodyLines = given() .when() .filter(deltaFilter) @@ -232,4 +232,133 @@ public void queryTableByVersion() throws IOException { assertEquals(7, responseBodyLines.length); assertEquals(s3DeltaTable1FilesWithoutPresignedUrl, files); } + + @DisabledOnOs(OS.WINDOWS) + @Test + public void queryIcebergTableCurrentVersion() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "s3share", + "s3schema", + "s3IcebergTable1") + .then() + .statusCode(200) + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + s3IcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + s3IcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(s3IcebergTable1FilesWithoutPresignedUrl, files); + } + + @DisabledOnOs(OS.WINDOWS) + @Test + public void queryIcebergTableByVersion() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{\"version\": 7819530050735196523}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "s3share", + "s3schema", + "s3IcebergTable1") + .then() + .statusCode(200) + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + s3IcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + s3IcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(s3IcebergTable1FilesWithoutPresignedUrl, files); + } + + @DisabledOnOs(OS.WINDOWS) + @Test + public void queryIcebergTableByTs() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{\"timestamp\": \"2024-02-02T12:00:00Z\"}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "s3share", + "s3schema", + "s3IcebergTable1") + .then() + .statusCode(200) + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + s3IcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + s3IcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(s3IcebergTable1FilesWithoutPresignedUrl, files); + } } diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java index b0c9ea4e1..b96de0599 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java @@ -216,7 +216,7 @@ public void deltaTableMetadata() throws IOException { } @Test - @DisabledOnOs(OS.WINDOWS) + @Disabled public void icebergTableVersion() { given() .when() @@ -232,7 +232,7 @@ public void icebergTableVersion() { } @Test - @DisabledOnOs(OS.WINDOWS) + @Disabled public void icebergTableMetadata() throws IOException { var responseBodyLines = given() .when() @@ -552,4 +552,136 @@ public void queryTableByTs() throws IOException { .collect(Collectors.toSet())); assertEquals(7, responseBodyLines.length); } + + @Disabled + @Test + public void queryIcebergTableCurrentVersion() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "name", + "default", + "icebergtable1") + .then() + .statusCode(200) + .header("Delta-Table-Version", "1") + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + localIcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + localIcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(localIcebergTableFilesToBeSigned, files); + } + + @Disabled + @Test + public void queryIcebergTableByVersion() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{\"version\": 3369848726892806393}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "name", + "default", + "icebergtable1") + .then() + .statusCode(200) + .header("Delta-Table-Version", "1") + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + localIcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + localIcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(localIcebergTableFilesToBeSigned, files); + } + + @Disabled + @Test + public void queryIcebergTableByTs() throws IOException { + var responseBodyLines = given() + .when() + .filter(deltaFilter) + .body("{\"timestamp\": \"2024-02-02T12:00:00Z\"}") + .header(new Header("Content-Type", "application/json")) + .post( + "delta-api/v1/shares/{share}/schemas/{schema}/tables/{table}/query", + "name", + "default", + "icebergtable1") + .then() + .statusCode(200) + .header("Delta-Table-Version", "1") + .extract() + .body() + .asString() + .split("\n"); + + assertEquals( + localIcebergTable1Protocol, + objectMapper.reader().readValue(responseBodyLines[0], ParquetProtocol.class)); + assertEquals( + localIcebergTable1Metadata, + objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); + var files = Arrays.stream(responseBodyLines) + .skip(2) + .map(line -> { + try { + return objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .reader() + .readValue(line, FileObjectWithoutPresignedUrl.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toSet()); + assertEquals(7, responseBodyLines.length); + assertEquals(localIcebergTableFilesToBeSigned, files); + } } diff --git a/server/core/build.gradle.kts b/server/core/build.gradle.kts index 16cfff076..0ec801120 100644 --- a/server/core/build.gradle.kts +++ b/server/core/build.gradle.kts @@ -79,7 +79,7 @@ tasks.jacocoTestCoverageVerification { violationRules { rule { limit { - minimum = BigDecimal.valueOf(0.72) + minimum = BigDecimal.valueOf(0.70) } } } diff --git a/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java b/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java index b7cda3d8f..b4fa50688 100644 --- a/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java +++ b/server/core/src/main/java/io/whitefox/core/TableFileToBeSigned.java @@ -40,7 +40,7 @@ public boolean equals(Object o) { TableFileToBeSigned that = (TableFileToBeSigned) o; return size == that.size && version == that.version - && timestamp == that.timestamp + && Objects.equals(timestamp, that.timestamp) && Objects.equals(url, that.url) && Objects.equals(stats, that.stats) && Objects.equals(partitionValues, that.partitionValues); diff --git a/server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java b/server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java new file mode 100644 index 000000000..5b1b84bbc --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java @@ -0,0 +1,9 @@ +package io.whitefox.core.services; + +import io.whitefox.core.Metastore; +import io.whitefox.core.Storage; +import org.apache.iceberg.io.FileIO; + +public interface FileIOFactory { + FileIO newFileIO(Storage storage, Metastore metastore); +} diff --git a/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java b/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java new file mode 100644 index 000000000..296390cc0 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java @@ -0,0 +1,49 @@ +package io.whitefox.core.services; + +import io.whitefox.core.*; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.FileIO; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +public class FileIOFactoryImpl implements FileIOFactory { + + @Override + // TODO add tests to restore coverage from 0.70 to 0.72 + public FileIO newFileIO(Storage storage, Metastore metastore) { + if (metastore.type() == MetastoreType.GLUE) { + if (storage.properties() instanceof StorageProperties.S3Properties) { + AwsCredentials credentials = + ((StorageProperties.S3Properties) storage.properties()).credentials(); + if (credentials instanceof AwsCredentials.SimpleAwsCredentials simpleAwsCredentials) { + var s3FileIO = new S3FileIO(() -> S3Client.builder() + .region(Region.of(simpleAwsCredentials.region())) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create( + simpleAwsCredentials.awsAccessKeyId(), + simpleAwsCredentials.awsSecretAccessKey()))) + .build()); + s3FileIO.initialize(Map.of()); + return s3FileIO; + } else { + throw new RuntimeException("Missing aws credentials"); + } + } else { + throw new RuntimeException("Missing s3 properties"); + } + } else if (metastore.type() == MetastoreType.HADOOP) { + var hadoopFileIO = new HadoopFileIO(new Configuration()); + hadoopFileIO.initialize(Map.of( + CatalogProperties.WAREHOUSE_LOCATION, + ((MetastoreProperties.HadoopMetastoreProperties) metastore.properties()).location())); + return hadoopFileIO; + } else + throw new IllegalArgumentException( + String.format("Unsupported metastore type: [%s]", metastore.type())); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java index fb160b527..2bfb813eb 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java @@ -1,14 +1,17 @@ package io.whitefox.core.services; +import io.whitefox.core.*; import io.whitefox.core.Metadata; import io.whitefox.core.ReadTableRequest; import io.whitefox.core.ReadTableResultToBeSigned; import io.whitefox.core.TableSchema; import io.whitefox.core.services.capabilities.ResponseFormat; +import java.nio.ByteBuffer; import java.sql.Timestamp; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import org.apache.commons.lang3.NotImplementedException; +import java.util.stream.StreamSupport; import org.apache.iceberg.PartitionField; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -18,19 +21,29 @@ public class IcebergSharedTable implements InternalSharedTable { private final Table icebergTable; private final TableSchemaConverter tableSchemaConverter; + private final SharedTable tableDetails; + private final FileIOFactory fileIOFactory; - private IcebergSharedTable(Table icebergTable, TableSchemaConverter tableSchemaConverter) { + private IcebergSharedTable( + Table icebergTable, + TableSchemaConverter tableSchemaConverter, + SharedTable tableDetails, + FileIOFactory fileIOFactory) { this.icebergTable = icebergTable; this.tableSchemaConverter = tableSchemaConverter; + this.tableDetails = tableDetails; + this.fileIOFactory = fileIOFactory; } public static IcebergSharedTable of( - Table icebergTable, TableSchemaConverter tableSchemaConverter) { - return new IcebergSharedTable(icebergTable, tableSchemaConverter); + Table icebergTable, SharedTable tableDetails, TableSchemaConverter tableSchemaConverter) { + return new IcebergSharedTable( + icebergTable, tableSchemaConverter, tableDetails, new FileIOFactoryImpl()); } - public static IcebergSharedTable of(Table icebergTable) { - return new IcebergSharedTable(icebergTable, new TableSchemaConverter()); + public static IcebergSharedTable of(Table icebergTable, SharedTable tableDetails) { + return new IcebergSharedTable( + icebergTable, new TableSchemaConverter(), tableDetails, new FileIOFactoryImpl()); } public Optional getMetadata(Optional startingTimestamp) { @@ -78,6 +91,71 @@ public Optional getTableVersion(Optional startingTimestamp) { @Override public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) { - throw new NotImplementedException(); + Snapshot snapshot; + if (readTableRequest instanceof ReadTableRequest.ReadTableCurrentVersion) { + snapshot = icebergTable.currentSnapshot(); + } else if (readTableRequest instanceof ReadTableRequest.ReadTableAsOfTimestamp) { + snapshot = icebergTable.snapshot(SnapshotUtil.snapshotIdAsOfTime( + icebergTable, ((ReadTableRequest.ReadTableAsOfTimestamp) readTableRequest).timestamp())); + } else if (readTableRequest instanceof ReadTableRequest.ReadTableVersion) { + snapshot = + icebergTable.snapshot(((ReadTableRequest.ReadTableVersion) readTableRequest).version()); + } else { + throw new IllegalArgumentException("Unknown ReadTableRequest type: " + readTableRequest); + } + try (var s3FileIO = fileIOFactory.newFileIO( + tableDetails.internalTable().provider().storage(), + tableDetails + .internalTable() + .provider() + .metastore() + .orElseThrow(() -> new RuntimeException("metastore not found")))) { + return new ReadTableResultToBeSigned( + new Protocol(Optional.of(1)), + getMetadataFromSnapshot(snapshot), + StreamSupport.stream(snapshot.addedDataFiles(s3FileIO).spliterator(), false) + .map( + dataFile -> new TableFileToBeSigned( + dataFile.path().toString(), + dataFile.fileSizeInBytes(), + snapshot.sequenceNumber(), + Optional.of(snapshot.timestampMillis()), + buildStats( + dataFile.recordCount(), + dataFile.lowerBounds(), + dataFile.upperBounds(), + dataFile.nullValueCounts()), // TODO understand how to build stats + Map.of()) // TODO understand how to retrieve partition values + ) + .collect(Collectors.toList()), + snapshot.sequenceNumber()); + } + } + + // TODO: find a way to handle the ByteBuffer values in a general way + // TODO: find a way to retrieve the column names (to be consistent with delta) + // TODO: factor out this code in a dedicated IcebergSharedTableStatsBuilder utility class + private String buildStats( + long recordCount, + Map lowerBounds, + Map upperBounds, + Map nullValueCounts) { + var minValues = lowerBounds.entrySet().stream() + .map(e -> String.format( + "\"%s\":%s", e.getKey().toString(), e.getValue().asIntBuffer().get())) + .reduce((x, y) -> String.format("%s,%s", x, y)) + .orElse(""); + var maxValues = upperBounds.entrySet().stream() + .map(e -> String.format( + "\"%s\":%s", e.getKey().toString(), e.getValue().asIntBuffer().get())) + .reduce((x, y) -> String.format("%s,%s", x, y)) + .orElse(""); + var nullCount = nullValueCounts.entrySet().stream() + .map(e -> String.format("\"%s\":%d", e.getKey().toString(), e.getValue())) + .reduce((x, y) -> String.format("%s,%s", x, y)) + .orElse(""); + return String.format( + "{\"numRecords\":%d,\"minValues\":{%s},\"maxValues\":{%s},\"nullCount\":{%s}}", + recordCount, minValues, maxValues, nullCount); } } diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java b/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java index 640c13547..971c6bd04 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java @@ -18,16 +18,21 @@ public IcebergTableLoader(IcebergCatalogHandler icebergCatalogHandler) { public IcebergSharedTable loadTable(SharedTable sharedTable) { if (sharedTable.internalTable().properties() instanceof InternalTable.IcebergTableProperties) { var metastore = getMetastore(sharedTable.internalTable()); + var storage = sharedTable.internalTable().provider().storage(); var tableId = getTableIdentifier(sharedTable.internalTable()); if (metastore.type() == MetastoreType.GLUE) { - return IcebergSharedTable.of(icebergCatalogHandler.loadTableWithGlueCatalog( - metastore, sharedTable.internalTable().provider().storage(), tableId)); + return IcebergSharedTable.of( + icebergCatalogHandler.loadTableWithGlueCatalog( + metastore, sharedTable.internalTable().provider().storage(), tableId), + sharedTable); } else if (metastore.type() == MetastoreType.HADOOP) { - return IcebergSharedTable.of(icebergCatalogHandler.loadTableWithHadoopCatalog( - metastore, sharedTable.internalTable().provider().storage(), tableId)); + return IcebergSharedTable.of( + icebergCatalogHandler.loadTableWithHadoopCatalog( + metastore, sharedTable.internalTable().provider().storage(), tableId), + sharedTable); } else { - throw new RuntimeException(String.format( - "Metastore type: [%s] not compatible with Iceberg table", metastore.type())); + throw new RuntimeException( + String.format("Unsupported metastore type: [%s]", metastore.type())); } } else { throw new IllegalArgumentException( From 32d9b6e634ea1849ffaa3fbb34dbb527912bf485 Mon Sep 17 00:00:00 2001 From: Marco Scalzo Date: Tue, 16 Apr 2024 19:01:02 +0200 Subject: [PATCH 2/6] add iceberg file stats builder --- .../api/deltasharing/SampleTables.java | 28 +++--- .../main/java/io/whitefox/core/FileStats.java | 24 +++--- .../core/IcebergFileStatsBuilder.java | 61 +++++++++++++ .../IcebergFileStatsBuilderException.java | 17 ++++ .../java/io/whitefox/core/PredicateUtils.java | 6 +- .../core/services/IcebergSharedTable.java | 86 ++++++++----------- .../core/services/IcebergTableLoader.java | 12 +-- 7 files changed, 148 insertions(+), 86 deletions(-) create mode 100644 server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java create mode 100644 server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilderException.java diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java b/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java index 654761659..133d52c83 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/SampleTables.java @@ -95,7 +95,7 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(419L) .stats( - "{\"numRecords\":1,\"minValues\":{\"1\":50331648},\"maxValues\":{\"1\":50331648},\"nullCount\":{\"1\":0}}") + "{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}") .version(1L) .timestamp(1705667209813L)), new FileObjectWithoutPresignedUrl() @@ -103,23 +103,23 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(419L) .stats( - "{\"numRecords\":1,\"minValues\":{\"1\":67108864},\"maxValues\":{\"1\":67108864},\"nullCount\":{\"1\":0}}") + "{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}") .version(1L) .timestamp(1705667209813L)), new FileObjectWithoutPresignedUrl() ._file(new FileObjectFileWithoutPresignedUrl() .partitionValues(Map.of()) - .size(418L) + .size(419L) .stats( - "{\"numRecords\":1,\"minValues\":{\"1\":0},\"maxValues\":{\"1\":0},\"nullCount\":{\"1\":0}}") + "{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}") .version(1L) .timestamp(1705667209813L)), new FileObjectWithoutPresignedUrl() ._file(new FileObjectFileWithoutPresignedUrl() .partitionValues(Map.of()) - .size(419L) + .size(418L) .stats( - "{\"numRecords\":1,\"minValues\":{\"1\":33554432},\"maxValues\":{\"1\":33554432},\"nullCount\":{\"1\":0}}") + "{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}") .version(1L) .timestamp(1705667209813L)), new FileObjectWithoutPresignedUrl() @@ -127,7 +127,7 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(419L) .stats( - "{\"numRecords\":1,\"minValues\":{\"1\":16777216},\"maxValues\":{\"1\":16777216},\"nullCount\":{\"1\":0}}") + "{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}") .version(1L) .timestamp(1705667209813L))); @@ -276,7 +276,7 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(419L) .stats( - "{\"numRecords\":1,\"minValues\":{\"1\":50331648},\"maxValues\":{\"1\":50331648},\"nullCount\":{\"1\":0}}") + "{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}") .version(1L) .timestamp(1705948389052L)), new FileObjectWithoutPresignedUrl() @@ -284,23 +284,23 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(419L) .stats( - "{\"numRecords\":1,\"minValues\":{\"1\":67108864},\"maxValues\":{\"1\":67108864},\"nullCount\":{\"1\":0}}") + "{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}") .version(1L) .timestamp(1705948389052L)), new FileObjectWithoutPresignedUrl() ._file(new FileObjectFileWithoutPresignedUrl() .partitionValues(Map.of()) - .size(418L) + .size(419L) .stats( - "{\"numRecords\":1,\"minValues\":{\"1\":0},\"maxValues\":{\"1\":0},\"nullCount\":{\"1\":0}}") + "{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}") .version(1L) .timestamp(1705948389052L)), new FileObjectWithoutPresignedUrl() ._file(new FileObjectFileWithoutPresignedUrl() .partitionValues(Map.of()) - .size(419L) + .size(418L) .stats( - "{\"numRecords\":1,\"minValues\":{\"1\":33554432},\"maxValues\":{\"1\":33554432},\"nullCount\":{\"1\":0}}") + "{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}") .version(1L) .timestamp(1705948389052L)), new FileObjectWithoutPresignedUrl() @@ -308,7 +308,7 @@ public static StorageManager createStorageManager() { .partitionValues(Map.of()) .size(419L) .stats( - "{\"numRecords\":1,\"minValues\":{\"1\":16777216},\"maxValues\":{\"1\":16777216},\"nullCount\":{\"1\":0}}") + "{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}") .version(1L) .timestamp(1705948389052L))); public static final Set s3DeltaTable1FilesWithoutPresignedUrl = diff --git a/server/core/src/main/java/io/whitefox/core/FileStats.java b/server/core/src/main/java/io/whitefox/core/FileStats.java index 2c5b648ae..5be1b1200 100644 --- a/server/core/src/main/java/io/whitefox/core/FileStats.java +++ b/server/core/src/main/java/io/whitefox/core/FileStats.java @@ -6,42 +6,42 @@ public class FileStats { // {"numRecords":1,"minValues":{"id":0},"maxValues":{"id":0},"nullCount":{"id":0}} @JsonProperty("numRecords") - String numRecords; + Long numRecords; @JsonProperty("minValues") - Map minValues; + Map minValues; @JsonProperty("maxValues") - Map maxValues; + Map maxValues; @JsonProperty("nullCount") - Map nullCount; + Map nullCount; public FileStats() { super(); } - public String getNumRecords() { + public Long getNumRecords() { return numRecords; } - public Map getMinValues() { + public Map getMinValues() { return minValues; } - public Map getMaxValues() { + public Map getMaxValues() { return maxValues; } - public Map getNullCount() { + public Map getNullCount() { return nullCount; } public FileStats( - String numRecords, - Map minValues, - Map maxValues, - Map nullCount) { + Long numRecords, + Map minValues, + Map maxValues, + Map nullCount) { this.numRecords = numRecords; this.minValues = minValues; this.maxValues = maxValues; diff --git a/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java b/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java new file mode 100644 index 000000000..bd30f4b65 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java @@ -0,0 +1,61 @@ +package io.whitefox.core; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectWriter; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; + +public class IcebergFileStatsBuilder { + + private final ObjectWriter objectWriter; + + public IcebergFileStatsBuilder(ObjectWriter objectWriter) { + this.objectWriter = objectWriter; + } + + public String buildStats( + Schema schema, + Long numRecords, + Map minValues, + Map maxValues, + Map nullCount) + throws IcebergFileStatsBuilderException { + try { + return objectWriter.writeValueAsString(new FileStats( + numRecords, + buildValuesMap(minValues, schema), + buildValuesMap(maxValues, schema), + nullCount.entrySet().stream() + .collect( + Collectors.toMap(e -> schema.findColumnName(e.getKey()), Map.Entry::getValue)))); + } catch (JsonProcessingException e) { + throw new IcebergFileStatsBuilderException(e); + } + } + + public Map buildPartitionValues( + List partitionFields, StructLike partitionValues) { + var map = new HashMap(); + for (int i = 0; i < partitionFields.size(); i++) { + Types.NestedField field = partitionFields.get(i); + map.put( + field.name(), + partitionValues.get(i, field.type().typeId().javaClass()).toString()); + } + return map; + } + + private Map buildValuesMap(Map map, Schema schema) { + return map.entrySet().stream() + .collect(Collectors.toMap( + e -> schema.findColumnName(e.getKey()), + e -> Conversions.fromByteBuffer(schema.findType(e.getKey()), e.getValue()))); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilderException.java b/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilderException.java new file mode 100644 index 000000000..997d50233 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilderException.java @@ -0,0 +1,17 @@ +package io.whitefox.core; + +import java.util.Arrays; + +public class IcebergFileStatsBuilderException extends RuntimeException { + private final Exception cause; + + public IcebergFileStatsBuilderException(Exception cause) { + this.cause = cause; + } + + @Override + public String getMessage() { + return "Building of Iceberg file statistics failed due to: " + cause.getMessage() + + "\n Stack trace: " + Arrays.toString(cause.getStackTrace()); + } +} diff --git a/server/core/src/main/java/io/whitefox/core/PredicateUtils.java b/server/core/src/main/java/io/whitefox/core/PredicateUtils.java index c1945b971..cd59f26c0 100644 --- a/server/core/src/main/java/io/whitefox/core/PredicateUtils.java +++ b/server/core/src/main/java/io/whitefox/core/PredicateUtils.java @@ -113,11 +113,11 @@ public static EvalContext createEvalContext(AddFile file) throws PredicateParsin try { var fileStats = objectMapper.readValue(statsString, FileStats.class); - var maxValues = fileStats.maxValues; + var maxValues = fileStats.getMaxValues(); var mappedMinMaxPairs = new java.util.HashMap>(); fileStats.getMinValues().forEach((minK, minV) -> { - String maxV = maxValues.get(minK); - Pair minMaxPair = Pair.of(minV, maxV); + String maxV = String.valueOf(maxValues.get(minK)); + Pair minMaxPair = Pair.of(String.valueOf(minV), maxV); mappedMinMaxPairs.put(minK, minMaxPair); }); return new EvalContext(partitionValues, mappedMinMaxPairs); diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java index 2bfb813eb..b351459d5 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java @@ -1,14 +1,8 @@ package io.whitefox.core.services; import io.whitefox.core.*; -import io.whitefox.core.Metadata; -import io.whitefox.core.ReadTableRequest; -import io.whitefox.core.ReadTableResultToBeSigned; -import io.whitefox.core.TableSchema; import io.whitefox.core.services.capabilities.ResponseFormat; -import java.nio.ByteBuffer; import java.sql.Timestamp; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -23,27 +17,44 @@ public class IcebergSharedTable implements InternalSharedTable { private final TableSchemaConverter tableSchemaConverter; private final SharedTable tableDetails; private final FileIOFactory fileIOFactory; + private final IcebergFileStatsBuilder icebergFileStatsBuilder; private IcebergSharedTable( Table icebergTable, TableSchemaConverter tableSchemaConverter, SharedTable tableDetails, - FileIOFactory fileIOFactory) { + FileIOFactory fileIOFactory, + IcebergFileStatsBuilder icebergFileStatsBuilder) { this.icebergTable = icebergTable; this.tableSchemaConverter = tableSchemaConverter; this.tableDetails = tableDetails; this.fileIOFactory = fileIOFactory; + this.icebergFileStatsBuilder = icebergFileStatsBuilder; } public static IcebergSharedTable of( - Table icebergTable, SharedTable tableDetails, TableSchemaConverter tableSchemaConverter) { + Table icebergTable, + SharedTable tableDetails, + TableSchemaConverter tableSchemaConverter, + IcebergFileStatsBuilder icebergFileStatsBuilder) { return new IcebergSharedTable( - icebergTable, tableSchemaConverter, tableDetails, new FileIOFactoryImpl()); + icebergTable, + tableSchemaConverter, + tableDetails, + new FileIOFactoryImpl(), + icebergFileStatsBuilder); } - public static IcebergSharedTable of(Table icebergTable, SharedTable tableDetails) { + public static IcebergSharedTable of( + Table icebergTable, + SharedTable tableDetails, + IcebergFileStatsBuilder icebergFileStatsBuilder) { return new IcebergSharedTable( - icebergTable, new TableSchemaConverter(), tableDetails, new FileIOFactoryImpl()); + icebergTable, + new TableSchemaConverter(), + tableDetails, + new FileIOFactoryImpl(), + icebergFileStatsBuilder); } public Optional getMetadata(Optional startingTimestamp) { @@ -114,48 +125,21 @@ public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) { new Protocol(Optional.of(1)), getMetadataFromSnapshot(snapshot), StreamSupport.stream(snapshot.addedDataFiles(s3FileIO).spliterator(), false) - .map( - dataFile -> new TableFileToBeSigned( - dataFile.path().toString(), - dataFile.fileSizeInBytes(), - snapshot.sequenceNumber(), - Optional.of(snapshot.timestampMillis()), - buildStats( - dataFile.recordCount(), - dataFile.lowerBounds(), - dataFile.upperBounds(), - dataFile.nullValueCounts()), // TODO understand how to build stats - Map.of()) // TODO understand how to retrieve partition values - ) + .map(dataFile -> new TableFileToBeSigned( + dataFile.path().toString(), + dataFile.fileSizeInBytes(), + snapshot.sequenceNumber(), + Optional.of(snapshot.timestampMillis()), + icebergFileStatsBuilder.buildStats( + icebergTable.schema(), + dataFile.recordCount(), + dataFile.lowerBounds(), + dataFile.upperBounds(), + dataFile.nullValueCounts()), + icebergFileStatsBuilder.buildPartitionValues( + icebergTable.spec().partitionType().fields(), dataFile.partition()))) .collect(Collectors.toList()), snapshot.sequenceNumber()); } } - - // TODO: find a way to handle the ByteBuffer values in a general way - // TODO: find a way to retrieve the column names (to be consistent with delta) - // TODO: factor out this code in a dedicated IcebergSharedTableStatsBuilder utility class - private String buildStats( - long recordCount, - Map lowerBounds, - Map upperBounds, - Map nullValueCounts) { - var minValues = lowerBounds.entrySet().stream() - .map(e -> String.format( - "\"%s\":%s", e.getKey().toString(), e.getValue().asIntBuffer().get())) - .reduce((x, y) -> String.format("%s,%s", x, y)) - .orElse(""); - var maxValues = upperBounds.entrySet().stream() - .map(e -> String.format( - "\"%s\":%s", e.getKey().toString(), e.getValue().asIntBuffer().get())) - .reduce((x, y) -> String.format("%s,%s", x, y)) - .orElse(""); - var nullCount = nullValueCounts.entrySet().stream() - .map(e -> String.format("\"%s\":%d", e.getKey().toString(), e.getValue())) - .reduce((x, y) -> String.format("%s,%s", x, y)) - .orElse(""); - return String.format( - "{\"numRecords\":%d,\"minValues\":{%s},\"maxValues\":{%s},\"nullCount\":{%s}}", - recordCount, minValues, maxValues, nullCount); - } } diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java b/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java index 971c6bd04..a9d285af4 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java @@ -1,9 +1,7 @@ package io.whitefox.core.services; -import io.whitefox.core.InternalTable; -import io.whitefox.core.Metastore; -import io.whitefox.core.MetastoreType; -import io.whitefox.core.SharedTable; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.whitefox.core.*; import org.apache.iceberg.catalog.TableIdentifier; public class IcebergTableLoader implements TableLoader { @@ -24,12 +22,14 @@ public IcebergSharedTable loadTable(SharedTable sharedTable) { return IcebergSharedTable.of( icebergCatalogHandler.loadTableWithGlueCatalog( metastore, sharedTable.internalTable().provider().storage(), tableId), - sharedTable); + sharedTable, + new IcebergFileStatsBuilder(new ObjectMapper().writer())); } else if (metastore.type() == MetastoreType.HADOOP) { return IcebergSharedTable.of( icebergCatalogHandler.loadTableWithHadoopCatalog( metastore, sharedTable.internalTable().provider().storage(), tableId), - sharedTable); + sharedTable, + new IcebergFileStatsBuilder(new ObjectMapper().writer())); } else { throw new RuntimeException( String.format("Unsupported metastore type: [%s]", metastore.type())); From a6e9eae53a564a5f6029788f4510e15f1a1b119a Mon Sep 17 00:00:00 2001 From: Marco Scalzo Date: Wed, 17 Apr 2024 10:05:40 +0200 Subject: [PATCH 3/6] add IcebergPartitionValuesBuilder --- .../core/IcebergFileStatsBuilder.java | 16 -------------- .../core/IcebergPartitionValuesBuilder.java | 22 +++++++++++++++++++ .../core/services/IcebergSharedTable.java | 19 +++++++++++----- .../core/services/IcebergTableLoader.java | 6 +++-- 4 files changed, 39 insertions(+), 24 deletions(-) create mode 100644 server/core/src/main/java/io/whitefox/core/IcebergPartitionValuesBuilder.java diff --git a/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java b/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java index bd30f4b65..95bf204f4 100644 --- a/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java +++ b/server/core/src/main/java/io/whitefox/core/IcebergFileStatsBuilder.java @@ -3,14 +3,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectWriter; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; import org.apache.iceberg.types.Conversions; -import org.apache.iceberg.types.Types; public class IcebergFileStatsBuilder { @@ -40,18 +36,6 @@ public String buildStats( } } - public Map buildPartitionValues( - List partitionFields, StructLike partitionValues) { - var map = new HashMap(); - for (int i = 0; i < partitionFields.size(); i++) { - Types.NestedField field = partitionFields.get(i); - map.put( - field.name(), - partitionValues.get(i, field.type().typeId().javaClass()).toString()); - } - return map; - } - private Map buildValuesMap(Map map, Schema schema) { return map.entrySet().stream() .collect(Collectors.toMap( diff --git a/server/core/src/main/java/io/whitefox/core/IcebergPartitionValuesBuilder.java b/server/core/src/main/java/io/whitefox/core/IcebergPartitionValuesBuilder.java new file mode 100644 index 000000000..e1032b1c7 --- /dev/null +++ b/server/core/src/main/java/io/whitefox/core/IcebergPartitionValuesBuilder.java @@ -0,0 +1,22 @@ +package io.whitefox.core; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Types; + +public class IcebergPartitionValuesBuilder { + + public Map buildPartitionValues( + List partitionFields, StructLike partitionValues) { + var map = new HashMap(); + for (int i = 0; i < partitionFields.size(); i++) { + Types.NestedField field = partitionFields.get(i); + map.put( + field.name(), + partitionValues.get(i, field.type().typeId().javaClass()).toString()); + } + return map; + } +} diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java index b351459d5..8a8d87197 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java @@ -18,43 +18,50 @@ public class IcebergSharedTable implements InternalSharedTable { private final SharedTable tableDetails; private final FileIOFactory fileIOFactory; private final IcebergFileStatsBuilder icebergFileStatsBuilder; + private final IcebergPartitionValuesBuilder icebergPartitionValuesBuilder; private IcebergSharedTable( Table icebergTable, TableSchemaConverter tableSchemaConverter, SharedTable tableDetails, FileIOFactory fileIOFactory, - IcebergFileStatsBuilder icebergFileStatsBuilder) { + IcebergFileStatsBuilder icebergFileStatsBuilder, + IcebergPartitionValuesBuilder icebergPartitionValuesBuilder) { this.icebergTable = icebergTable; this.tableSchemaConverter = tableSchemaConverter; this.tableDetails = tableDetails; this.fileIOFactory = fileIOFactory; this.icebergFileStatsBuilder = icebergFileStatsBuilder; + this.icebergPartitionValuesBuilder = icebergPartitionValuesBuilder; } public static IcebergSharedTable of( Table icebergTable, SharedTable tableDetails, TableSchemaConverter tableSchemaConverter, - IcebergFileStatsBuilder icebergFileStatsBuilder) { + IcebergFileStatsBuilder icebergFileStatsBuilder, + IcebergPartitionValuesBuilder icebergPartitionValuesBuilder) { return new IcebergSharedTable( icebergTable, tableSchemaConverter, tableDetails, new FileIOFactoryImpl(), - icebergFileStatsBuilder); + icebergFileStatsBuilder, + icebergPartitionValuesBuilder); } public static IcebergSharedTable of( Table icebergTable, SharedTable tableDetails, - IcebergFileStatsBuilder icebergFileStatsBuilder) { + IcebergFileStatsBuilder icebergFileStatsBuilder, + IcebergPartitionValuesBuilder icebergPartitionValuesBuilder) { return new IcebergSharedTable( icebergTable, new TableSchemaConverter(), tableDetails, new FileIOFactoryImpl(), - icebergFileStatsBuilder); + icebergFileStatsBuilder, + icebergPartitionValuesBuilder); } public Optional getMetadata(Optional startingTimestamp) { @@ -136,7 +143,7 @@ public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) { dataFile.lowerBounds(), dataFile.upperBounds(), dataFile.nullValueCounts()), - icebergFileStatsBuilder.buildPartitionValues( + icebergPartitionValuesBuilder.buildPartitionValues( icebergTable.spec().partitionType().fields(), dataFile.partition()))) .collect(Collectors.toList()), snapshot.sequenceNumber()); diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java b/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java index a9d285af4..d84e870d1 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergTableLoader.java @@ -23,13 +23,15 @@ public IcebergSharedTable loadTable(SharedTable sharedTable) { icebergCatalogHandler.loadTableWithGlueCatalog( metastore, sharedTable.internalTable().provider().storage(), tableId), sharedTable, - new IcebergFileStatsBuilder(new ObjectMapper().writer())); + new IcebergFileStatsBuilder(new ObjectMapper().writer()), + new IcebergPartitionValuesBuilder()); } else if (metastore.type() == MetastoreType.HADOOP) { return IcebergSharedTable.of( icebergCatalogHandler.loadTableWithHadoopCatalog( metastore, sharedTable.internalTable().provider().storage(), tableId), sharedTable, - new IcebergFileStatsBuilder(new ObjectMapper().writer())); + new IcebergFileStatsBuilder(new ObjectMapper().writer()), + new IcebergPartitionValuesBuilder()); } else { throw new RuntimeException( String.format("Unsupported metastore type: [%s]", metastore.type())); From 3d7e4f5a5ea01bf08644263d638bc6dd90ef6122 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Fri, 19 Apr 2024 11:31:55 +0200 Subject: [PATCH 4/6] Piggy back on iceberg to initialize a FileIO --- .../FileObjectFileWithoutPresignedUrl.java | 4 + .../model/FileObjectWithoutPresignedUrl.java | 4 + .../server/DeltaSharesApiImplTest.java | 24 +++++ .../io/whitefox/core/StorageProperties.java | 39 ++++++++ .../whitefox/core/services/FileIOFactory.java | 3 +- .../core/services/FileIOFactoryImpl.java | 88 +++++++++++-------- .../core/services/IcebergSharedTable.java | 9 +- .../whitefox/core/StoragePropertiesTest.java | 33 +++++++ .../core/services/FileIOFactoryImplTest.java | 59 +++++++++++++ .../java/io/whitefox/TestUtils.java | 2 +- 10 files changed, 220 insertions(+), 45 deletions(-) create mode 100644 server/core/src/test/java/io/whitefox/core/StoragePropertiesTest.java create mode 100644 server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectFileWithoutPresignedUrl.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectFileWithoutPresignedUrl.java index 7cd55a5d6..7aa666a91 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectFileWithoutPresignedUrl.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectFileWithoutPresignedUrl.java @@ -6,6 +6,10 @@ import java.util.Map; import java.util.Objects; +/** + * This class is test-only and it's needed because we can't know in advance pre-signed urls, in this way we can + * easily run assertions on FileObjects that contain pre-signed urls ignoring the url. + */ public class FileObjectFileWithoutPresignedUrl { private Map partitionValues = new HashMap<>(); diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectWithoutPresignedUrl.java b/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectWithoutPresignedUrl.java index 560f0a558..7706a0f1f 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectWithoutPresignedUrl.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/model/FileObjectWithoutPresignedUrl.java @@ -3,6 +3,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Objects; +/** + * This class is test-only and it's needed because we can't know in advance pre-signed urls, in this way we can + * easily run assertions on FileObjects that contain pre-signed urls ignoring the url. + */ public class FileObjectWithoutPresignedUrl { private FileObjectFileWithoutPresignedUrl _file; diff --git a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java index b96de0599..6cb531d92 100644 --- a/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java +++ b/server/app/src/test/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImplTest.java @@ -215,6 +215,12 @@ public void deltaTableMetadata() throws IOException { objectMapper.reader().readValue(responseBodyLines[1], ParquetMetadata.class)); } + /** + * This test is disabled because we statically generate iceberg tables and then test against them. + * Unfortunately iceberg table metadata contains the files full path, not a relative one, therefore if we generate it + * on someone laptop it will not be able to find it during CI or on another person laptop. + * This code is still tested against s3 so we're still covered. + */ @Test @Disabled public void icebergTableVersion() { @@ -231,6 +237,12 @@ public void icebergTableVersion() { .header("Delta-Table-Version", "1"); } + /** + * This test is disabled because we statically generate iceberg tables and then test against them. + * Unfortunately iceberg table metadata contains the files full path, not a relative one, therefore if we generate it + * on someone laptop it will not be able to find it during CI or on another person laptop. + * This code is still tested against s3 so we're still covered. + */ @Test @Disabled public void icebergTableMetadata() throws IOException { @@ -553,6 +565,12 @@ public void queryTableByTs() throws IOException { assertEquals(7, responseBodyLines.length); } + /** + * This test is disabled because we statically generate iceberg tables and then test against them. + * Unfortunately iceberg table metadata contains the files full path, not a relative one, therefore if we generate it + * on someone laptop it will not be able to find it during CI or on another person laptop. + * This code is still tested against s3 so we're still covered. + */ @Disabled @Test public void queryIcebergTableCurrentVersion() throws IOException { @@ -597,6 +615,12 @@ public void queryIcebergTableCurrentVersion() throws IOException { assertEquals(localIcebergTableFilesToBeSigned, files); } + /** + * This test is disabled because we statically generate iceberg tables and then test against them. + * Unfortunately iceberg table metadata contains the files full path, not a relative one, therefore if we generate it + * on someone laptop it will not be able to find it during CI or on another person laptop. + * This code is still tested against s3 so we're still covered. + */ @Disabled @Test public void queryIcebergTableByVersion() throws IOException { diff --git a/server/core/src/main/java/io/whitefox/core/StorageProperties.java b/server/core/src/main/java/io/whitefox/core/StorageProperties.java index 8628753b7..7891af321 100644 --- a/server/core/src/main/java/io/whitefox/core/StorageProperties.java +++ b/server/core/src/main/java/io/whitefox/core/StorageProperties.java @@ -1,9 +1,14 @@ package io.whitefox.core; import io.whitefox.annotations.SkipCoverageGenerated; +import java.util.List; import java.util.Objects; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; public interface StorageProperties { + void validateTypeAndUri(StorageType storageType, String uri); + public static class S3Properties implements StorageProperties { private final AwsCredentials credentials; @@ -35,5 +40,39 @@ public String toString() { public AwsCredentials credentials() { return credentials; } + + private static final List s3Prefixes = List.of("s3://", "s3a://", "s3n://"); + + @Override + public void validateTypeAndUri(StorageType storageType, String uri) { + + if (storageType != StorageType.S3) { + throw new IllegalArgumentException( + String.format("Invalid storage type %s for S3Properties", storageType)); + } + if (s3Prefixes.stream().noneMatch(pre -> StringUtils.startsWith(uri, pre))) { + throw new IllegalArgumentException(String.format( + "s3 uri must start with any of %s but %s was provided", + String.join(", ", s3Prefixes), uri)); + } + } + } + + public static class LocalProperties implements StorageProperties { + @Override + public void validateTypeAndUri(StorageType storageType, String uri) { + if (storageType != StorageType.LOCAL) { + throw new IllegalArgumentException( + String.format("Invalid storage type %s for LocalProprties", storageType)); + } + if (!StringUtils.startsWith(uri, "file://")) { + throw new IllegalArgumentException( + String.format("local uri must start with file:// but %s was provided", uri)); + } + } + + public Configuration hadoopConf() { + return new Configuration(); + } } } diff --git a/server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java b/server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java index 5b1b84bbc..a7760097e 100644 --- a/server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java +++ b/server/core/src/main/java/io/whitefox/core/services/FileIOFactory.java @@ -1,9 +1,8 @@ package io.whitefox.core.services; -import io.whitefox.core.Metastore; import io.whitefox.core.Storage; import org.apache.iceberg.io.FileIO; public interface FileIOFactory { - FileIO newFileIO(Storage storage, Metastore metastore); + FileIO newFileIO(Storage storage); } diff --git a/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java b/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java index 296390cc0..ea2c89db6 100644 --- a/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java +++ b/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java @@ -1,49 +1,67 @@ package io.whitefox.core.services; -import io.whitefox.core.*; +import io.whitefox.core.AwsCredentials; +import io.whitefox.core.Storage; +import io.whitefox.core.StorageProperties; import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.aws.s3.S3FileIO; -import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.aws.AwsClientProperties; +import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.apache.iceberg.io.FileIO; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; +import org.apache.iceberg.io.ResolvingFileIO; public class FileIOFactoryImpl implements FileIOFactory { @Override // TODO add tests to restore coverage from 0.70 to 0.72 - public FileIO newFileIO(Storage storage, Metastore metastore) { - if (metastore.type() == MetastoreType.GLUE) { - if (storage.properties() instanceof StorageProperties.S3Properties) { - AwsCredentials credentials = - ((StorageProperties.S3Properties) storage.properties()).credentials(); - if (credentials instanceof AwsCredentials.SimpleAwsCredentials simpleAwsCredentials) { - var s3FileIO = new S3FileIO(() -> S3Client.builder() - .region(Region.of(simpleAwsCredentials.region())) - .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create( - simpleAwsCredentials.awsAccessKeyId(), - simpleAwsCredentials.awsSecretAccessKey()))) - .build()); - s3FileIO.initialize(Map.of()); - return s3FileIO; + public FileIO newFileIO(Storage storage) { + storage.properties().validateTypeAndUri(storage.type(), storage.uri()); + var propsAndConfs = buildPropsFromStorage(storage); + String className = resolveClassNameFromStorage(storage); + return CatalogUtil.loadFileIO(className, propsAndConfs.props(), propsAndConfs.conf()); + } + + private static String resolveClassNameFromStorage(Storage storage) { + String className = null; + try (ResolvingFileIO rfi = new ResolvingFileIO()) { + className = rfi.ioClass(storage.uri()).getCanonicalName(); + } + return className; + } + + private PropsAndConf buildPropsFromStorage(Storage storage) { + switch (storage.type()) { + case S3: + if (storage.properties() instanceof StorageProperties.S3Properties) { + AwsCredentials credentials = + ((StorageProperties.S3Properties) storage.properties()).credentials(); + if (credentials instanceof AwsCredentials.SimpleAwsCredentials simpleAwsCredentials) { + return new PropsAndConf( + Map.of( + S3FileIOProperties.ACCESS_KEY_ID, simpleAwsCredentials.awsAccessKeyId(), + S3FileIOProperties.SECRET_ACCESS_KEY, simpleAwsCredentials.awsSecretAccessKey(), + AwsClientProperties.CLIENT_REGION, simpleAwsCredentials.region()), + null); + } else { + throw new RuntimeException("Missing aws credentials"); // TODO better message + } } else { - throw new RuntimeException("Missing aws credentials"); + throw new RuntimeException("Missing s3 properties"); // TODO better message } - } else { - throw new RuntimeException("Missing s3 properties"); - } - } else if (metastore.type() == MetastoreType.HADOOP) { - var hadoopFileIO = new HadoopFileIO(new Configuration()); - hadoopFileIO.initialize(Map.of( - CatalogProperties.WAREHOUSE_LOCATION, - ((MetastoreProperties.HadoopMetastoreProperties) metastore.properties()).location())); - return hadoopFileIO; - } else - throw new IllegalArgumentException( - String.format("Unsupported metastore type: [%s]", metastore.type())); + case LOCAL: + if (storage.properties() instanceof StorageProperties.LocalProperties) { + return new PropsAndConf( + Map.of(), ((StorageProperties.LocalProperties) storage.properties()).hadoopConf()); + } else { + throw new RuntimeException("Missing local properties"); // TODO better message + } + + default: + throw new IllegalArgumentException( + String.format("Unsupported storage type: [%s]", storage.type())); + } } + + private record PropsAndConf(Map props, Configuration conf) {} } diff --git a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java index 8a8d87197..caa633c90 100644 --- a/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/IcebergSharedTable.java @@ -121,13 +121,8 @@ public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) { } else { throw new IllegalArgumentException("Unknown ReadTableRequest type: " + readTableRequest); } - try (var s3FileIO = fileIOFactory.newFileIO( - tableDetails.internalTable().provider().storage(), - tableDetails - .internalTable() - .provider() - .metastore() - .orElseThrow(() -> new RuntimeException("metastore not found")))) { + try (var s3FileIO = + fileIOFactory.newFileIO(tableDetails.internalTable().provider().storage())) { return new ReadTableResultToBeSigned( new Protocol(Optional.of(1)), getMetadataFromSnapshot(snapshot), diff --git a/server/core/src/test/java/io/whitefox/core/StoragePropertiesTest.java b/server/core/src/test/java/io/whitefox/core/StoragePropertiesTest.java new file mode 100644 index 000000000..82fe45287 --- /dev/null +++ b/server/core/src/test/java/io/whitefox/core/StoragePropertiesTest.java @@ -0,0 +1,33 @@ +package io.whitefox.core; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class StoragePropertiesTest { + private final StorageProperties.S3Properties s3props = + new StorageProperties.S3Properties(new AwsCredentials.SimpleAwsCredentials("a", "b", "c")); + + @Test + void testValidS3Properties() { + Assertions.assertDoesNotThrow( + () -> s3props.validateTypeAndUri(StorageType.S3, "s3://pippo-bucket")); + Assertions.assertDoesNotThrow( + () -> s3props.validateTypeAndUri(StorageType.S3, "s3n://pippo-bucket")); + Assertions.assertDoesNotThrow( + () -> s3props.validateTypeAndUri(StorageType.S3, "s3a://pippo-bucket")); + } + + @Test + void failIfWrongUri() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> s3props.validateTypeAndUri(StorageType.S3, "file:///pippo-bucket")); + } + + @Test + void failIfType() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> s3props.validateTypeAndUri(StorageType.LOCAL, "file:///pippo-bucket")); + } +} diff --git a/server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java b/server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java new file mode 100644 index 000000000..9f37bb984 --- /dev/null +++ b/server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java @@ -0,0 +1,59 @@ +package io.whitefox.core.services; + +import io.whitefox.core.*; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Optional; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class FileIOFactoryImplTest { + private final FileIOFactoryImpl factory = new FileIOFactoryImpl(); + + @Test + void buildS3FileIO() { + var storage = new Storage( + "name", + Optional.empty(), + new Principal("Mr. Fox"), + StorageType.S3, + Optional.empty(), + "s3a://my-bucket", + 0L, + new Principal("Mr. Fox"), + 0L, + new Principal("Mr. Fox"), + new StorageProperties.S3Properties( + new AwsCredentials.SimpleAwsCredentials("a", "b", "us-east1"))); + try (var fileIO = factory.newFileIO(storage)) { + Assertions.assertInstanceOf(S3FileIO.class, fileIO); + } + } + + @Test + void buildLocalFileIO() throws IOException { + var storage = new Storage( + "name", + Optional.empty(), + new Principal("Mr. Fox"), + StorageType.LOCAL, + Optional.empty(), + "file:///tmp", + 0L, + new Principal("Mr. Fox"), + 0L, + new Principal("Mr. Fox"), + new StorageProperties.LocalProperties()); + try (var fileIO = factory.newFileIO(storage)) { + Assertions.assertInstanceOf(HadoopFileIO.class, fileIO); + Assertions.assertDoesNotThrow(() -> { + var of = fileIO.newOutputFile(Files.createTempFile("", "").toString()); + try (var f = of.createOrOverwrite()) { + f.write(1); + } + }); + } + } +} diff --git a/server/core/src/testFixtures/java/io/whitefox/TestUtils.java b/server/core/src/testFixtures/java/io/whitefox/TestUtils.java index 1603f382e..fe458cba6 100644 --- a/server/core/src/testFixtures/java/io/whitefox/TestUtils.java +++ b/server/core/src/testFixtures/java/io/whitefox/TestUtils.java @@ -56,7 +56,7 @@ public static Storage getS3Storage(Principal principal, S3TestConfig s3TestConfi principal, StorageType.S3, Optional.empty(), - "uri", + "s3://uri", 0L, principal, 0L, From 513e3ee20b9df9f360a3818cc6092ade24486073 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Fri, 19 Apr 2024 11:39:44 +0200 Subject: [PATCH 5/6] Disable test on windows and reset coverage --- server/core/build.gradle.kts | 2 +- .../java/io/whitefox/core/services/FileIOFactoryImplTest.java | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/server/core/build.gradle.kts b/server/core/build.gradle.kts index 0ec801120..16cfff076 100644 --- a/server/core/build.gradle.kts +++ b/server/core/build.gradle.kts @@ -79,7 +79,7 @@ tasks.jacocoTestCoverageVerification { violationRules { rule { limit { - minimum = BigDecimal.valueOf(0.70) + minimum = BigDecimal.valueOf(0.72) } } } diff --git a/server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java b/server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java index 9f37bb984..957f2a796 100644 --- a/server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java +++ b/server/core/src/test/java/io/whitefox/core/services/FileIOFactoryImplTest.java @@ -8,6 +8,8 @@ import org.apache.iceberg.hadoop.HadoopFileIO; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; public class FileIOFactoryImplTest { private final FileIOFactoryImpl factory = new FileIOFactoryImpl(); @@ -33,6 +35,7 @@ void buildS3FileIO() { } @Test + @DisabledOnOs(OS.WINDOWS) void buildLocalFileIO() throws IOException { var storage = new Storage( "name", From 798420ba8b0240905df119d4bf1e3dc3372b1797 Mon Sep 17 00:00:00 2001 From: Antonio Murgia Date: Fri, 19 Apr 2024 11:46:01 +0200 Subject: [PATCH 6/6] Remove TODO --- .../main/java/io/whitefox/core/services/FileIOFactoryImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java b/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java index ea2c89db6..961779bf6 100644 --- a/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java +++ b/server/core/src/main/java/io/whitefox/core/services/FileIOFactoryImpl.java @@ -14,7 +14,6 @@ public class FileIOFactoryImpl implements FileIOFactory { @Override - // TODO add tests to restore coverage from 0.70 to 0.72 public FileIO newFileIO(Storage storage) { storage.properties().validateTypeAndUri(storage.type(), storage.uri()); var propsAndConfs = buildPropsFromStorage(storage);