From 8553226fa1d8d97e0f458cf871689079d07c3a3d Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Tue, 12 Sep 2023 12:07:31 +0200 Subject: [PATCH] Core: Support view metadata compression --- .../iceberg/view/BaseViewOperations.java | 9 ++- .../iceberg/view/ViewMetadataParser.java | 12 +++- .../apache/iceberg/view/ViewProperties.java | 3 + .../iceberg/view/TestViewMetadataParser.java | 67 +++++++++++++++++++ 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java index f7270b9a35ed..766d217346e0 100644 --- a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java +++ b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchViewException; @@ -147,8 +148,14 @@ protected String writeNewMetadataIfRequired(ViewMetadata metadata) { } private String newMetadataFilePath(ViewMetadata metadata, int newVersion) { + String codecName = + metadata + .properties() + .getOrDefault( + ViewProperties.METADATA_COMPRESSION, ViewProperties.METADATA_COMPRESSION_DEFAULT); + String fileExtension = TableMetadataParser.getFileExtension(codecName); return metadataFileLocation( - metadata, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), ".metadata.json")); + metadata, String.format("%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension)); } private String metadataFileLocation(ViewMetadata metadata, String filename) { diff --git a/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java b/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java index 7a29c87bad9c..efce2b725ee6 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java @@ -28,8 +28,11 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.TableMetadataParser.Codec; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -163,7 +166,9 @@ public static void write(ViewMetadata metadata, OutputFile outputFile) { } public static ViewMetadata read(InputFile file) { - try (InputStream is = file.newStream()) { + Codec codec = Codec.fromFileName(file.location()); + try (InputStream is = + codec == Codec.GZIP ? new GZIPInputStream(file.newStream()) : file.newStream()) { return fromJson(file.location(), JsonUtil.mapper().readValue(is, JsonNode.class)); } catch (IOException e) { throw new UncheckedIOException(String.format("Failed to read json file: %s", file), e); @@ -172,8 +177,11 @@ public static ViewMetadata read(InputFile file) { private static void internalWrite( ViewMetadata metadata, OutputFile outputFile, boolean overwrite) { + boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP; OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create(); - try (OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8)) { + try (OutputStreamWriter writer = + new OutputStreamWriter( + isGzip ? new GZIPOutputStream(stream) : stream, StandardCharsets.UTF_8)) { JsonGenerator generator = JsonUtil.factory().createGenerator(writer); generator.useDefaultPrettyPrinter(); toJson(metadata, generator); diff --git a/core/src/main/java/org/apache/iceberg/view/ViewProperties.java b/core/src/main/java/org/apache/iceberg/view/ViewProperties.java index 07cd3cbf6567..b356db7ce040 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewProperties.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewProperties.java @@ -23,5 +23,8 @@ public class ViewProperties { public static final String VERSION_HISTORY_SIZE = "version.history.num-entries"; public static final int VERSION_HISTORY_SIZE_DEFAULT = 10; + public static final String METADATA_COMPRESSION = "write.metadata.compression-codec"; + public static final String METADATA_COMPRESSION_DEFAULT = "gzip"; + private ViewProperties() {} } diff --git a/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java b/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java index 267195c13350..d168eaf73126 100644 --- a/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java +++ b/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java @@ -18,18 +18,30 @@ */ package org.apache.iceberg.view; +import static org.apache.iceberg.TableMetadataParser.getFileExtension; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.zip.GZIPInputStream; +import java.util.zip.ZipException; import org.apache.iceberg.Schema; +import org.apache.iceberg.TableMetadataParser.Codec; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; public class TestViewMetadataParser { @@ -39,6 +51,8 @@ public class TestViewMetadataParser { Types.NestedField.required(2, "y", Types.LongType.get(), "comment"), Types.NestedField.required(3, "z", Types.LongType.get())); + @TempDir private Path tmp; + @Test public void nullAndEmptyCheck() { assertThatThrownBy(() -> ViewMetadataParser.fromJson((String) null)) @@ -308,4 +322,57 @@ public void replaceViewMetadataWithMultipleSQLsForDialect() throws Exception { assertThat(replaced.currentVersion()).isEqualTo(viewVersion); } + + @ParameterizedTest + @ValueSource(strings = {"none", "gzip"}) + public void metadataCompression(String codecName) throws IOException { + Codec codec = Codec.fromName(codecName); + String location = Paths.get(tmp.toString(), "v1" + getFileExtension(codec)).toString(); + OutputFile outputFile = org.apache.iceberg.Files.localOutput(location); + + Schema schema = new Schema(Types.NestedField.required(1, "x", Types.LongType.get())); + ViewVersion viewVersion = + ImmutableViewVersion.builder() + .schemaId(0) + .versionId(1) + .timestampMillis(23L) + .putSummary("operation", "create") + .defaultNamespace(Namespace.of("ns")) + .build(); + + ViewMetadata metadata = + ViewMetadata.buildFrom( + ViewMetadata.builder() + .setLocation(location) + .addSchema(schema) + .setProperties(ImmutableMap.of(ViewProperties.METADATA_COMPRESSION, codecName)) + .addVersion(viewVersion) + .setCurrentVersionId(1) + .build()) + .setMetadataLocation(outputFile.location()) + .build(); + + ViewMetadataParser.write(metadata, outputFile); + assertThat(Codec.GZIP == codec).isEqualTo(isCompressed(location)); + + ViewMetadata actualMetadata = + ViewMetadataParser.read(org.apache.iceberg.Files.localInput(location)); + + assertThat(actualMetadata) + .usingRecursiveComparison() + .ignoringFieldsOfTypes(Schema.class) + .isEqualTo(metadata); + } + + private boolean isCompressed(String path) throws IOException { + try (InputStream ignored = new GZIPInputStream(Files.newInputStream(new File(path).toPath()))) { + return true; + } catch (ZipException e) { + if (e.getMessage().equals("Not in GZIP format")) { + return false; + } else { + throw e; + } + } + } }