diff --git a/V4_Testing_Guide.md b/V4_Testing_Guide.md
new file mode 100644
index 000000000000..0a3a9e57d4cd
--- /dev/null
+++ b/V4_Testing_Guide.md
@@ -0,0 +1,124 @@
+# Testing V4 Iceberg with Spark
+
+## Build the Iceberg Spark runtime jar
+
+### Spark 4.1
+
+```bash
+git checkout v4-amt
+./gradlew :iceberg-spark:iceberg-spark-runtime-4.1_2.13:shadowJar
+```
+
+The jar is at:
+```
+spark/v4.1/spark-runtime/build/libs/iceberg-spark-runtime-4.1_2.13-1.11.0-SNAPSHOT.jar
+```
+
+### Spark 3.5
+
+```bash
+git checkout v4-amt
+./gradlew -DsparkVersions=3.5 :iceberg-spark:iceberg-spark-runtime-3.5_2.12:shadowJar
+```
+
+The jar is at:
+```
+spark/v3.5/spark-runtime/build/libs/iceberg-spark-runtime-3.5_2.12-1.11.0-SNAPSHOT.jar
+```
+
+## Download Spark
+
+### Spark 4.1.1
+
+```bash
+curl -L -o spark-4.1.1-bin-hadoop3.tgz \
+ https://archive.apache.org/dist/spark/spark-4.1.1/spark-4.1.1-bin-hadoop3.tgz
+tar xzf spark-4.1.1-bin-hadoop3.tgz
+```
+
+### Spark 3.5.8
+
+```bash
+curl -L -o spark-3.5.8-bin-hadoop3.tgz \
+ https://archive.apache.org/dist/spark/spark-3.5.8/spark-3.5.8-bin-hadoop3.tgz
+tar xzf spark-3.5.8-bin-hadoop3.tgz
+```
+
+## Start spark-sql
+
+### Spark 4.1
+
+```bash
+spark-4.1.1-bin-hadoop3/bin/spark-sql \
+ --jars /path/to/iceberg-spark-runtime-4.1_2.13-1.11.0-SNAPSHOT.jar \
+ --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+ --conf spark.sql.catalog.local.type=hadoop \
+ --conf spark.sql.catalog.local.warehouse=file:///tmp/iceberg-warehouse
+```
+
+### Spark 3.5
+
+```bash
+spark-3.5.8-bin-hadoop3/bin/spark-sql \
+ --jars /path/to/iceberg-spark-runtime-3.5_2.12-1.11.0-SNAPSHOT.jar \
+ --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
+ --conf spark.sql.catalog.local.type=hadoop \
+ --conf spark.sql.catalog.local.warehouse=file:///tmp/iceberg-warehouse
+```
+
+## Create a v4 table and query it
+
+```sql
+CREATE TABLE local.default.test (id bigint, data string)
+ USING iceberg TBLPROPERTIES ('format-version' = '4');
+
+INSERT INTO local.default.test VALUES (1, 'a'), (2, 'b'), (3, 'c');
+
+SELECT * FROM local.default.test ORDER BY id;
+```
+
+## Inspect the metadata
+
+All paths in v4 metadata are stored as relative:
+
+```bash
+# metadata JSON -- manifest-list and metadata-log use relative paths
+python3 -m json.tool /tmp/iceberg-warehouse/default/test/metadata/v2.metadata.json
+
+# root manifest and leaf manifests are Parquet -- read with spark-sql
+# (replace the UUID with the actual filename)
+SELECT * FROM parquet.`file:///tmp/iceberg-warehouse/default/test/metadata/*-root-*.parquet`;
+SELECT * FROM parquet.`file:///tmp/iceberg-warehouse/default/test/metadata/*-m0.parquet`;
+```
+
+## Run automated V4 tests
+
+### Spark 4.1
+
+```bash
+./gradlew :iceberg-spark:iceberg-spark-4.1_2.13:test \
+ --tests "org.apache.iceberg.spark.source.TestV4ReadEndToEnd"
+```
+
+### Spark 3.5
+
+```bash
+./gradlew -DsparkVersions=3.5 :iceberg-spark:iceberg-spark-3.5_2.12:test \
+ --tests "org.apache.iceberg.spark.source.TestV4ReadEndToEnd"
+```
+
+## What's implemented
+
+- V4 Adaptive Metadata Tree: root manifest (Parquet) replaces Avro manifest list
+- Relative paths at all levels: metadata JSON, root manifest, leaf manifests
+- Metadata deletion vectors (inline bitmaps on tracking struct)
+- V4 scan path through ManifestExpander (bypasses ManifestGroup)
+- FastAppend write path (INSERT INTO)
+
+## Limitations
+
+- Only FastAppend is wired for v4 (INSERT INTO). Overwrites, deletes, and
+ compaction still use the v2/v3 path.
+- Data deletion vectors (colocated with data files) are not yet implemented.
+- Metadata deletion vectors are applied on read but there is no write path
+ that produces them yet.
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
index cbd372b7a4ba..b48a2aa82a7f 100644
--- a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmark.java
@@ -18,23 +18,15 @@
*/
package org.apache.iceberg;
-import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
-import org.openjdk.jmh.annotations.AuxCounters;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -52,10 +44,16 @@
import org.openjdk.jmh.infra.Blackhole;
/**
- * A benchmark that measures manifest read/write performance across compression codecs.
+ * A benchmark that measures manifest read/write performance across format versions and file
+ * formats.
*
- *
Entry counts are calibrated per column count via {@link #ENTRY_BASE}. Set to 300_000 for ~8 MB
- * manifests (matching the default {@code commit.manifest.target-size-bytes}) or 15_000 for ~400 KB.
+ *
V1-V3 only support Avro manifests. V4 supports both Avro and Parquet. The {@code
+ * versionFormat} parameter encodes valid combinations as {@code "_"} (e.g. {@code
+ * "4_PARQUET"}) so that only meaningful pairings are benchmarked.
+ *
+ * Entry counts are calibrated per column count via {@link ManifestBenchmarkUtil#ENTRY_BASE}. Set
+ * to 300_000 for ~8 MB manifests (matching the default {@code commit.manifest.target-size-bytes})
+ * or 15_000 for ~400 KB.
*
*
To run this benchmark:
*
@@ -63,34 +61,33 @@
* # all combinations
* ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark
*
- * # single codec
+ * # V4-only (Avro vs Parquet)
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ * -PjmhParams="versionFormat=4_AVRO|4_PARQUET"
+ *
+ * # all versions, single column count
* ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
- * -PjmhParams="codec=gzip"
+ * -PjmhParams="numCols=50"
+ *
+ * # single version
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestBenchmark \
+ * -PjmhParams="versionFormat=3_AVRO"
* }
*/
@Fork(1)
@State(Scope.Benchmark)
+// Parquet's columnar write path has a deep call graph (per-column encoders, page assembly,
+// dictionary management) that requires more warmup iterations than Avro for the JIT compiler to
+// fully optimize. Profiling shows ~650ms of JIT compilation spread across the first 3-4
+// iterations, so 6 warmups ensure measurement begins after JIT has stabilized.
@Warmup(iterations = 6)
@Measurement(iterations = 10)
@BenchmarkMode(Mode.SingleShotTime)
@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
public class ManifestBenchmark {
- static final int ENTRY_BASE = 300_000;
-
- private static final int FORMAT_VERSION = 4;
-
- private static final Schema SCHEMA =
- new Schema(
- Types.NestedField.required(1, "id", Types.IntegerType.get()),
- Types.NestedField.required(2, "data", Types.StringType.get()),
- Types.NestedField.required(3, "customer", Types.StringType.get()));
-
- private static final PartitionSpec SPEC =
- PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
-
- @Param({"gzip", "snappy", "zstd", "uncompressed"})
- private String codec;
+ @Param({"1_AVRO", "2_AVRO", "3_AVRO", "4_AVRO", "4_PARQUET"})
+ private String versionFormat;
@Param({"true", "false"})
private String partitioned;
@@ -98,11 +95,11 @@ public class ManifestBenchmark {
@Param({"10", "50", "100"})
private int numCols;
+ private int formatVersion;
+ private FileFormat fileFormat;
private PartitionSpec spec;
private Map specsById;
- private Map writerProperties;
private List dataFiles;
- private int numEntries;
private String writeBaseDir;
private OutputFile writeOutputFile;
@@ -112,21 +109,26 @@ public class ManifestBenchmark {
@Setup(Level.Trial)
public void setupTrial() {
- this.spec = Boolean.parseBoolean(partitioned) ? SPEC : PartitionSpec.unpartitioned();
- this.specsById = Map.of(spec.specId(), spec);
- this.writerProperties = Map.of(TableProperties.AVRO_COMPRESSION, codec);
- // ENTRY_BASE / cols: empirically calibrated — 300_000 → ~8 MB, 15_000 → ~400 KB manifests
- this.numEntries = ENTRY_BASE / numCols;
- this.dataFiles = generateDataFiles();
+ String[] parts = versionFormat.split("_", 2);
+ this.formatVersion = Integer.parseInt(parts[0]);
+ this.fileFormat = FileFormat.fromString(parts[1]);
+ this.spec =
+ Boolean.parseBoolean(partitioned)
+ ? ManifestBenchmarkUtil.SPEC
+ : PartitionSpec.unpartitioned();
+ this.specsById = ImmutableMap.of(spec.specId(), spec);
+ int numEntries = ManifestBenchmarkUtil.entriesForColumnCount(numCols);
+ this.dataFiles = ManifestBenchmarkUtil.generateDataFiles(spec, numEntries, numCols);
setupReadManifest();
}
@Setup(Level.Invocation)
public void setupWriteInvocation() throws IOException {
- this.writeBaseDir = Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
+ this.writeBaseDir =
+ java.nio.file.Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
this.writeOutputFile =
- org.apache.iceberg.Files.localOutput(
- String.format(Locale.ROOT, "%s/manifest.avro", writeBaseDir));
+ Files.localOutput(
+ String.format(Locale.ROOT, "%s/%s", writeBaseDir, fileFormat.addExtension("manifest")));
for (DataFile file : dataFiles) {
file.path();
@@ -137,7 +139,7 @@ public void setupWriteInvocation() throws IOException {
@TearDown(Level.Trial)
public void tearDownTrial() {
- cleanDir(readBaseDir);
+ ManifestBenchmarkUtil.cleanDir(readBaseDir);
readBaseDir = null;
readManifest = null;
dataFiles = null;
@@ -145,28 +147,15 @@ public void tearDownTrial() {
@TearDown(Level.Invocation)
public void tearDownInvocation() {
- cleanDir(writeBaseDir);
+ ManifestBenchmarkUtil.cleanDir(writeBaseDir);
writeBaseDir = null;
writeOutputFile = null;
}
- @AuxCounters(AuxCounters.Type.EVENTS)
- @State(Scope.Thread)
- @SuppressWarnings("checkstyle:VisibilityModifier")
- public static class FileSizeCounters {
- public double manifestSizeMB;
-
- @Setup(Level.Invocation)
- public void reset() {
- manifestSizeMB = 0;
- }
- }
-
@Benchmark
@Threads(1)
- public ManifestFile writeManifest(FileSizeCounters counters) throws IOException {
- ManifestWriter writer =
- ManifestFiles.write(FORMAT_VERSION, spec, writeOutputFile, 1L, writerProperties);
+ public ManifestFile writeManifest() throws IOException {
+ ManifestWriter writer = ManifestFiles.write(formatVersion, spec, writeOutputFile, 1L);
try (ManifestWriter w = writer) {
for (DataFile file : dataFiles) {
@@ -174,9 +163,7 @@ public ManifestFile writeManifest(FileSizeCounters counters) throws IOException
}
}
- ManifestFile manifest = writer.toManifestFile();
- counters.manifestSizeMB = manifest.length() / (1024.0 * 1024.0);
- return manifest;
+ return writer.toManifestFile();
}
@Benchmark
@@ -193,17 +180,17 @@ public void readManifest(Blackhole blackhole) throws IOException {
private void setupReadManifest() {
try {
- this.readBaseDir = Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
+ this.readBaseDir =
+ java.nio.file.Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
OutputFile manifestFile =
- org.apache.iceberg.Files.localOutput(
- String.format(Locale.ROOT, "%s/manifest.avro", readBaseDir));
+ Files.localOutput(
+ String.format(Locale.ROOT, "%s/%s", readBaseDir, fileFormat.addExtension("manifest")));
- ManifestWriter writer =
- ManifestFiles.write(FORMAT_VERSION, spec, manifestFile, 1L, writerProperties);
+ ManifestWriter writer = ManifestFiles.write(formatVersion, spec, manifestFile, 1L);
try (ManifestWriter w = writer) {
for (DataFile file : dataFiles) {
@@ -215,65 +202,4 @@ private void setupReadManifest() {
this.readManifest = writer.toManifestFile();
}
-
- private List generateDataFiles() {
- Random random = new Random(42);
- List files = Lists.newArrayListWithCapacity(numEntries);
- for (int i = 0; i < numEntries; i++) {
- DataFiles.Builder builder =
- DataFiles.builder(spec)
- .withFormat(FileFormat.PARQUET)
- .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet", i))
- .withFileSizeInBytes(1024 + i)
- .withRecordCount(1000 + i)
- .withMetrics(randomMetrics(random, numCols));
-
- if (!spec.isUnpartitioned()) {
- builder.withPartitionPath(
- String.format(
- Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i % 50, i % 200));
- }
-
- files.add(builder.build());
- }
-
- return files;
- }
-
- static Metrics randomMetrics(Random random, int cols) {
- long rowCount = 100_000L + random.nextInt(1000);
- Map columnSizes = Maps.newHashMap();
- Map valueCounts = Maps.newHashMap();
- Map nullValueCounts = Maps.newHashMap();
- Map nanValueCounts = Maps.newHashMap();
- Map lowerBounds = Maps.newHashMap();
- Map upperBounds = Maps.newHashMap();
- for (int i = 0; i < cols; i++) {
- columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
- valueCounts.put(i, 100_000L + random.nextInt(100));
- nullValueCounts.put(i, (long) random.nextInt(5));
- nanValueCounts.put(i, (long) random.nextInt(5));
- byte[] lower = new byte[8];
- random.nextBytes(lower);
- lowerBounds.put(i, ByteBuffer.wrap(lower));
- byte[] upper = new byte[8];
- random.nextBytes(upper);
- upperBounds.put(i, ByteBuffer.wrap(upper));
- }
-
- return new Metrics(
- rowCount,
- columnSizes,
- valueCounts,
- nullValueCounts,
- nanValueCounts,
- lowerBounds,
- upperBounds);
- }
-
- private static void cleanDir(String dir) {
- if (dir != null) {
- FileUtils.deleteQuietly(new File(dir));
- }
- }
}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java
new file mode 100644
index 000000000000..64602ad0a8b1
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestBenchmarkUtil.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Shared constants and stateless helpers for {@link ManifestBenchmark} and {@link
+ * ManifestCompressionBenchmark}.
+ */
+final class ManifestBenchmarkUtil {
+
+ /** Scale factor for entry counts. 300_000 yields ~8 MB manifests; 15_000 yields ~400 KB. */
+ static final int ENTRY_BASE = 300_000;
+
+ static final Schema SCHEMA =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()),
+ Types.NestedField.required(3, "customer", Types.StringType.get()));
+
+ static final PartitionSpec SPEC =
+ PartitionSpec.builderFor(SCHEMA).identity("id").identity("data").identity("customer").build();
+
+ private ManifestBenchmarkUtil() {}
+
+ /**
+ * Returns the number of manifest entries for the given column count, scaled by {@link
+ * #ENTRY_BASE}.
+ */
+ static int entriesForColumnCount(int cols) {
+ return ENTRY_BASE / cols;
+ }
+
+ static List generateDataFiles(PartitionSpec spec, int numEntries, int numCols) {
+ Random random = new Random(42);
+ List files = Lists.newArrayListWithCapacity(numEntries);
+ for (int i = 0; i < numEntries; i++) {
+ DataFiles.Builder builder =
+ DataFiles.builder(spec)
+ .withFormat(FileFormat.PARQUET)
+ .withPath(String.format(Locale.ROOT, "/path/to/data-%d.parquet", i))
+ .withFileSizeInBytes(1024 + i)
+ .withRecordCount(1000 + i)
+ .withMetrics(randomMetrics(random, numCols));
+
+ if (!spec.isUnpartitioned()) {
+ builder.withPartitionPath(
+ String.format(
+ Locale.ROOT, "id=%d/data=val-%d/customer=cust-%d", i % 100, i % 50, i % 200));
+ }
+
+ files.add(builder.build());
+ }
+ return files;
+ }
+
+ static Metrics randomMetrics(Random random, int cols) {
+ long rowCount = 100_000L + random.nextInt(1000);
+ Map columnSizes = Maps.newHashMap();
+ Map valueCounts = Maps.newHashMap();
+ Map nullValueCounts = Maps.newHashMap();
+ Map nanValueCounts = Maps.newHashMap();
+ Map lowerBounds = Maps.newHashMap();
+ Map upperBounds = Maps.newHashMap();
+ for (int i = 0; i < cols; i++) {
+ columnSizes.put(i, 1_000_000L + random.nextInt(100_000));
+ valueCounts.put(i, 100_000L + random.nextInt(100));
+ nullValueCounts.put(i, (long) random.nextInt(5));
+ nanValueCounts.put(i, (long) random.nextInt(5));
+ byte[] lower = new byte[8];
+ random.nextBytes(lower);
+ lowerBounds.put(i, ByteBuffer.wrap(lower));
+ byte[] upper = new byte[8];
+ random.nextBytes(upper);
+ upperBounds.put(i, ByteBuffer.wrap(upper));
+ }
+
+ return new Metrics(
+ rowCount,
+ columnSizes,
+ valueCounts,
+ nullValueCounts,
+ nanValueCounts,
+ lowerBounds,
+ upperBounds);
+ }
+
+ static void cleanDir(String dir) {
+ if (dir != null) {
+ FileUtils.deleteQuietly(new java.io.File(dir));
+ }
+ }
+}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java
new file mode 100644
index 000000000000..7ba9e47c611b
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestCompressionBenchmark.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.OutputFile;
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * A benchmark that measures manifest read/write performance across compression codecs.
+ *
+ * Entry counts are calibrated per column count via {@link ManifestBenchmarkUtil#ENTRY_BASE}. Set
+ * to 300_000 for ~8 MB manifests (matching the default {@code commit.manifest.target-size-bytes})
+ * or 15_000 for ~400 KB.
+ *
+ *
To run this benchmark:
+ *
+ *
{@code
+ * # all combinations
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestCompressionBenchmark
+ *
+ * # single codec
+ * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestCompressionBenchmark \
+ * -PjmhParams="codec=gzip"
+ * }
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 6)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class ManifestCompressionBenchmark {
+
+ private static final int FORMAT_VERSION = 4;
+
+ @Param({"gzip", "snappy", "zstd", "uncompressed"})
+ private String codec;
+
+ @Param({"true", "false"})
+ private String partitioned;
+
+ @Param({"10", "50", "100"})
+ private int numCols;
+
+ private PartitionSpec spec;
+ private Map specsById;
+ private Map writerProperties;
+ private List dataFiles;
+
+ private String writeBaseDir;
+ private OutputFile writeOutputFile;
+
+ private String readBaseDir;
+ private ManifestFile readManifest;
+
+ @Setup(Level.Trial)
+ public void setupTrial() {
+ this.spec =
+ Boolean.parseBoolean(partitioned)
+ ? ManifestBenchmarkUtil.SPEC
+ : PartitionSpec.unpartitioned();
+ this.specsById = Map.of(spec.specId(), spec);
+ this.writerProperties = Map.of(TableProperties.AVRO_COMPRESSION, codec);
+ int numEntries = ManifestBenchmarkUtil.entriesForColumnCount(numCols);
+ this.dataFiles = ManifestBenchmarkUtil.generateDataFiles(spec, numEntries, numCols);
+ setupReadManifest();
+ }
+
+ @Setup(Level.Invocation)
+ public void setupWriteInvocation() throws IOException {
+ this.writeBaseDir =
+ java.nio.file.Files.createTempDirectory("bench-write-").toAbsolutePath().toString();
+ this.writeOutputFile =
+ Files.localOutput(String.format(Locale.ROOT, "%s/manifest.avro", writeBaseDir));
+
+ for (DataFile file : dataFiles) {
+ file.path();
+ file.fileSizeInBytes();
+ file.recordCount();
+ }
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDownTrial() {
+ ManifestBenchmarkUtil.cleanDir(readBaseDir);
+ readBaseDir = null;
+ readManifest = null;
+ dataFiles = null;
+ }
+
+ @TearDown(Level.Invocation)
+ public void tearDownInvocation() {
+ ManifestBenchmarkUtil.cleanDir(writeBaseDir);
+ writeBaseDir = null;
+ writeOutputFile = null;
+ }
+
+ @AuxCounters(AuxCounters.Type.EVENTS)
+ @State(Scope.Thread)
+ @SuppressWarnings("checkstyle:VisibilityModifier")
+ public static class FileSizeCounters {
+ public double manifestSizeMB;
+
+ @Setup(Level.Invocation)
+ public void reset() {
+ manifestSizeMB = 0;
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public ManifestFile writeManifest(FileSizeCounters counters) throws IOException {
+ ManifestWriter writer =
+ ManifestFiles.write(FORMAT_VERSION, spec, writeOutputFile, 1L, writerProperties);
+
+ try (ManifestWriter w = writer) {
+ for (DataFile file : dataFiles) {
+ w.add(file);
+ }
+ }
+
+ ManifestFile manifest = writer.toManifestFile();
+ counters.manifestSizeMB = manifest.length() / (1024.0 * 1024.0);
+ return manifest;
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readManifest(Blackhole blackhole) throws IOException {
+ TestTables.LocalFileIO fileIO = new TestTables.LocalFileIO();
+ try (CloseableIterator it =
+ ManifestFiles.read(readManifest, fileIO, specsById).iterator()) {
+ while (it.hasNext()) {
+ blackhole.consume(it.next());
+ }
+ }
+ }
+
+ private void setupReadManifest() {
+ try {
+ this.readBaseDir =
+ java.nio.file.Files.createTempDirectory("bench-read-").toAbsolutePath().toString();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ OutputFile manifestFile =
+ Files.localOutput(String.format(Locale.ROOT, "%s/manifest.avro", readBaseDir));
+
+ ManifestWriter writer =
+ ManifestFiles.write(FORMAT_VERSION, spec, manifestFile, 1L, writerProperties);
+
+ try (ManifestWriter w = writer) {
+ for (DataFile file : dataFiles) {
+ w.add(file);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ this.readManifest = writer.toManifestFile();
+ }
+}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
deleted file mode 100644
index 588b5df1ba97..000000000000
--- a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Timeout;
-
-@Fork(1)
-@State(Scope.Benchmark)
-@Measurement(iterations = 5)
-@BenchmarkMode(Mode.SingleShotTime)
-@Timeout(time = 1000, timeUnit = TimeUnit.HOURS)
-public class ManifestReadBenchmark {
-
- private static final int NUM_FILES = 10;
- private static final int NUM_ROWS = 100000;
- private static final int NUM_COLS = 10;
-
- private String baseDir;
- private String manifestListFile;
-
- @Setup
- public void before() {
- baseDir =
- Paths.get(new File(System.getProperty("java.io.tmpdir")).getAbsolutePath()).toString();
- manifestListFile = String.format("%s/%s.avro", baseDir, UUID.randomUUID());
-
- Random random = new Random(System.currentTimeMillis());
-
- try (ManifestListWriter listWriter =
- ManifestLists.write(
- 1,
- org.apache.iceberg.Files.localOutput(manifestListFile),
- PlaintextEncryptionManager.instance(),
- 0,
- 1L,
- 0,
- 0L)) {
- for (int i = 0; i < NUM_FILES; i++) {
- OutputFile manifestFile =
- org.apache.iceberg.Files.localOutput(
- String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
-
- ManifestWriter writer =
- ManifestFiles.write(1, PartitionSpec.unpartitioned(), manifestFile, 1L);
- try (ManifestWriter finalWriter = writer) {
- for (int j = 0; j < NUM_ROWS; j++) {
- DataFile dataFile =
- DataFiles.builder(PartitionSpec.unpartitioned())
- .withFormat(FileFormat.PARQUET)
- .withPath(String.format("/path/to/data-%s-%s.parquet", i, j))
- .withFileSizeInBytes(j)
- .withRecordCount(j)
- .withMetrics(randomMetrics(random))
- .build();
- finalWriter.add(dataFile);
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- listWriter.add(writer.toManifestFile());
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- @TearDown
- public void after() throws IOException {
- if (baseDir != null) {
- try (Stream walk = Files.walk(Paths.get(baseDir))) {
- walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
- }
- baseDir = null;
- }
-
- manifestListFile = null;
- }
-
- @Benchmark
- @Threads(1)
- public void readManifestFile() throws IOException {
- List manifests =
- ManifestLists.read(org.apache.iceberg.Files.localInput(manifestListFile));
- TestTables.LocalFileIO fileIO = new TestTables.LocalFileIO();
- Map specs =
- ImmutableMap.of(PartitionSpec.unpartitioned().specId(), PartitionSpec.unpartitioned());
- for (ManifestFile manifestFile : manifests) {
- ManifestReader reader = ManifestFiles.read(manifestFile, fileIO, specs);
- try (CloseableIterator it = reader.iterator()) {
- while (it.hasNext()) {
- it.next().recordCount();
- }
- }
- }
- }
-
- private Metrics randomMetrics(Random random) {
- long rowCount = 100000L + random.nextInt(1000);
- Map columnSizes = Maps.newHashMap();
- Map valueCounts = Maps.newHashMap();
- Map nullValueCounts = Maps.newHashMap();
- Map nanValueCounts = Maps.newHashMap();
- Map lowerBounds = Maps.newHashMap();
- Map upperBounds = Maps.newHashMap();
- for (int i = 0; i < NUM_COLS; i++) {
- columnSizes.put(i, 1000000L + random.nextInt(100000));
- valueCounts.put(i, 100000L + random.nextInt(100));
- nullValueCounts.put(i, (long) random.nextInt(5));
- nanValueCounts.put(i, (long) random.nextInt(5));
- byte[] lower = new byte[8];
- random.nextBytes(lower);
- lowerBounds.put(i, ByteBuffer.wrap(lower));
- byte[] upper = new byte[8];
- random.nextBytes(upper);
- upperBounds.put(i, ByteBuffer.wrap(upper));
- }
-
- return new Metrics(
- rowCount,
- columnSizes,
- valueCounts,
- nullValueCounts,
- nanValueCounts,
- lowerBounds,
- upperBounds);
- }
-}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
deleted file mode 100644
index b0dab63dea06..000000000000
--- a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.Param;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
-import org.openjdk.jmh.annotations.Threads;
-import org.openjdk.jmh.annotations.Timeout;
-
-/**
- * A benchmark that evaluates the performance of writing manifest files
- *
- * To run this benchmark:
- * ./gradlew :iceberg-core:jmh -PjmhIncludeRegex=ManifestWriteBenchmark
- *
- */
-@Fork(1)
-@State(Scope.Benchmark)
-@Measurement(iterations = 5)
-@BenchmarkMode(Mode.SingleShotTime)
-@Timeout(time = 5, timeUnit = TimeUnit.MINUTES)
-public class ManifestWriteBenchmark {
-
- private static final int NUM_FILES = 10;
- private static final int NUM_ROWS = 100000;
- private static final int NUM_COLS = 100;
-
- private String baseDir;
- private String manifestListFile;
-
- private Metrics metrics;
-
- @Setup
- public void before() {
- Random random = new Random(System.currentTimeMillis());
- // Pre-create the metrics to avoid doing this in the benchmark itself
- metrics = randomMetrics(random);
- }
-
- @TearDown
- public void after() {
- if (baseDir != null) {
- FileUtils.deleteQuietly(new File(baseDir));
- baseDir = null;
- }
-
- manifestListFile = null;
- }
-
- @State(Scope.Benchmark)
- public static class BenchmarkState {
- @Param({"1", "2"})
- private int formatVersion;
-
- public int getFormatVersion() {
- return formatVersion;
- }
- }
-
- @Benchmark
- @Threads(1)
- public void writeManifestFile(BenchmarkState state) throws IOException {
- this.baseDir =
- java.nio.file.Files.createTempDirectory("benchmark-").toAbsolutePath().toString();
- this.manifestListFile = String.format("%s/%s.avro", baseDir, UUID.randomUUID());
-
- try (ManifestListWriter listWriter =
- ManifestLists.write(
- state.getFormatVersion(),
- org.apache.iceberg.Files.localOutput(manifestListFile),
- PlaintextEncryptionManager.instance(),
- 0,
- 1L,
- 0,
- 0L)) {
- for (int i = 0; i < NUM_FILES; i++) {
- OutputFile manifestFile =
- org.apache.iceberg.Files.localOutput(
- String.format("%s/%s.avro", baseDir, UUID.randomUUID()));
-
- ManifestWriter writer =
- ManifestFiles.write(
- state.formatVersion, PartitionSpec.unpartitioned(), manifestFile, 1L);
- try (ManifestWriter finalWriter = writer) {
- for (int j = 0; j < NUM_ROWS; j++) {
- DataFile dataFile =
- DataFiles.builder(PartitionSpec.unpartitioned())
- .withFormat(FileFormat.PARQUET)
- .withPath(String.format("/path/to/data-%s-%s.parquet", i, j))
- .withFileSizeInBytes(j)
- .withRecordCount(j)
- .withMetrics(metrics)
- .build();
- finalWriter.add(dataFile);
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- listWriter.add(writer.toManifestFile());
- }
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- private Metrics randomMetrics(Random random) {
- long rowCount = 100000L + random.nextInt(1000);
- Map columnSizes = Maps.newHashMap();
- Map valueCounts = Maps.newHashMap();
- Map nullValueCounts = Maps.newHashMap();
- Map nanValueCounts = Maps.newHashMap();
- Map lowerBounds = Maps.newHashMap();
- Map upperBounds = Maps.newHashMap();
- for (int i = 0; i < NUM_COLS; i++) {
- columnSizes.put(i, 1000000L + random.nextInt(100000));
- valueCounts.put(i, 100000L + random.nextInt(100));
- nullValueCounts.put(i, (long) random.nextInt(5));
- nanValueCounts.put(i, (long) random.nextInt(5));
- byte[] lower = new byte[8];
- random.nextBytes(lower);
- lowerBounds.put(i, ByteBuffer.wrap(lower));
- byte[] upper = new byte[8];
- random.nextBytes(upper);
- upperBounds.put(i, ByteBuffer.wrap(upper));
- }
-
- return new Metrics(
- rowCount,
- columnSizes,
- valueCounts,
- nullValueCounts,
- nanValueCounts,
- lowerBounds,
- upperBounds);
- }
-}
diff --git a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
index e951ae830737..3af4cdd3b8a5 100644
--- a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
@@ -36,6 +36,7 @@
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.ScanMetricsUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ContentFileUtil;
@@ -144,6 +145,53 @@ protected PlanningMode deletePlanningMode() {
@Override
protected CloseableIterable doPlanFiles() {
+ if (TableUtil.formatVersion(table()) >= 4) {
+ return doPlanFilesV4();
+ }
+
+ return doPlanFilesV3();
+ }
+
+ private CloseableIterable doPlanFilesV4() {
+ Snapshot snapshot = snapshot();
+
+ // Pass the root manifest directly to ManifestExpander, not pre-filtered dataManifests.
+ // The root manifest may contain both DATA entries (inlined) and DATA_MANIFEST entries.
+ String rootManifestPath = snapshot.manifestListLocation();
+ ManifestFile rootManifest =
+ new GenericManifestFile(
+ rootManifestPath,
+ 0,
+ 0,
+ ManifestContent.DATA,
+ 0L,
+ 0L,
+ null,
+ null,
+ null,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ null);
+
+ ManifestExpander expander =
+ new ManifestExpander(table().io(), ImmutableList.of(rootManifest), specs())
+ .tableLocation(table().location())
+ .caseSensitive(isCaseSensitive())
+ .filterData(filter())
+ .scanMetrics(scanMetrics());
+
+ if (shouldIgnoreResiduals()) {
+ expander = expander.ignoreResiduals();
+ }
+
+ return CloseableIterable.transform(expander.planFiles(), task -> (ScanTask) task);
+ }
+
+ private CloseableIterable doPlanFilesV3() {
Snapshot snapshot = snapshot();
List deleteManifests = findMatchingDeleteManifests(snapshot);
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 3c31c50f099f..7f15c9188c87 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -32,6 +32,7 @@
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.SupportsIndexProjection;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
@@ -329,7 +330,9 @@ protected void internalSet(int pos, T value) {
this.partitionSpecId = (value != null) ? (Integer) value : -1;
return;
case 4:
- this.partitionData = (PartitionData) value;
+ if (value != null) {
+ this.partitionData = (PartitionData) value;
+ }
return;
case 5:
this.recordCount = (Long) value;
@@ -581,9 +584,37 @@ private static Map copyMap(Map map, Set keys) {
private static Map copyByteBufferMap(
Map map, Set keys) {
- return SerializableByteBufferMap.wrap(copyMap(map, keys));
+ if (map == null) {
+ return null;
+ }
+
+ return SerializableByteBufferMap.wrap(deepCopyByteBufferMap(map, keys));
+ }
+
+ // Required as long as we have Map in the API since Parquet reuses buffers.
+ private static Map deepCopyByteBufferMap(
+ Map map, Set keys) {
+ Map deepCopy = Maps.newHashMapWithExpectedSize(map.size());
+ for (Map.Entry entry : map.entrySet()) {
+ if (keys == null || keys.contains(entry.getKey())) {
+ ByteBuffer buf = entry.getValue();
+ if (buf != null) {
+ ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
+ copy.put(buf.duplicate());
+ copy.flip();
+ deepCopy.put(entry.getKey(), copy);
+ } else {
+ deepCopy.put(entry.getKey(), null);
+ }
+ }
+ }
+
+ return deepCopy;
}
+ // Returns an unmodifiable view of the map. The SerializableMap check is needed because
+ // internal maps may be wrapped for serialization after being populated by a format reader
+ // with container reuse enabled, and immutableMap() provides a stable snapshot.
private static Map toReadableMap(Map map) {
if (map == null) {
return null;
@@ -594,6 +625,10 @@ private static Map toReadableMap(Map map) {
}
}
+ // Separate from toReadableMap because SerializableByteBufferMap is its own wrapper type
+ // (not a SerializableMap subclass) to handle ByteBuffer-specific serialization. ByteBuffer
+ // values are mutable and can be overwritten by Parquet container reuse, so callers that
+ // retain references must use copyByteBufferMap to get independent copies.
private static Map toReadableByteBufferMap(Map map) {
if (map == null) {
return null;
diff --git a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
index 4dff19b87990..615ec6b1dc20 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java
@@ -35,6 +35,7 @@
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructProjection;
/** Base class logic for files metadata tables */
abstract class BaseFilesTable extends BaseMetadataTable {
@@ -161,6 +162,21 @@ private ManifestReadTask(
@Override
public CloseableIterable rows() {
+ if (isV4Manifest()) {
+ Schema dataFileSchema =
+ new Schema(
+ DataFile.getType(
+ specsById
+ .getOrDefault(manifest.partitionSpecId(), PartitionSpec.unpartitioned())
+ .rawPartitionType())
+ .fields());
+ return CloseableIterable.transform(
+ v4Files(),
+ file ->
+ (StructLike)
+ StructProjection.create(dataFileSchema, projection).wrap((StructLike) file));
+ }
+
Types.NestedField readableMetricsField = projection.findField(MetricsUtil.READABLE_METRICS);
if (readableMetricsField == null) {
@@ -180,6 +196,18 @@ public long estimatedRowsCount() {
+ (long) manifest.existingFilesCount();
}
+ private boolean isV4Manifest() {
+ return FileFormat.fromFileName(manifest.path()) == FileFormat.PARQUET;
+ }
+
+ private CloseableIterable extends ContentFile>> v4Files() {
+ V4ManifestReader reader = new V4ManifestReader(io.newInputFile(manifest), specsById);
+ PartitionSpec spec =
+ specsById.getOrDefault(manifest.partitionSpecId(), PartitionSpec.unpartitioned());
+ return CloseableIterable.transform(
+ reader.liveEntries(), tf -> TrackedFileAdapters.asGenericDataFile(tf.copy(), spec));
+ }
+
private CloseableIterable extends ContentFile>> files(Schema fileProjection) {
return ManifestFiles.open(manifest, io, specsById).project(fileProjection);
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 28a45d2c7821..03d54be246ff 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -30,6 +30,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -47,6 +48,9 @@ class BaseSnapshot implements Snapshot {
private final Long addedRows;
private final String keyId;
+ // set by SnapshotProducer or SnapshotParser for resolving relative paths in v4 root manifests
+ private String tableLocation;
+
// lazily initialized
private transient List allManifests = null;
private transient List dataManifests = null;
@@ -116,6 +120,10 @@ class BaseSnapshot implements Snapshot {
this.keyId = null;
}
+ void setTableLocation(String location) {
+ this.tableLocation = location;
+ }
+
@Override
public long sequenceNumber() {
return sequenceNumber;
@@ -182,9 +190,14 @@ private void cacheManifests(FileIO fileIO) {
if (allManifests == null) {
// if manifests isn't set, then the snapshotFile is set and should be read to get the list
- this.allManifests =
- ManifestLists.read(
- fileIO.newInputFile(new BaseManifestListFile(manifestListLocation, keyId)));
+ FileFormat format = FileFormat.fromFileName(manifestListLocation);
+ if (format == FileFormat.PARQUET) {
+ this.allManifests = readRootManifest(fileIO);
+ } else {
+ this.allManifests =
+ ManifestLists.read(
+ fileIO.newInputFile(new BaseManifestListFile(manifestListLocation, keyId)));
+ }
}
if (dataManifests == null || deleteManifests == null) {
@@ -199,6 +212,24 @@ private void cacheManifests(FileIO fileIO) {
}
}
+ private List readRootManifest(FileIO fileIO) {
+ List result = Lists.newArrayList();
+ V4ManifestReader reader =
+ new V4ManifestReader(fileIO.newInputFile(manifestListLocation), ImmutableMap.of());
+ try (CloseableIterable entries = reader.liveEntries()) {
+ for (TrackedFile tf : entries) {
+ if (tf.contentType() == FileContent.DATA_MANIFEST
+ || tf.contentType() == FileContent.DELETE_MANIFEST) {
+ result.add(V4Metadata.trackedFileToManifestFile(tf.copy(), tableLocation));
+ }
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to read root manifest", e);
+ }
+
+ return result;
+ }
+
@Override
public List allManifests(FileIO fileIO) {
if (allManifests == null) {
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index dc130c8064fc..2008d9f6786a 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -19,9 +19,11 @@
package org.apache.iceberg;
import java.util.List;
+import java.util.Map;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
public class DataTableScan extends BaseTableScan {
protected DataTableScan(Table table, Schema schema, TableScanContext context) {
@@ -62,6 +64,55 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext
@Override
public CloseableIterable doPlanFiles() {
+ if (TableUtil.formatVersion(table()) >= 4) {
+ return doPlanFilesV4();
+ }
+
+ return doPlanFilesV3();
+ }
+
+ private CloseableIterable doPlanFilesV4() {
+ Snapshot snapshot = snapshot();
+ FileIO io = table().io();
+ Map specsById = specs();
+
+ // Pass the root manifest to ManifestExpander, not pre-filtered dataManifests.
+ // The root manifest may contain both inlined DATA entries and DATA_MANIFEST entries.
+ String rootManifestPath = snapshot.manifestListLocation();
+ ManifestFile rootManifest =
+ new GenericManifestFile(
+ rootManifestPath,
+ 0,
+ 0,
+ ManifestContent.DATA,
+ 0L,
+ 0L,
+ null,
+ null,
+ null,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ 0,
+ 0L,
+ null);
+
+ ManifestExpander expander =
+ new ManifestExpander(io, ImmutableList.of(rootManifest), specsById)
+ .tableLocation(table().location())
+ .caseSensitive(isCaseSensitive())
+ .filterData(filter())
+ .scanMetrics(scanMetrics());
+
+ if (shouldIgnoreResiduals()) {
+ expander = expander.ignoreResiduals();
+ }
+
+ return expander.planFiles();
+ }
+
+ private CloseableIterable doPlanFilesV3() {
Snapshot snapshot = snapshot();
FileIO io = table().io();
diff --git a/core/src/main/java/org/apache/iceberg/ManifestExpander.java b/core/src/main/java/org/apache/iceberg/ManifestExpander.java
new file mode 100644
index 000000000000..827f4acfd7e6
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/ManifestExpander.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.ParallelIterable;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * V4 replacement for {@link ManifestGroup}.
+ *
+ * Reads v4 manifests via {@link V4ManifestReader}, separates entries by content type, expands
+ * leaf manifests (DATA_MANIFEST entries), and converts DATA entries to {@link FileScanTask}
+ * instances using {@link TrackedFileAdapters}.
+ *
+ *
Inline deletion vectors on DATA entries are attached as {@link DeleteFile} instances on the
+ * resulting {@link FileScanTask}. Equality delete matching is deferred to a future phase.
+ */
+class ManifestExpander extends CloseableGroup {
+ private final FileIO io;
+ private final Iterable manifests;
+ private final Map specsById;
+
+ private Expression dataFilter = Expressions.alwaysTrue();
+ private boolean ignoreResiduals = false;
+ private boolean caseSensitive = true;
+
+ @SuppressWarnings("UnusedVariable")
+ private ScanMetrics scanMetrics = ScanMetrics.noop();
+
+ private ExecutorService executorService = null;
+ private String tableLocation;
+
+ ManifestExpander(
+ FileIO io, Iterable manifests, Map specsById) {
+ this.io = io;
+ this.manifests = manifests;
+ this.specsById = specsById;
+ }
+
+ ManifestExpander tableLocation(String newTableLocation) {
+ this.tableLocation = newTableLocation;
+ return this;
+ }
+
+ ManifestExpander filterData(Expression newDataFilter) {
+ this.dataFilter = Expressions.and(dataFilter, newDataFilter);
+ return this;
+ }
+
+ ManifestExpander ignoreResiduals() {
+ this.ignoreResiduals = true;
+ return this;
+ }
+
+ ManifestExpander caseSensitive(boolean newCaseSensitive) {
+ this.caseSensitive = newCaseSensitive;
+ return this;
+ }
+
+ ManifestExpander scanMetrics(ScanMetrics newScanMetrics) {
+ this.scanMetrics = newScanMetrics;
+ return this;
+ }
+
+ ManifestExpander planWith(ExecutorService newExecutorService) {
+ this.executorService = newExecutorService;
+ return this;
+ }
+
+ CloseableIterable planFiles() {
+ List> taskGroups = Lists.newArrayList();
+
+ for (ManifestFile manifest : manifests) {
+ taskGroups.addAll(expandManifest(manifest));
+ }
+
+ if (executorService != null) {
+ return new ParallelIterable<>(taskGroups, executorService);
+ }
+
+ return CloseableIterable.concat(taskGroups);
+ }
+
+ private List> expandManifest(ManifestFile manifest) {
+ InputFile manifestFile = io.newInputFile(manifest);
+ V4ManifestReader reader = new V4ManifestReader(manifestFile, specsById);
+ addCloseable(reader);
+
+ // read all live entries once and partition by content type (entries are copied)
+ List dataFiles = Lists.newArrayList();
+ List leafManifests = Lists.newArrayList();
+
+ try (CloseableIterable liveEntries = reader.liveEntries()) {
+ for (TrackedFile entry : liveEntries) {
+ switch (entry.contentType()) {
+ case DATA:
+ dataFiles.add(entry.copy());
+ break;
+ case DATA_MANIFEST:
+ leafManifests.add(entry.copy());
+ break;
+ default:
+ // EQUALITY_DELETES, DELETE_MANIFEST: skip for now (future phase)
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ List> result = Lists.newArrayList();
+
+ // direct DATA entries from root
+ if (!dataFiles.isEmpty()) {
+ result.add(
+ CloseableIterable.transform(
+ CloseableIterable.withNoopClose(dataFiles), this::createTask));
+ }
+
+ // expand leaf manifests
+ for (TrackedFile leafEntry : leafManifests) {
+ result.add(expandLeafManifest(leafEntry));
+ }
+
+ return result;
+ }
+
+ private CloseableIterable expandLeafManifest(TrackedFile manifestEntry) {
+ String leafLocation = LocationUtil.resolve(manifestEntry.location(), tableLocation);
+ InputFile leafFile = io.newInputFile(leafLocation);
+ V4ManifestReader leafReader = new V4ManifestReader(leafFile, specsById);
+ addCloseable(leafReader);
+
+ RoaringBitmap deletedPositions = deletedPositions(manifestEntry);
+ CloseableIterable entries;
+ if (deletedPositions != null) {
+ // use all entries (not just live) so position counter matches manifest ordinals
+ AtomicInteger position = new AtomicInteger(0);
+ entries =
+ CloseableIterable.filter(
+ leafReader.entries(),
+ tf -> !deletedPositions.contains(position.getAndIncrement()) && isLiveData(tf));
+ } else {
+ entries =
+ CloseableIterable.filter(
+ leafReader.liveEntries(), tf -> tf.contentType() == FileContent.DATA);
+ }
+
+ return CloseableIterable.transform(entries, tf -> createTask(tf.copy()));
+ }
+
+ private static boolean isLiveData(TrackedFile tf) {
+ if (tf == null || tf.contentType() != FileContent.DATA) {
+ return false;
+ }
+
+ Tracking tracking = tf.tracking();
+ return tracking != null && tracking.isLive();
+ }
+
+ private static RoaringBitmap deletedPositions(TrackedFile manifestEntry) {
+ Tracking tracking = manifestEntry.tracking();
+ if (tracking == null) {
+ return null;
+ }
+
+ ByteBuffer deleted = tracking.deletedPositions();
+ if (deleted == null) {
+ return null;
+ }
+
+ return deserializeBitmap(deleted);
+ }
+
+ private static RoaringBitmap deserializeBitmap(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.asReadOnlyBuffer().get(bytes);
+
+ RoaringBitmap bitmap = new RoaringBitmap();
+ try {
+ bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to deserialize metadata deletion vector");
+ }
+
+ return bitmap;
+ }
+
+ private FileScanTask createTask(TrackedFile trackedFile) {
+ int specId = trackedFile.specId() != null ? trackedFile.specId() : 0;
+ PartitionSpec spec = specsById.get(specId);
+ DataFile dataFile = TrackedFileAdapters.asDataFile(trackedFile, spec, tableLocation);
+
+ DeleteFile[] deletes;
+ if (trackedFile.deletionVector() != null) {
+ deletes =
+ new DeleteFile[] {TrackedFileAdapters.asDVDeleteFile(trackedFile, spec, tableLocation)};
+ } else {
+ deletes = new DeleteFile[0];
+ }
+
+ Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
+ ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter, caseSensitive);
+
+ return new BaseFileScanTask(
+ dataFile,
+ deletes,
+ SchemaParser.toJson(spec.schema()),
+ PartitionSpecParser.toJson(spec),
+ residuals);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index ffeff9c99145..511b152bee58 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -231,6 +231,23 @@ public static ManifestWriter write(
return newWriter(formatVersion, spec, encryptedOutputFile, snapshotId, null, writerProperties);
}
+ public static ManifestWriter write(
+ int formatVersion,
+ PartitionSpec spec,
+ EncryptedOutputFile encryptedOutputFile,
+ Long snapshotId,
+ Map writerProperties,
+ String tableLocation) {
+ return newWriter(
+ formatVersion,
+ spec,
+ encryptedOutputFile,
+ snapshotId,
+ null,
+ writerProperties,
+ tableLocation);
+ }
+
/**
* Create a new {@link ManifestWriter} for the given format version.
*
@@ -267,13 +284,24 @@ public static ManifestWriter write(
OutputFile outputFile,
Long snapshotId,
Map writerProperties) {
+ return write(formatVersion, spec, outputFile, snapshotId, writerProperties, null);
+ }
+
+ public static ManifestWriter write(
+ int formatVersion,
+ PartitionSpec spec,
+ OutputFile outputFile,
+ Long snapshotId,
+ Map writerProperties,
+ String tableLocation) {
return newWriter(
formatVersion,
spec,
EncryptedFiles.plainAsEncryptedOutput(outputFile),
snapshotId,
null,
- writerProperties);
+ writerProperties,
+ tableLocation);
}
@VisibleForTesting
@@ -284,6 +312,18 @@ static ManifestWriter newWriter(
Long snapshotId,
Long firstRowId,
Map writerProperties) {
+ return newWriter(
+ formatVersion, spec, encryptedOutputFile, snapshotId, firstRowId, writerProperties, null);
+ }
+
+ static ManifestWriter newWriter(
+ int formatVersion,
+ PartitionSpec spec,
+ EncryptedOutputFile encryptedOutputFile,
+ Long snapshotId,
+ Long firstRowId,
+ Map writerProperties,
+ String tableLocation) {
switch (formatVersion) {
case 1:
return new ManifestWriter.V1Writer(spec, encryptedOutputFile, snapshotId, writerProperties);
@@ -294,8 +334,9 @@ static ManifestWriter newWriter(
spec, encryptedOutputFile, snapshotId, firstRowId, writerProperties);
case 4:
return new ManifestWriter.V4Writer(
- spec, encryptedOutputFile, snapshotId, firstRowId, writerProperties);
+ spec, encryptedOutputFile, snapshotId, firstRowId, writerProperties, tableLocation);
}
+
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
}
@@ -359,6 +400,22 @@ public static ManifestWriter writeDeleteManifest(
writerProperties);
}
+ public static ManifestWriter writeDeleteManifest(
+ int formatVersion,
+ PartitionSpec spec,
+ OutputFile outputFile,
+ Long snapshotId,
+ Map writerProperties,
+ String tableLocation) {
+ return writeDeleteManifest(
+ formatVersion,
+ spec,
+ EncryptedFiles.plainAsEncryptedOutput(outputFile),
+ snapshotId,
+ writerProperties,
+ tableLocation);
+ }
+
/**
* Create a new {@link ManifestWriter} for the given format version.
*
@@ -389,6 +446,16 @@ public static ManifestWriter writeDeleteManifest(
EncryptedOutputFile outputFile,
Long snapshotId,
Map writerProperties) {
+ return writeDeleteManifest(formatVersion, spec, outputFile, snapshotId, writerProperties, null);
+ }
+
+ public static ManifestWriter writeDeleteManifest(
+ int formatVersion,
+ PartitionSpec spec,
+ EncryptedOutputFile outputFile,
+ Long snapshotId,
+ Map writerProperties,
+ String tableLocation) {
switch (formatVersion) {
case 1:
throw new IllegalArgumentException("Cannot write delete files in a v1 table");
@@ -397,8 +464,10 @@ public static ManifestWriter writeDeleteManifest(
case 3:
return new ManifestWriter.V3DeleteWriter(spec, outputFile, snapshotId, writerProperties);
case 4:
- return new ManifestWriter.V4DeleteWriter(spec, outputFile, snapshotId, writerProperties);
+ return new ManifestWriter.V4DeleteWriter(
+ spec, outputFile, snapshotId, writerProperties, tableLocation);
}
+
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java b/core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java
index 8f51df749e33..a936cc4fb654 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestInfoStruct.java
@@ -58,6 +58,10 @@ class ManifestInfoStruct extends SupportsIndexProjection implements ManifestInfo
super(BASE_TYPE, type);
}
+ ManifestInfoStruct() {
+ super(BASE_TYPE.fields().size());
+ }
+
private ManifestInfoStruct(ManifestInfoStruct toCopy) {
super(toCopy);
this.addedFilesCount = toCopy.addedFilesCount;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 668a3764de1d..096b003cbee4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -40,6 +40,7 @@
import org.apache.iceberg.metrics.ScanMetrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -59,6 +60,13 @@ public class ManifestReader> extends CloseableGroup
static final ImmutableList ALL_COLUMNS = ImmutableList.of("*");
+ private static final Types.NestedField UNPARTITIONED_PARTITION_FIELD =
+ Types.NestedField.optional(
+ DataFile.PARTITION_ID,
+ DataFile.PARTITION_NAME,
+ Types.StructType.of(),
+ DataFile.PARTITION_DOC);
+
private static final Set STATS_COLUMNS =
ImmutableSet.of(
"value_counts",
@@ -84,6 +92,7 @@ private Class extends StructLike> fileClass() {
}
private final InputFile file;
+ private final Map specsById;
private final InheritableMetadata inheritableMetadata;
private final Long firstRowId;
private final FileType content;
@@ -123,6 +132,7 @@ protected ManifestReader(
firstRowId == null || content == FileType.DATA_FILES,
"First row ID is not valid for delete manifests");
this.file = file;
+ this.specsById = specsById;
this.inheritableMetadata = inheritableMetadata;
this.firstRowId = firstRowId;
this.content = content;
@@ -154,6 +164,12 @@ private > PartitionSpec readPartitionSpec(InputFile inp
}
private static > Map readMetadata(InputFile inputFile) {
+ FileFormat manifestFormat = FileFormat.fromFileName(inputFile.location());
+ Preconditions.checkArgument(
+ manifestFormat == FileFormat.AVRO,
+ "Reading manifest metadata is only supported for Avro manifests: %s",
+ inputFile.location());
+
Map metadata;
try {
try (CloseableIterable> headerReader =
@@ -281,14 +297,34 @@ private CloseableIterable> open(Schema projection) {
Preconditions.checkArgument(
format != null, "Unable to determine format of manifest: %s", file.location());
+ if (isV4Manifest(format)) {
+ return openV4();
+ }
+
+ boolean unpartitioned = spec.rawPartitionType().fields().isEmpty();
+
+ // V4+ manifests omit the partition field when unpartitioned (Parquet cannot represent
+ // empty structs, and the field is meaningless regardless of format). Mark it optional so
+ // the reader returns null for the missing field instead of throwing. The field must stay
+ // in the projection to preserve positional access for callers like StructProjection.
+ // For older versions where the empty struct is present, making it optional is harmless.
List fields = Lists.newArrayList();
- fields.addAll(projection.asStruct().fields());
+ for (Types.NestedField field : projection.asStruct().fields()) {
+ if (unpartitioned && field.fieldId() == DataFile.PARTITION_ID) {
+ fields.add(UNPARTITIONED_PARTITION_FIELD);
+ } else {
+ fields.add(field);
+ }
+ }
+
if (projection.findField(DataFile.RECORD_COUNT.fieldId()) == null) {
fields.add(DataFile.RECORD_COUNT);
}
+
if (projection.findField(DataFile.FIRST_ROW_ID.fieldId()) == null) {
fields.add(DataFile.FIRST_ROW_ID);
}
+
fields.add(MetadataColumns.ROW_POSITION);
CloseableIterable> reader =
@@ -307,6 +343,27 @@ private CloseableIterable> open(Schema projection) {
return CloseableIterable.transform(withMetadata, idAssigner(firstRowId));
}
+ private boolean isV4Manifest(FileFormat format) {
+ return format == FileFormat.PARQUET;
+ }
+
+ @SuppressWarnings("unchecked")
+ private CloseableIterable> openV4() {
+ V4ManifestReader v4Reader =
+ new V4ManifestReader(file, specsById != null ? specsById : ImmutableMap.of());
+ addCloseable(v4Reader);
+
+ // adapt TrackedFile entries to ManifestEntry via TrackedFileEntryAdapter
+ CloseableIterable> adapted =
+ CloseableIterable.transform(
+ v4Reader.entries(),
+ tf -> (ManifestEntry) new TrackedFileEntryAdapter(tf.copy(), spec));
+
+ CloseableIterable> withMetadata =
+ CloseableIterable.transform(adapted, inheritableMetadata::apply);
+ return CloseableIterable.transform(withMetadata, idAssigner(firstRowId));
+ }
+
CloseableIterable> liveEntries() {
return entries(true /* only live entries */);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 7d85f991b080..650e0334c794 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -24,6 +24,7 @@
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata;
+import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
@@ -40,6 +41,7 @@ public abstract class ManifestWriter> implements FileAp
// this is replaced when writing a manifest list by the ManifestFile wrapper
static final long UNASSIGNED_SEQ = -1L;
+ private final FileFormat format;
private final OutputFile file;
private final EncryptionKeyMetadata keyMetadata;
private final int specId;
@@ -65,7 +67,8 @@ private ManifestWriter(
Long snapshotId,
Long firstRowId,
Map writerProperties) {
- this.file = file.encryptingOutputFile();
+ this.format = FileFormat.fromFileName(file.encryptingOutputFile().location());
+ this.file = outputFile(file);
this.specId = spec.specId();
this.writerProperties = writerProperties;
this.writer = newAppender(spec, this.file);
@@ -82,6 +85,20 @@ private ManifestWriter(
protected abstract FileAppender> newAppender(
PartitionSpec spec, OutputFile outputFile);
+ private OutputFile outputFile(EncryptedOutputFile encryptedFile) {
+ // Casting to NativeEncryptionOutputFile actually makes the file rely on native encryption
+ // rather than whole-file encryption.
+ if (format == FileFormat.PARQUET
+ && encryptedFile instanceof NativeEncryptionOutputFile nativeFile) {
+ return nativeFile;
+ }
+ return encryptedFile.encryptingOutputFile();
+ }
+
+ protected FileFormat format() {
+ return format;
+ }
+
protected Map writerProperties() {
return writerProperties;
}
@@ -206,16 +223,7 @@ public long length() {
public ManifestFile toManifestFile() {
Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed");
- ByteBuffer keyMetadataBuffer;
- if (keyMetadata instanceof NativeEncryptionKeyMetadata) {
- // File length is required by AES GCM Stream encryption, to prevent file truncation attacks
- keyMetadataBuffer =
- ((NativeEncryptionKeyMetadata) keyMetadata).copyWithLength(length()).buffer();
- } else if (keyMetadata != null) {
- keyMetadataBuffer = keyMetadata.buffer();
- } else {
- keyMetadataBuffer = null;
- }
+ ByteBuffer keyMetadataBuffer = keyMetadataBuffer();
// if the minSequenceNumber is null, then no manifests with a sequence number have been written,
// so the min data sequence number is the one that will be assigned when this is committed.
@@ -240,6 +248,18 @@ public ManifestFile toManifestFile() {
firstRowId);
}
+ private ByteBuffer keyMetadataBuffer() {
+ if (keyMetadata instanceof NativeEncryptionKeyMetadata nativeKeyMetadata
+ && format == FileFormat.AVRO) {
+ // Whole-file encryption needs the file length embedded for GCM truncation protection.
+ // Formats with native encryption (like Parquet) handle this directly and don't need it.
+ return nativeKeyMetadata.copyWithLength(length()).buffer();
+ } else if (keyMetadata != null) {
+ return keyMetadata.buffer();
+ }
+ return null;
+ }
+
@Override
public void close() throws IOException {
this.closed = true;
@@ -254,9 +274,11 @@ static class V4Writer extends ManifestWriter {
EncryptedOutputFile file,
Long snapshotId,
Long firstRowId,
- Map writerProperties) {
+ Map writerProperties,
+ String tableLocation) {
super(spec, file, snapshotId, firstRowId, writerProperties);
- this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
+ this.entryWrapper =
+ new V4Metadata.ManifestEntryWrapper<>(snapshotId, spec.partitionType(), tableLocation);
}
@Override
@@ -269,9 +291,9 @@ protected FileAppender> newAppender(
PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
try {
- return InternalData.write(FileFormat.AVRO, file)
+ return InternalData.write(format(), file)
.schema(manifestSchema)
- .named("manifest_entry")
+ .named("tracked_file")
.meta("schema", SchemaParser.toJson(spec.schema()))
.meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
.meta("partition-spec-id", String.valueOf(spec.specId()))
@@ -294,9 +316,11 @@ static class V4DeleteWriter extends ManifestWriter {
PartitionSpec spec,
EncryptedOutputFile file,
Long snapshotId,
- Map writerProperties) {
+ Map writerProperties,
+ String tableLocation) {
super(spec, file, snapshotId, null, writerProperties);
- this.entryWrapper = new V4Metadata.ManifestEntryWrapper<>(snapshotId);
+ this.entryWrapper =
+ new V4Metadata.ManifestEntryWrapper<>(snapshotId, spec.partitionType(), tableLocation);
}
@Override
@@ -309,9 +333,9 @@ protected FileAppender> newAppender(
PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V4Metadata.entrySchema(spec.partitionType());
try {
- return InternalData.write(FileFormat.AVRO, file)
+ return InternalData.write(format(), file)
.schema(manifestSchema)
- .named("manifest_entry")
+ .named("tracked_file")
.meta("schema", SchemaParser.toJson(spec.schema()))
.meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
.meta("partition-spec-id", String.valueOf(spec.specId()))
@@ -342,6 +366,8 @@ static class V3Writer extends ManifestWriter {
Long firstRowId,
Map writerProperties) {
super(spec, file, snapshotId, firstRowId, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V3 manifests must use Avro, but got: %s", format());
this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -382,6 +408,8 @@ static class V3DeleteWriter extends ManifestWriter {
Long snapshotId,
Map writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V3 manifests must use Avro, but got: %s", format());
this.entryWrapper = new V3Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -427,6 +455,8 @@ static class V2Writer extends ManifestWriter {
Long snapshotId,
Map writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V2 manifests must use Avro, but got: %s", format());
this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -467,6 +497,8 @@ static class V2DeleteWriter extends ManifestWriter {
Long snapshotId,
Map writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V2 manifests must use Avro, but got: %s", format());
this.entryWrapper = new V2Metadata.ManifestEntryWrapper<>(snapshotId);
}
@@ -512,6 +544,8 @@ static class V1Writer extends ManifestWriter {
Long snapshotId,
Map writerProperties) {
super(spec, file, snapshotId, null, writerProperties);
+ Preconditions.checkArgument(
+ format() == FileFormat.AVRO, "V1 manifests must use Avro, but got: %s", format());
this.entryWrapper = new V1Metadata.ManifestEntryWrapper();
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
index 53cec16dcd87..a40785ba5d5a 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
@@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.LocationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,14 +57,21 @@ private SnapshotParser() {}
private static final String KEY_ID = "key-id";
static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException {
+ toJson(snapshot, generator, null);
+ }
+
+ static void toJson(Snapshot snapshot, JsonGenerator generator, String tableLocation)
+ throws IOException {
generator.writeStartObject();
if (snapshot.sequenceNumber() > TableMetadata.INITIAL_SEQUENCE_NUMBER) {
generator.writeNumberField(SEQUENCE_NUMBER, snapshot.sequenceNumber());
}
+
generator.writeNumberField(SNAPSHOT_ID, snapshot.snapshotId());
if (snapshot.parentId() != null) {
generator.writeNumberField(PARENT_SNAPSHOT_ID, snapshot.parentId());
}
+
generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis());
// if there is an operation, write the summary map
@@ -76,16 +84,19 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio
if (OPERATION.equals(entry.getKey())) {
continue;
}
+
generator.writeStringField(entry.getKey(), entry.getValue());
}
}
+
generator.writeEndObject();
}
String manifestList = snapshot.manifestListLocation();
if (manifestList != null) {
// write just the location. manifests should not be embedded in JSON along with a list
- generator.writeStringField(MANIFEST_LIST, manifestList);
+ generator.writeStringField(
+ MANIFEST_LIST, LocationUtil.relativize(manifestList, tableLocation));
} else {
// embed the manifest list in the JSON, v1 only
JsonUtil.writeStringArray(
@@ -122,6 +133,10 @@ public static String toJson(Snapshot snapshot, boolean pretty) {
}
static Snapshot fromJson(JsonNode node) {
+ return fromJson(node, null);
+ }
+
+ static Snapshot fromJson(JsonNode node, String tableLocation) {
Preconditions.checkArgument(
node.isObject(), "Cannot parse table version from a non-object: %s", node);
@@ -129,11 +144,13 @@ static Snapshot fromJson(JsonNode node) {
if (node.has(SEQUENCE_NUMBER)) {
sequenceNumber = JsonUtil.getLong(SEQUENCE_NUMBER, node);
}
+
long snapshotId = JsonUtil.getLong(SNAPSHOT_ID, node);
Long parentId = null;
if (node.has(PARENT_SNAPSHOT_ID)) {
parentId = JsonUtil.getLong(PARENT_SNAPSHOT_ID, node);
}
+
long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node);
Map summary = null;
@@ -156,6 +173,7 @@ static Snapshot fromJson(JsonNode node) {
builder.put(field, JsonUtil.getString(field, sNode));
}
}
+
summary = builder.build();
// When the operation is not found, default to overwrite
@@ -179,18 +197,22 @@ static Snapshot fromJson(JsonNode node) {
if (node.has(MANIFEST_LIST)) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
- return new BaseSnapshot(
- sequenceNumber,
- snapshotId,
- parentId,
- timestamp,
- operation,
- summary,
- schemaId,
- manifestList,
- firstRowId,
- addedRows,
- keyId);
+ manifestList = LocationUtil.resolve(manifestList, tableLocation);
+ BaseSnapshot snapshot =
+ new BaseSnapshot(
+ sequenceNumber,
+ snapshotId,
+ parentId,
+ timestamp,
+ operation,
+ summary,
+ schemaId,
+ manifestList,
+ firstRowId,
+ addedRows,
+ keyId);
+ snapshot.setTableLocation(tableLocation);
+ return snapshot;
} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 6ba10e8049f6..7b9420cc0e7e 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -50,6 +50,7 @@
import java.util.function.Function;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptingFileIO;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.exceptions.CleanableFailure;
@@ -57,6 +58,7 @@
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.metrics.CommitMetrics;
import org.apache.iceberg.metrics.CommitMetricsResult;
@@ -72,6 +74,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.IntMath;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Exceptions;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
@@ -113,6 +116,7 @@ public void accept(String file) {
private final AtomicInteger attempt = new AtomicInteger(0);
private final List manifestLists = Lists.newArrayList();
private final long targetManifestSizeBytes;
+ private final FileFormat manifestFormat;
private final Map manifestWriterProps;
private MetricsReporter reporter = LoggingMetricsReporter.instance();
private volatile Long snapshotId = null;
@@ -142,6 +146,10 @@ protected SnapshotProducer(TableOperations ops) {
this.targetManifestSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
+ this.manifestFormat =
+ ops.current().formatVersion() >= TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS
+ ? FileFormat.PARQUET
+ : FileFormat.AVRO;
this.manifestWriterProps = manifestWriterProperties(ops.current());
boolean snapshotIdInheritanceEnabled =
ops.current()
@@ -280,6 +288,23 @@ public Snapshot apply() {
List manifests = apply(base, parentSnapshot);
+ ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
+
+ Tasks.range(manifestFiles.length)
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .executeWith(workerPool())
+ .run(index -> manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index)));
+
+ if (base.formatVersion() >= TableMetadata.MIN_FORMAT_VERSION_PARQUET_MANIFESTS) {
+ return applyV4(manifestFiles, sequenceNumber, parentSnapshotId, parentSnapshot);
+ } else {
+ return applyV3(manifestFiles, sequenceNumber, parentSnapshotId);
+ }
+ }
+
+ private Snapshot applyV3(
+ ManifestFile[] manifestFiles, long sequenceNumber, Long parentSnapshotId) {
OutputFile manifestList = manifestListPath();
ManifestListWriter writer =
@@ -293,17 +318,7 @@ public Snapshot apply() {
base.nextRowId());
try (writer) {
- // keep track of the manifest lists created
manifestLists.add(manifestList.location());
-
- ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
-
- Tasks.range(manifestFiles.length)
- .stopOnFailure()
- .throwFailureWhenFinished()
- .executeWith(workerPool())
- .run(index -> manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index)));
-
writer.addAll(Arrays.asList(manifestFiles));
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to write manifest list file");
@@ -316,6 +331,75 @@ public Snapshot apply() {
assignedRows = writer.nextRowId() - base.nextRowId();
}
+ validateReplace();
+
+ return new BaseSnapshot(
+ sequenceNumber,
+ snapshotId(),
+ parentSnapshotId,
+ System.currentTimeMillis(),
+ operation(),
+ summary(base),
+ base.currentSchemaId(),
+ manifestList.location(),
+ nextRowId,
+ assignedRows,
+ writer.toManifestListFile().encryptionKeyID());
+ }
+
+ private Snapshot applyV4(
+ ManifestFile[] manifestFiles,
+ long sequenceNumber,
+ Long parentSnapshotId,
+ Snapshot parentSnapshot) {
+ // Read data/delete file entries from the parent snapshot's root manifest (flat root).
+ // These entries must be carried forward into the new root manifest.
+ List parentDataEntries = readParentDataEntries(parentSnapshot);
+
+ OutputFile rootManifest = rootManifestPath();
+ writeRootManifest(
+ rootManifest,
+ manifestFiles,
+ parentDataEntries,
+ snapshotId(),
+ sequenceNumber,
+ base.location());
+ manifestLists.add(rootManifest.location());
+
+ // compute nextRowId by summing added rows across all data manifests
+ long addedDataRows = 0L;
+ for (ManifestFile mf : manifestFiles) {
+ if (mf.content() == ManifestContent.DATA
+ && mf.snapshotId() != null
+ && mf.snapshotId() == snapshotId()
+ && mf.addedRowsCount() != null) {
+ addedDataRows += mf.addedRowsCount();
+ }
+ }
+
+ Long nextRowId = base.nextRowId();
+ Long assignedRows = addedDataRows;
+
+ validateReplace();
+
+ BaseSnapshot snapshot =
+ new BaseSnapshot(
+ sequenceNumber,
+ snapshotId(),
+ parentSnapshotId,
+ System.currentTimeMillis(),
+ operation(),
+ summary(base),
+ base.currentSchemaId(),
+ rootManifest.location(),
+ nextRowId,
+ assignedRows,
+ null);
+ snapshot.setTableLocation(base.location());
+ return snapshot;
+ }
+
+ private void validateReplace() {
Map summary = summary();
String operation = operation();
@@ -332,19 +416,72 @@ public Snapshot apply() {
addedRecords,
replacedRecords);
}
+ }
- return new BaseSnapshot(
- sequenceNumber,
- snapshotId(),
- parentSnapshotId,
- System.currentTimeMillis(),
- operation(),
- summary(base),
- base.currentSchemaId(),
- manifestList.location(),
- nextRowId,
- assignedRows,
- writer.toManifestListFile().encryptionKeyID());
+ private void writeRootManifest(
+ OutputFile output,
+ ManifestFile[] manifests,
+ List dataEntries,
+ long commitSnapshotId,
+ long commitSequenceNumber,
+ String tableLocation) {
+ Schema schema = V4Metadata.entrySchema(Types.StructType.of());
+ try (FileAppender writer =
+ InternalData.write(FileFormat.PARQUET, output)
+ .schema(schema)
+ .named("tracked_file")
+ .meta("format-version", "4")
+ .meta("content", "root")
+ .overwrite()
+ .build()) {
+ for (ManifestFile manifest : manifests) {
+ writer.add(
+ V4Metadata.manifestFileToTrackedFile(
+ manifest, commitSnapshotId, commitSequenceNumber, tableLocation));
+ }
+
+ // Carry forward data/delete file entries from the parent's flat root manifest,
+ // re-projecting to the root manifest write schema.
+ for (TrackedFile entry : dataEntries) {
+ writer.add(V4Metadata.dataEntryForRootManifest(entry));
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to write root manifest file");
+ }
+ }
+
+ /**
+ * Reads data and delete file entries (not manifest references) from the parent snapshot's root
+ * manifest. Returns an empty list if the parent has no root manifest or contains only manifest
+ * references.
+ */
+ private List readParentDataEntries(Snapshot parentSnapshot) {
+ List dataEntries = Lists.newArrayList();
+ if (parentSnapshot == null || parentSnapshot.manifestListLocation() == null) {
+ return dataEntries;
+ }
+
+ FileFormat format = FileFormat.fromFileName(parentSnapshot.manifestListLocation());
+ if (format != FileFormat.PARQUET) {
+ return dataEntries;
+ }
+
+ V4ManifestReader reader =
+ new V4ManifestReader(
+ ops().io().newInputFile(parentSnapshot.manifestListLocation()),
+ ImmutableMap.of());
+ try (CloseableIterable entries = reader.liveEntries()) {
+ for (TrackedFile tf : entries) {
+ if (tf.contentType() != FileContent.DATA_MANIFEST
+ && tf.contentType() != FileContent.DELETE_MANIFEST) {
+ dataEntries.add(tf.copy());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to read parent root manifest");
+ }
+
+ return dataEntries;
}
private void runValidations(Snapshot parentSnapshot) {
@@ -600,10 +737,18 @@ protected OutputFile manifestListPath() {
commitUUID))));
}
+ protected OutputFile rootManifestPath() {
+ return ops.io()
+ .newOutputFile(
+ ops.metadataFileLocation(
+ FileFormat.PARQUET.addExtension(
+ commitUUID + "-root-" + attempt.incrementAndGet())));
+ }
+
protected EncryptedOutputFile newManifestOutputFile() {
String manifestFileLocation =
ops.metadataFileLocation(
- FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement()));
+ manifestFormat.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement()));
return EncryptingFileIO.combine(ops.io(), ops.encryption())
.newEncryptingOutputFile(manifestFileLocation);
}
@@ -614,7 +759,8 @@ protected ManifestWriter newManifestWriter(PartitionSpec spec) {
spec,
newManifestOutputFile(),
snapshotId(),
- manifestWriterProps);
+ manifestWriterProps,
+ base.location());
}
protected ManifestWriter newDeleteManifestWriter(PartitionSpec spec) {
@@ -623,7 +769,8 @@ protected ManifestWriter newDeleteManifestWriter(PartitionSpec spec)
spec,
newManifestOutputFile(),
snapshotId(),
- manifestWriterProps);
+ manifestWriterProps,
+ base.location());
}
protected RollingManifestWriter newRollingManifestWriter(PartitionSpec spec) {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 43a67dd2bef2..c4a7bfc5c83c 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -57,6 +57,7 @@ public class TableMetadata implements Serializable {
static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
static final int SUPPORTED_TABLE_FORMAT_VERSION = 4;
static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3;
+ static final int MIN_FORMAT_VERSION_PARQUET_MANIFESTS = 4;
static final int INITIAL_SPEC_ID = 0;
static final int INITIAL_SORT_ORDER_ID = 1;
static final int INITIAL_SCHEMA_ID = 0;
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index eeeeeab8a699..c6f3209e8dc5 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -44,6 +44,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.JsonUtil;
+import org.apache.iceberg.util.LocationUtil;
public class TableMetadataParser {
@@ -241,10 +242,12 @@ public static void toJson(TableMetadata metadata, JsonGenerator generator) throw
toJson(metadata.refs(), generator);
+ String snapshotTableLocation = metadata.formatVersion() >= 4 ? metadata.location() : null;
generator.writeArrayFieldStart(SNAPSHOTS);
for (Snapshot snapshot : metadata.snapshots()) {
- SnapshotParser.toJson(snapshot, generator);
+ SnapshotParser.toJson(snapshot, generator, snapshotTableLocation);
}
+
generator.writeEndArray();
generator.writeArrayFieldStart(STATISTICS);
@@ -272,9 +275,11 @@ public static void toJson(TableMetadata metadata, JsonGenerator generator) throw
for (MetadataLogEntry logEntry : metadata.previousFiles()) {
generator.writeStartObject();
generator.writeNumberField(TIMESTAMP_MS, logEntry.timestampMillis());
- generator.writeStringField(METADATA_FILE, logEntry.file());
+ generator.writeStringField(
+ METADATA_FILE, LocationUtil.relativize(logEntry.file(), snapshotTableLocation));
generator.writeEndObject();
}
+
generator.writeEndArray();
generator.writeEndObject();
@@ -510,7 +515,8 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {
snapshots = Lists.newArrayListWithExpectedSize(snapshotArray.size());
Iterator iterator = snapshotArray.elements();
while (iterator.hasNext()) {
- snapshots.add(SnapshotParser.fromJson(iterator.next()));
+ snapshots.add(
+ SnapshotParser.fromJson(iterator.next(), formatVersion >= 4 ? location : null));
}
} else {
snapshots = ImmutableList.of();
@@ -547,10 +553,11 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {
Iterator logIterator = node.get(METADATA_LOG).elements();
while (logIterator.hasNext()) {
JsonNode entryNode = logIterator.next();
+ String metadataFile = JsonUtil.getString(METADATA_FILE, entryNode);
metadataEntries.add(
new MetadataLogEntry(
JsonUtil.getLong(TIMESTAMP_MS, entryNode),
- JsonUtil.getString(METADATA_FILE, entryNode)));
+ LocationUtil.resolve(metadataFile, formatVersion >= 4 ? location : null)));
}
}
diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileAdapters.java b/core/src/main/java/org/apache/iceberg/TrackedFileAdapters.java
new file mode 100644
index 000000000000..9f1bf3d5439f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/TrackedFileAdapters.java
@@ -0,0 +1,787 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.LocationUtil;
+
+/**
+ * Adapts {@link TrackedFile} entries to the {@link DataFile} and {@link DeleteFile} APIs.
+ *
+ * V4 colocates deletion vectors with data file entries in {@link TrackedFile}. Rather than
+ * extending {@link DataFile} with deletion vector fields, DVs are extracted as separate {@link
+ * DeleteFile} objects via {@link #asDVDeleteFile(TrackedFile, PartitionSpec)}. This matches the v3
+ * convention where DVs are tracked as {@link DeleteFile} entries in delete manifests and keeps the
+ * existing {@link FileScanTask} contract ({@code file()} + {@code deletes()}) unchanged.
+ */
+class TrackedFileAdapters {
+
+ private TrackedFileAdapters() {}
+
+ /**
+ * Creates a {@link GenericDataFile} from a TrackedFile using the reader constructor so that
+ * SupportsIndexProjection is correctly initialized for metadata table reads.
+ */
+ static GenericDataFile asGenericDataFile(TrackedFile file, PartitionSpec spec) {
+ Preconditions.checkState(
+ file.contentType() == FileContent.DATA,
+ "Cannot convert tracked file to DataFile: content type is %s, not DATA",
+ file.contentType());
+
+ Types.StructType partitionType = spec != null ? spec.rawPartitionType() : Types.StructType.of();
+ Types.StructType projection = DataFile.getType(partitionType);
+
+ // use the reader constructor for correct SupportsIndexProjection mapping
+ GenericDataFile dataFile = new GenericDataFile(projection);
+
+ // populate using DataFile.getType() positions (same as BaseFile internal positions)
+ // 0=content, 1=file_path, 2=file_format, 3=spec_id, 4=partition, 5=record_count,
+ // 6=file_size, 7=column_sizes, 8=value_counts, 9=null_value_counts, 10=nan_value_counts,
+ // 11=lower_bounds, 12=upper_bounds, 13=key_metadata, 14=split_offsets, 15=equality_ids,
+ // 16=sort_order_id, 17=first_row_id
+ Tracking tracking = file.tracking();
+ dataFile.set(0, file.contentType().id());
+ dataFile.set(1, file.location());
+ dataFile.set(2, file.fileFormat() != null ? file.fileFormat().toString() : null);
+ dataFile.set(3, file.specId() != null ? file.specId() : 0);
+ if (!partitionType.fields().isEmpty()) {
+ dataFile.set(4, extractPartition(file, spec));
+ }
+
+ dataFile.set(5, file.recordCount());
+ dataFile.set(6, file.fileSizeInBytes());
+ // 7: column_sizes - null default
+ dataFile.set(8, valueCounts(file.contentStats()));
+ dataFile.set(9, nullValueCounts(file.contentStats()));
+ dataFile.set(10, nanValueCounts(file.contentStats()));
+ dataFile.set(11, lowerBounds(file.contentStats()));
+ dataFile.set(12, upperBounds(file.contentStats()));
+ dataFile.set(13, file.keyMetadata());
+ dataFile.set(14, file.splitOffsets());
+ // 15: equality_ids - null default
+ dataFile.set(16, file.sortOrderId());
+ dataFile.set(17, tracking != null ? tracking.firstRowId() : null);
+
+ return dataFile;
+ }
+
+ static DataFile asDataFile(TrackedFile file, PartitionSpec spec) {
+ return asDataFile(file, spec, null);
+ }
+
+ static DataFile asDataFile(TrackedFile file, PartitionSpec spec, String tableLocation) {
+ Preconditions.checkState(
+ file.contentType() == FileContent.DATA,
+ "Cannot convert tracked file to DataFile: content type is %s, not DATA",
+ file.contentType());
+ return new TrackedDataFile(file, spec, tableLocation);
+ }
+
+ static DeleteFile asDVDeleteFile(TrackedFile file, PartitionSpec spec) {
+ return asDVDeleteFile(file, spec, null);
+ }
+
+ static DeleteFile asDVDeleteFile(TrackedFile file, PartitionSpec spec, String tableLocation) {
+ Preconditions.checkState(
+ file.contentType() == FileContent.DATA,
+ "Cannot extract DV from tracked file: content type is %s, not DATA",
+ file.contentType());
+ Preconditions.checkState(
+ file.deletionVector() != null, "Cannot extract DV from tracked file: no deletion vector");
+ return new TrackedDVDeleteFile(file, spec, tableLocation);
+ }
+
+ static DeleteFile asEqualityDeleteFile(TrackedFile file, PartitionSpec spec) {
+ return asEqualityDeleteFile(file, spec, null);
+ }
+
+ static DeleteFile asEqualityDeleteFile(
+ TrackedFile file, PartitionSpec spec, String tableLocation) {
+ Preconditions.checkState(
+ file.contentType() == FileContent.EQUALITY_DELETES,
+ "Cannot convert tracked file to DeleteFile: content type is %s, not EQUALITY_DELETES",
+ file.contentType());
+ return new TrackedDeleteFile(file, spec, tableLocation);
+ }
+
+ // TODO: TrackedFile will likely get an explicit partition tuple field (using a union partition
+ // schema), replacing this transform-based derivation. Once that lands, this method should be
+ // removed and the adapter should read the tuple directly.
+ //
+ // This derives partition values by applying the partition transform to the lower bound of the
+ // source column stats. This is correct because each data file belongs to exactly one partition,
+ // so lower == upper for partition source columns. For non-identity transforms (bucket, truncate),
+ // the transform of the lower bound produces the correct partition value under this invariant.
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ static StructLike extractPartition(TrackedFile file, PartitionSpec spec) {
+ if (spec == null || spec.isUnpartitioned()) {
+ return BaseFile.EMPTY_PARTITION_DATA;
+ }
+
+ ContentStats stats = file.contentStats();
+ if (stats == null) {
+ return new PartitionData(spec.partitionType());
+ }
+
+ PartitionData partition = new PartitionData(spec.partitionType());
+
+ for (int i = 0; i < spec.fields().size(); i += 1) {
+ PartitionField field = spec.fields().get(i);
+
+ if (field.transform().isVoid()) {
+ partition.set(i, null);
+ continue;
+ }
+
+ FieldStats> fieldStats = stats.statsFor(field.sourceId());
+ if (fieldStats == null || fieldStats.lowerBound() == null) {
+ partition.set(i, null);
+ continue;
+ }
+
+ Type sourceType = spec.schema().findType(field.sourceId());
+ Function boundTransform = field.transform().bind(sourceType);
+ partition.set(i, boundTransform.apply(fieldStats.lowerBound()));
+ }
+
+ return partition;
+ }
+
+ static Map valueCounts(ContentStats stats) {
+ if (stats == null) {
+ return null;
+ }
+
+ Map result = Maps.newHashMap();
+ for (FieldStats> fs : stats.fieldStats()) {
+ if (fs != null && fs.valueCount() != null) {
+ result.put(fs.fieldId(), fs.valueCount());
+ }
+ }
+
+ return result.isEmpty() ? null : result;
+ }
+
+ static Map nullValueCounts(ContentStats stats) {
+ if (stats == null) {
+ return null;
+ }
+
+ Map result = Maps.newHashMap();
+ for (FieldStats> fs : stats.fieldStats()) {
+ if (fs != null && fs.nullValueCount() != null) {
+ result.put(fs.fieldId(), fs.nullValueCount());
+ }
+ }
+
+ return result.isEmpty() ? null : result;
+ }
+
+ static Map nanValueCounts(ContentStats stats) {
+ if (stats == null) {
+ return null;
+ }
+
+ Map result = Maps.newHashMap();
+ for (FieldStats> fs : stats.fieldStats()) {
+ if (fs != null && fs.nanValueCount() != null) {
+ result.put(fs.fieldId(), fs.nanValueCount());
+ }
+ }
+
+ return result.isEmpty() ? null : result;
+ }
+
+ static Map lowerBounds(ContentStats stats) {
+ if (stats == null) {
+ return null;
+ }
+
+ Map result = Maps.newHashMap();
+ for (FieldStats> fs : stats.fieldStats()) {
+ if (fs != null && fs.lowerBound() != null && fs.type() != null) {
+ result.put(fs.fieldId(), Conversions.toByteBuffer(fs.type(), fs.lowerBound()));
+ }
+ }
+
+ return result.isEmpty() ? null : result;
+ }
+
+ static Map upperBounds(ContentStats stats) {
+ if (stats == null) {
+ return null;
+ }
+
+ Map result = Maps.newHashMap();
+ for (FieldStats> fs : stats.fieldStats()) {
+ if (fs != null && fs.upperBound() != null && fs.type() != null) {
+ result.put(fs.fieldId(), Conversions.toByteBuffer(fs.type(), fs.upperBound()));
+ }
+ }
+
+ return result.isEmpty() ? null : result;
+ }
+
+ /** Adapts a TrackedFile DATA entry to the {@link DataFile} interface. */
+ private static class TrackedDataFile implements DataFile, StructLike, java.io.Serializable {
+ // BaseFile StructLike field count (content through fileOrdinal)
+ private static final int STRUCT_SIZE = 22;
+
+ private final TrackedFile file;
+ private final Tracking tracking;
+ private final PartitionSpec spec;
+ private final String tableLocation;
+
+ private TrackedDataFile(TrackedFile file, PartitionSpec spec, String tableLocation) {
+ this.file = file;
+ this.tracking = file.tracking();
+ this.spec = spec;
+ this.tableLocation = tableLocation;
+ }
+
+ @Override
+ public int size() {
+ return STRUCT_SIZE;
+ }
+
+ @Override
+ public void set(int pos, T value) {
+ throw new UnsupportedOperationException("TrackedDataFile is read-only");
+ }
+
+ @Override
+ public T get(int pos, Class javaClass) {
+ return javaClass.cast(getByPos(pos));
+ }
+
+ // positions match BaseFile / DataFile.getType() field order
+ private Object getByPos(int pos) {
+ switch (pos) {
+ case 0:
+ return content().id();
+ case 1:
+ return location();
+ case 2:
+ return format() != null ? format().toString() : null;
+ case 3:
+ return specId();
+ case 4:
+ return partition();
+ case 5:
+ return recordCount();
+ case 6:
+ return fileSizeInBytes();
+ case 7:
+ return columnSizes();
+ case 8:
+ return valueCounts();
+ case 9:
+ return nullValueCounts();
+ case 10:
+ return nanValueCounts();
+ case 11:
+ return lowerBounds();
+ case 12:
+ return upperBounds();
+ case 13:
+ return keyMetadata();
+ case 14:
+ return splitOffsets();
+ case 15:
+ return equalityFieldIds();
+ case 16:
+ return sortOrderId();
+ case 17:
+ return firstRowId();
+ case 18:
+ return null; // referencedDataFile
+ case 19:
+ return null; // contentOffset
+ case 20:
+ return null; // contentSizeInBytes
+ case 21:
+ return pos();
+ default:
+ throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+ }
+ }
+
+ @Override
+ public Long pos() {
+ return tracking != null ? tracking.manifestPos() : null;
+ }
+
+ @Override
+ public int specId() {
+ // null specId in v4 means unpartitioned; default to 0 to match PartitionSpec.unpartitioned()
+ return file.specId() != null ? file.specId() : 0;
+ }
+
+ @Override
+ public FileContent content() {
+ return FileContent.DATA;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public CharSequence path() {
+ return LocationUtil.resolve(file.location(), tableLocation);
+ }
+
+ @Override
+ public FileFormat format() {
+ return file.fileFormat();
+ }
+
+ @Override
+ public StructLike partition() {
+ return extractPartition(file, spec);
+ }
+
+ @Override
+ public long recordCount() {
+ return file.recordCount();
+ }
+
+ @Override
+ public long fileSizeInBytes() {
+ return file.fileSizeInBytes();
+ }
+
+ @Override
+ public Integer sortOrderId() {
+ return file.sortOrderId();
+ }
+
+ @Override
+ public Long dataSequenceNumber() {
+ return tracking != null ? tracking.dataSequenceNumber() : null;
+ }
+
+ @Override
+ public Long fileSequenceNumber() {
+ return tracking != null ? tracking.fileSequenceNumber() : null;
+ }
+
+ @Override
+ public Long firstRowId() {
+ return tracking != null ? tracking.firstRowId() : null;
+ }
+
+ @Override
+ public ByteBuffer keyMetadata() {
+ return file.keyMetadata();
+ }
+
+ @Override
+ public List splitOffsets() {
+ return file.splitOffsets();
+ }
+
+ @Override
+ public List equalityFieldIds() {
+ return null;
+ }
+
+ @Override
+ public String manifestLocation() {
+ return tracking != null ? tracking.manifestLocation() : null;
+ }
+
+ @Override
+ public Map columnSizes() {
+ return null;
+ }
+
+ @Override
+ public Map valueCounts() {
+ return TrackedFileAdapters.valueCounts(file.contentStats());
+ }
+
+ @Override
+ public Map nullValueCounts() {
+ return TrackedFileAdapters.nullValueCounts(file.contentStats());
+ }
+
+ @Override
+ public Map nanValueCounts() {
+ return TrackedFileAdapters.nanValueCounts(file.contentStats());
+ }
+
+ @Override
+ public Map lowerBounds() {
+ return TrackedFileAdapters.lowerBounds(file.contentStats());
+ }
+
+ @Override
+ public Map upperBounds() {
+ return TrackedFileAdapters.upperBounds(file.contentStats());
+ }
+
+ @Override
+ public DataFile copy() {
+ return new TrackedDataFile(file.copy(), spec, tableLocation);
+ }
+
+ @Override
+ public DataFile copy(boolean withStats) {
+ return withStats ? copy() : copyWithoutStats();
+ }
+
+ @Override
+ public DataFile copyWithoutStats() {
+ return new TrackedDataFile(file.copyWithoutStats(), spec, tableLocation);
+ }
+
+ @Override
+ public DataFile copyWithStats(Set requestedColumnIds) {
+ return new TrackedDataFile(file.copyWithStats(requestedColumnIds), spec, tableLocation);
+ }
+ }
+
+ /** Adapts a TrackedFile EQUALITY_DELETES entry to the {@link DeleteFile} interface. */
+ private static class TrackedDeleteFile implements DeleteFile, java.io.Serializable {
+ private final TrackedFile file;
+ private final Tracking tracking;
+ private final PartitionSpec spec;
+ private final String tableLocation;
+
+ private TrackedDeleteFile(TrackedFile file, PartitionSpec spec, String tableLocation) {
+ this.file = file;
+ this.tracking = file.tracking();
+ this.spec = spec;
+ this.tableLocation = tableLocation;
+ }
+
+ @Override
+ public Long pos() {
+ return tracking != null ? tracking.manifestPos() : null;
+ }
+
+ @Override
+ public int specId() {
+ // null specId in v4 means unpartitioned; default to 0 to match PartitionSpec.unpartitioned()
+ return file.specId() != null ? file.specId() : 0;
+ }
+
+ @Override
+ public FileContent content() {
+ return file.contentType();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public CharSequence path() {
+ return LocationUtil.resolve(file.location(), tableLocation);
+ }
+
+ @Override
+ public FileFormat format() {
+ return file.fileFormat();
+ }
+
+ @Override
+ public StructLike partition() {
+ return extractPartition(file, spec);
+ }
+
+ @Override
+ public long recordCount() {
+ return file.recordCount();
+ }
+
+ @Override
+ public long fileSizeInBytes() {
+ return file.fileSizeInBytes();
+ }
+
+ @Override
+ public Integer sortOrderId() {
+ return file.sortOrderId();
+ }
+
+ @Override
+ public Long dataSequenceNumber() {
+ return tracking != null ? tracking.dataSequenceNumber() : null;
+ }
+
+ @Override
+ public Long fileSequenceNumber() {
+ return tracking != null ? tracking.fileSequenceNumber() : null;
+ }
+
+ @Override
+ public Long firstRowId() {
+ return tracking != null ? tracking.firstRowId() : null;
+ }
+
+ @Override
+ public ByteBuffer keyMetadata() {
+ return file.keyMetadata();
+ }
+
+ @Override
+ public List splitOffsets() {
+ return file.splitOffsets();
+ }
+
+ @Override
+ public List equalityFieldIds() {
+ return file.equalityIds();
+ }
+
+ @Override
+ public String manifestLocation() {
+ return tracking != null ? tracking.manifestLocation() : null;
+ }
+
+ @Override
+ public Map columnSizes() {
+ return null;
+ }
+
+ @Override
+ public Map valueCounts() {
+ return TrackedFileAdapters.valueCounts(file.contentStats());
+ }
+
+ @Override
+ public Map nullValueCounts() {
+ return TrackedFileAdapters.nullValueCounts(file.contentStats());
+ }
+
+ @Override
+ public Map nanValueCounts() {
+ return TrackedFileAdapters.nanValueCounts(file.contentStats());
+ }
+
+ @Override
+ public Map lowerBounds() {
+ return TrackedFileAdapters.lowerBounds(file.contentStats());
+ }
+
+ @Override
+ public Map upperBounds() {
+ return TrackedFileAdapters.upperBounds(file.contentStats());
+ }
+
+ @Override
+ public DeleteFile copy() {
+ return new TrackedDeleteFile(file.copy(), spec, tableLocation);
+ }
+
+ @Override
+ public DeleteFile copy(boolean withStats) {
+ return withStats ? copy() : copyWithoutStats();
+ }
+
+ @Override
+ public DeleteFile copyWithoutStats() {
+ return new TrackedDeleteFile(file.copyWithoutStats(), spec, tableLocation);
+ }
+
+ @Override
+ public DeleteFile copyWithStats(Set requestedColumnIds) {
+ return new TrackedDeleteFile(file.copyWithStats(requestedColumnIds), spec, tableLocation);
+ }
+ }
+
+ /**
+ * Adapts the deletion vector from a TrackedFile DATA entry to the {@link DeleteFile} interface.
+ *
+ * The DV blob metadata is mapped to the DeleteFile DV fields: {@link
+ * DeleteFile#referencedDataFile()} is the data file location, and {@link
+ * DeleteFile#contentOffset()} / {@link DeleteFile#contentSizeInBytes()} point to the blob within
+ * the Puffin file.
+ */
+ private static class TrackedDVDeleteFile implements DeleteFile {
+ private final TrackedFile file;
+ private final DeletionVector dv;
+ private final Tracking tracking;
+ private final PartitionSpec spec;
+ private final String tableLocation;
+
+ private TrackedDVDeleteFile(TrackedFile file, PartitionSpec spec, String tableLocation) {
+ this.file = file;
+ this.dv = file.deletionVector();
+ this.tracking = file.tracking();
+ this.spec = spec;
+ this.tableLocation = tableLocation;
+ }
+
+ @Override
+ public Long pos() {
+ return tracking != null ? tracking.manifestPos() : null;
+ }
+
+ @Override
+ public int specId() {
+ return file.specId() != null ? file.specId() : 0;
+ }
+
+ @Override
+ public FileContent content() {
+ return FileContent.POSITION_DELETES;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public CharSequence path() {
+ return LocationUtil.resolve(dv.location(), tableLocation);
+ }
+
+ @Override
+ public FileFormat format() {
+ return FileFormat.PUFFIN;
+ }
+
+ @Override
+ public StructLike partition() {
+ return extractPartition(file, spec);
+ }
+
+ @Override
+ public long recordCount() {
+ return dv.cardinality();
+ }
+
+ // Returns the DV blob size, not the full Puffin file size. The DeletionVector metadata does not
+ // include the Puffin file size, so this is the best approximation available. Space accounting
+ // that sums fileSizeInBytes() was already imprecise in v3 (multiple DVs sharing a Puffin file
+ // each reported the full file size).
+ @Override
+ public long fileSizeInBytes() {
+ return dv.sizeInBytes();
+ }
+
+ @Override
+ public Integer sortOrderId() {
+ return null;
+ }
+
+ @Override
+ public Long dataSequenceNumber() {
+ return tracking != null ? tracking.dataSequenceNumber() : null;
+ }
+
+ @Override
+ public Long fileSequenceNumber() {
+ return tracking != null ? tracking.fileSequenceNumber() : null;
+ }
+
+ @Override
+ public Long firstRowId() {
+ return null;
+ }
+
+ @Override
+ public ByteBuffer keyMetadata() {
+ return null;
+ }
+
+ @Override
+ public List splitOffsets() {
+ return null;
+ }
+
+ @Override
+ public List equalityFieldIds() {
+ return null;
+ }
+
+ @Override
+ public String referencedDataFile() {
+ return LocationUtil.resolve(file.location(), tableLocation);
+ }
+
+ @Override
+ public Long contentOffset() {
+ return dv.offset();
+ }
+
+ @Override
+ public Long contentSizeInBytes() {
+ return dv.sizeInBytes();
+ }
+
+ @Override
+ public String manifestLocation() {
+ return tracking != null ? tracking.manifestLocation() : null;
+ }
+
+ @Override
+ public Map columnSizes() {
+ return null;
+ }
+
+ @Override
+ public Map valueCounts() {
+ return null;
+ }
+
+ @Override
+ public Map nullValueCounts() {
+ return null;
+ }
+
+ @Override
+ public Map nanValueCounts() {
+ return null;
+ }
+
+ @Override
+ public Map lowerBounds() {
+ return null;
+ }
+
+ @Override
+ public Map upperBounds() {
+ return null;
+ }
+
+ @Override
+ public DeleteFile copy() {
+ return new TrackedDVDeleteFile(file.copy(), spec, tableLocation);
+ }
+
+ @Override
+ public DeleteFile copy(boolean withStats) {
+ return copy();
+ }
+
+ @Override
+ public DeleteFile copyWithoutStats() {
+ return copy();
+ }
+
+ @Override
+ public DeleteFile copyWithStats(Set requestedColumnIds) {
+ return copy();
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TrackedFileEntryAdapter.java b/core/src/main/java/org/apache/iceberg/TrackedFileEntryAdapter.java
new file mode 100644
index 000000000000..09908141ea7b
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/TrackedFileEntryAdapter.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/**
+ * Adapts a {@link TrackedFile} to the {@link ManifestEntry} interface for v3 pipeline
+ * compatibility.
+ *
+ * This allows code that works with ManifestEntry (ManifestFiles.read(), ManifestGroup, etc.) to
+ * consume entries from v4 manifests via {@link V4ManifestReader}.
+ */
+class TrackedFileEntryAdapter> implements ManifestEntry {
+ private final TrackedFile trackedFile;
+ private final F adapted;
+ private final PartitionSpec spec;
+
+ // mutable fields for InheritableMetadata
+ private Long overrideSnapshotId = null;
+ private Long overrideDataSeqNum = null;
+ private Long overrideFileSeqNum = null;
+ private boolean snapshotIdOverridden = false;
+ private boolean dataSeqNumOverridden = false;
+ private boolean fileSeqNumOverridden = false;
+
+ @SuppressWarnings("unchecked")
+ TrackedFileEntryAdapter(TrackedFile trackedFile, PartitionSpec spec) {
+ this(trackedFile, spec, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ TrackedFileEntryAdapter(TrackedFile trackedFile, PartitionSpec spec, String tableLocation) {
+ this.trackedFile = trackedFile;
+ this.spec = spec;
+ this.adapted = (F) adaptFile(trackedFile, spec, tableLocation);
+ }
+
+ private static ContentFile> adaptFile(
+ TrackedFile file, PartitionSpec spec, String tableLocation) {
+ if (file.contentType() == FileContent.DATA) {
+ return TrackedFileAdapters.asDataFile(file, spec, tableLocation);
+ }
+
+ if (file.contentType() == FileContent.EQUALITY_DELETES) {
+ return TrackedFileAdapters.asEqualityDeleteFile(file, spec, tableLocation);
+ }
+
+ // DATA entries with a deletion vector are adapted as DV delete files
+ return TrackedFileAdapters.asDVDeleteFile(file, spec, tableLocation);
+ }
+
+ @Override
+ public Status status() {
+ Tracking tracking = trackedFile.tracking();
+ if (tracking == null) {
+ return Status.EXISTING;
+ }
+
+ EntryStatus entryStatus = tracking.status();
+ if (entryStatus == null) {
+ return Status.EXISTING;
+ }
+
+ switch (entryStatus) {
+ case EXISTING:
+ return Status.EXISTING;
+ case ADDED:
+ return Status.ADDED;
+ case DELETED:
+ case REPLACED:
+ return Status.DELETED;
+ default:
+ throw new UnsupportedOperationException("Unknown entry status: " + entryStatus);
+ }
+ }
+
+ @Override
+ public Long snapshotId() {
+ if (snapshotIdOverridden) {
+ return overrideSnapshotId;
+ }
+
+ return trackedFile.tracking() != null ? trackedFile.tracking().snapshotId() : null;
+ }
+
+ @Override
+ public void setSnapshotId(long snapshotId) {
+ this.overrideSnapshotId = snapshotId;
+ this.snapshotIdOverridden = true;
+ }
+
+ @Override
+ public Long dataSequenceNumber() {
+ if (dataSeqNumOverridden) {
+ return overrideDataSeqNum;
+ }
+
+ return trackedFile.tracking() != null ? trackedFile.tracking().dataSequenceNumber() : null;
+ }
+
+ @Override
+ public void setDataSequenceNumber(long dataSequenceNumber) {
+ this.overrideDataSeqNum = dataSequenceNumber;
+ this.dataSeqNumOverridden = true;
+ }
+
+ @Override
+ public Long fileSequenceNumber() {
+ if (fileSeqNumOverridden) {
+ return overrideFileSeqNum;
+ }
+
+ return trackedFile.tracking() != null ? trackedFile.tracking().fileSequenceNumber() : null;
+ }
+
+ @Override
+ public void setFileSequenceNumber(long fileSequenceNumber) {
+ this.overrideFileSeqNum = fileSequenceNumber;
+ this.fileSeqNumOverridden = true;
+ }
+
+ @Override
+ public F file() {
+ return adapted;
+ }
+
+ @Override
+ public ManifestEntry copy() {
+ return new TrackedFileEntryAdapter<>(trackedFile.copy(), spec);
+ }
+
+ @Override
+ public ManifestEntry copyWithoutStats() {
+ return new TrackedFileEntryAdapter<>(trackedFile.copyWithoutStats(), spec);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TrackingStruct.java b/core/src/main/java/org/apache/iceberg/TrackingStruct.java
index a8624aad15c1..03215d3cbc8d 100644
--- a/core/src/main/java/org/apache/iceberg/TrackingStruct.java
+++ b/core/src/main/java/org/apache/iceberg/TrackingStruct.java
@@ -113,6 +113,10 @@ void setManifestLocation(String location) {
this.manifestLocation = location;
}
+ void setManifestPos(long pos) {
+ this.manifestPos = pos;
+ }
+
@Override
public EntryStatus status() {
return status;
diff --git a/core/src/main/java/org/apache/iceberg/V4ManifestReader.java b/core/src/main/java/org/apache/iceberg/V4ManifestReader.java
new file mode 100644
index 000000000000..c45ee599f743
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/V4ManifestReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Reader for v4 manifest files containing {@link TrackedFile} entries.
+ *
+ * Supports reading both root manifests and leaf manifests. Returns TrackedFile entries which can
+ * represent data files, equality deletes, or manifest references.
+ */
+class V4ManifestReader extends CloseableGroup implements CloseableIterable {
+ private final InputFile file;
+ private final Map specsById;
+
+ V4ManifestReader(InputFile file, Map specsById) {
+ this.file = file;
+ this.specsById = specsById;
+ }
+
+ /** Returns all entries in the manifest. */
+ CloseableIterable entries() {
+ return open();
+ }
+
+ /** Returns only live entries (ADDED or EXISTING, not DELETED or REPLACED). */
+ CloseableIterable liveEntries() {
+ return CloseableIterable.filter(open(), this::isLive);
+ }
+
+ /** Returns copied live entries for safe use outside iteration. */
+ @Override
+ public CloseableIterator iterator() {
+ return CloseableIterable.transform(liveEntries(), TrackedFile::copy).iterator();
+ }
+
+ Map specsById() {
+ return specsById;
+ }
+
+ private boolean isLive(TrackedFile tf) {
+ if (tf == null) {
+ return false;
+ }
+
+ Tracking tracking = tf.tracking();
+ return tracking != null && tracking.isLive();
+ }
+
+ private CloseableIterable open() {
+ FileFormat format = FileFormat.fromFileName(file.location());
+ Preconditions.checkArgument(
+ format != null, "Unable to determine format of manifest: %s", file.location());
+
+ // Hack: Exclude SPLIT_OFFSETS and EQUALITY_IDS from read projection to tolerate
+ // manifests that don't write list element field IDs
+ // TODO: Fix it
+ Schema fullSchema = V4Metadata.entrySchema(Types.StructType.of());
+ Schema readSchema =
+ new Schema(
+ fullSchema.columns().stream()
+ .filter(
+ f ->
+ f.fieldId() != TrackedFile.SPLIT_OFFSETS.fieldId()
+ && f.fieldId() != TrackedFile.EQUALITY_IDS.fieldId())
+ .collect(java.util.stream.Collectors.toList()));
+
+ CloseableIterable reader =
+ InternalData.read(format, file)
+ .project(readSchema)
+ .setRootType(TrackedFileStruct.class)
+ .setCustomType(TrackedFile.TRACKING.fieldId(), TrackingStruct.class)
+ .setCustomType(TrackedFile.DELETION_VECTOR.fieldId(), DeletionVectorStruct.class)
+ .setCustomType(TrackedFile.MANIFEST_INFO.fieldId(), ManifestInfoStruct.class)
+ .reuseContainers()
+ .build();
+
+ addCloseable(reader);
+ return reader;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/V4Metadata.java b/core/src/main/java/org/apache/iceberg/V4Metadata.java
index 67478290aa10..b5c58740b165 100644
--- a/core/src/main/java/org/apache/iceberg/V4Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V4Metadata.java
@@ -18,12 +18,11 @@
*/
package org.apache.iceberg;
-import static org.apache.iceberg.types.Types.NestedField.required;
-
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.LocationUtil;
class V4Metadata {
private V4Metadata() {}
@@ -263,110 +262,295 @@ public ManifestFile copy() {
}
}
- static Schema entrySchema(Types.StructType partitionType) {
- return wrapFileSchema(fileType(partitionType));
+ private static final Types.StructType ROOT_MANIFEST_WRITE_TYPE =
+ entrySchema(Types.StructType.of()).asStruct();
+
+ /**
+ * Converts a {@link ManifestFile} to a {@link TrackedFileStruct} for writing into a root
+ * manifest.
+ *
+ * The returned struct uses the entry schema projection so that field positions match the write
+ * schema (which excludes content_stats).
+ */
+ static TrackedFileStruct manifestFileToTrackedFile(
+ ManifestFile manifest,
+ long commitSnapshotId,
+ long commitSequenceNumber,
+ String tableLocation) {
+ long seqNum = resolveSeqNum(manifest.sequenceNumber(), commitSequenceNumber);
+ long minSeqNum = resolveSeqNum(manifest.minSequenceNumber(), commitSequenceNumber);
+
+ TrackingStruct tracking = buildTracking(manifest, commitSnapshotId, seqNum);
+ ManifestInfoStruct info = buildManifestInfo(manifest, minSeqNum);
+
+ FileContent contentType =
+ manifest.content() == ManifestContent.DATA
+ ? FileContent.DATA_MANIFEST
+ : FileContent.DELETE_MANIFEST;
+
+ int totalEntries =
+ intOrZero(manifest.addedFilesCount())
+ + intOrZero(manifest.existingFilesCount())
+ + intOrZero(manifest.deletedFilesCount());
+
+ // use the entry schema as projection so positions match the write schema
+ TrackedFileStruct tf = new TrackedFileStruct(ROOT_MANIFEST_WRITE_TYPE);
+ tf.set(0, tracking);
+ tf.set(1, contentType.id());
+ tf.set(2, LocationUtil.relativize(manifest.path(), tableLocation));
+ tf.set(3, FileFormat.PARQUET.toString());
+ tf.set(4, (long) totalEntries);
+ tf.set(5, manifest.length());
+ tf.set(6, manifest.partitionSpecId());
+ tf.set(9, info);
+
+ if (manifest.keyMetadata() != null) {
+ tf.set(10, manifest.keyMetadata());
+ }
+
+ return tf;
+ }
+
+ /**
+ * Re-projects a data/delete file {@link TrackedFile} entry for writing into a root manifest.
+ *
+ *
Entries read from a flat root manifest may carry content_stats with a schema that differs
+ * from the root write schema. This method creates a clean {@link TrackedFileStruct} using the
+ * root manifest projection, copying only the standard fields.
+ */
+ static TrackedFileStruct dataEntryForRootManifest(TrackedFile tf) {
+ // ROOT_MANIFEST_WRITE_TYPE uses entrySchema which excludes content_stats.
+ // Positions match entrySchema field order:
+ // 0=tracking, 1=content_type, 2=location, 3=file_format, 4=record_count,
+ // 5=file_size_in_bytes, 6=spec_id, 7=sort_order_id, 8=deletion_vector,
+ // 9=manifest_info, 10=key_metadata, 11=split_offsets, 12=equality_ids
+ TrackedFileStruct out = new TrackedFileStruct(ROOT_MANIFEST_WRITE_TYPE);
+ out.set(0, tf.tracking());
+ out.set(1, tf.contentType().id());
+ out.set(2, tf.location());
+ out.set(3, tf.fileFormat().toString());
+ out.set(4, tf.recordCount());
+ out.set(5, tf.fileSizeInBytes());
+ if (tf.specId() != null) {
+ out.set(6, tf.specId());
+ }
+ if (tf.sortOrderId() != null) {
+ out.set(7, tf.sortOrderId());
+ }
+ if (tf.deletionVector() != null) {
+ out.set(8, tf.deletionVector());
+ }
+ if (tf.keyMetadata() != null) {
+ out.set(10, tf.keyMetadata());
+ }
+ if (tf.splitOffsets() != null) {
+ out.set(11, tf.splitOffsets());
+ }
+ if (tf.equalityIds() != null) {
+ out.set(12, tf.equalityIds());
+ }
+ return out;
+ }
+
+ /** Converts a {@link TrackedFile} read from a root manifest back to a {@link ManifestFile}. */
+ static ManifestFile trackedFileToManifestFile(TrackedFile tf, String tableLocation) {
+ ManifestInfo info = tf.manifestInfo();
+ Tracking tracking = tf.tracking();
+ ManifestContent content =
+ tf.contentType() == FileContent.DATA_MANIFEST
+ ? ManifestContent.DATA
+ : ManifestContent.DELETES;
+
+ return new GenericManifestFile(
+ LocationUtil.resolve(tf.location(), tableLocation),
+ tf.fileSizeInBytes(),
+ tf.specId() != null ? tf.specId() : 0,
+ content,
+ sequenceNumberFrom(tracking),
+ info != null ? info.minSequenceNumber() : 0L,
+ tracking != null ? tracking.snapshotId() : null,
+ null,
+ tf.keyMetadata(),
+ info != null ? info.addedFilesCount() : 0,
+ info != null ? info.addedRowsCount() : 0L,
+ info != null ? info.existingFilesCount() : 0,
+ info != null ? info.existingRowsCount() : 0L,
+ info != null ? info.deletedFilesCount() : 0,
+ info != null ? info.deletedRowsCount() : 0L,
+ null);
+ }
+
+ private static long resolveSeqNum(long seqNum, long commitSequenceNumber) {
+ return seqNum == ManifestWriter.UNASSIGNED_SEQ ? commitSequenceNumber : seqNum;
+ }
+
+ private static TrackingStruct buildTracking(
+ ManifestFile manifest, long commitSnapshotId, long seqNum) {
+ TrackingStruct tracking = new TrackingStruct();
+ tracking.set(0, EntryStatus.ADDED.id());
+ tracking.set(1, manifest.snapshotId() != null ? manifest.snapshotId() : commitSnapshotId);
+ tracking.set(2, seqNum);
+ tracking.set(3, seqNum);
+ return tracking;
}
- static Schema wrapFileSchema(Types.StructType fileSchema) {
- // this is used to build projection schemas
+ private static ManifestInfoStruct buildManifestInfo(ManifestFile manifest, long minSeqNum) {
+ ManifestInfoStruct info = new ManifestInfoStruct();
+ info.set(0, intOrZero(manifest.addedFilesCount()));
+ info.set(1, intOrZero(manifest.existingFilesCount()));
+ info.set(2, intOrZero(manifest.deletedFilesCount()));
+ info.set(3, 0);
+ info.set(4, longOrZero(manifest.addedRowsCount()));
+ info.set(5, longOrZero(manifest.existingRowsCount()));
+ info.set(6, longOrZero(manifest.deletedRowsCount()));
+ info.set(7, 0L);
+ info.set(8, minSeqNum);
+ return info;
+ }
+
+ private static int intOrZero(Integer value) {
+ return value != null ? value : 0;
+ }
+
+ private static long longOrZero(Long value) {
+ return value != null ? value : 0L;
+ }
+
+ private static long sequenceNumberFrom(Tracking tracking) {
+ if (tracking != null && tracking.dataSequenceNumber() != null) {
+ return tracking.dataSequenceNumber();
+ }
+
+ return 0L;
+ }
+
+ private static final Types.StructType ENTRY_WRITE_TYPE =
+ entrySchema(Types.StructType.of()).asStruct();
+
+ static Schema entrySchema(Types.StructType partitionType) {
return new Schema(
- ManifestEntry.STATUS,
- ManifestEntry.SNAPSHOT_ID,
- ManifestEntry.SEQUENCE_NUMBER,
- ManifestEntry.FILE_SEQUENCE_NUMBER,
- required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
+ TrackedFile.TRACKING,
+ TrackedFile.CONTENT_TYPE,
+ TrackedFile.LOCATION,
+ TrackedFile.FILE_FORMAT,
+ TrackedFile.RECORD_COUNT,
+ TrackedFile.FILE_SIZE_IN_BYTES,
+ TrackedFile.SPEC_ID,
+ TrackedFile.SORT_ORDER_ID,
+ TrackedFile.DELETION_VECTOR,
+ TrackedFile.MANIFEST_INFO,
+ TrackedFile.KEY_METADATA,
+ TrackedFile.SPLIT_OFFSETS,
+ TrackedFile.EQUALITY_IDS);
+ }
+
+ /**
+ * Converts a {@link ManifestEntry} to a {@link TrackedFileStruct} for writing into a v4 data or
+ * delete manifest.
+ *
+ *
The returned struct uses the entry schema projection so that field positions match the write
+ * schema (which excludes content_stats).
+ */
+ static TrackedFileStruct entryToTrackedFile(
+ ManifestEntry> entry, Long commitSnapshotId, String tableLocation) {
+ TrackingStruct tracking = buildEntryTracking(entry, commitSnapshotId);
+ ContentFile> file = entry.file();
+
+ TrackedFileStruct tf = new TrackedFileStruct(ENTRY_WRITE_TYPE);
+ tf.set(0, tracking);
+ tf.set(1, file.content().id());
+ tf.set(2, LocationUtil.relativize(file.location(), tableLocation));
+ tf.set(3, file.format() != null ? file.format().toString() : null);
+ tf.set(4, file.recordCount());
+ tf.set(5, file.fileSizeInBytes());
+ tf.set(6, file.specId());
+ tf.set(7, file.sortOrderId());
+ // positions 8 (deletion_vector) and 9 (manifest_info) default to null
+
+ if (file.keyMetadata() != null) {
+ tf.set(10, file.keyMetadata());
+ }
+
+ if (file.splitOffsets() != null) {
+ tf.set(11, file.splitOffsets());
+ }
+
+ if (file.equalityFieldIds() != null) {
+ tf.set(12, file.equalityFieldIds());
+ }
+
+ return tf;
}
- static Types.StructType fileType(Types.StructType partitionType) {
- return Types.StructType.of(
- DataFile.CONTENT.asRequired(),
- DataFile.FILE_PATH,
- DataFile.FILE_FORMAT,
- required(
- DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType, DataFile.PARTITION_DOC),
- DataFile.RECORD_COUNT,
- DataFile.FILE_SIZE,
- DataFile.COLUMN_SIZES,
- DataFile.VALUE_COUNTS,
- DataFile.NULL_VALUE_COUNTS,
- DataFile.NAN_VALUE_COUNTS,
- DataFile.LOWER_BOUNDS,
- DataFile.UPPER_BOUNDS,
- DataFile.KEY_METADATA,
- DataFile.SPLIT_OFFSETS,
- DataFile.EQUALITY_IDS,
- DataFile.SORT_ORDER_ID,
- DataFile.FIRST_ROW_ID,
- DataFile.REFERENCED_DATA_FILE,
- DataFile.CONTENT_OFFSET,
- DataFile.CONTENT_SIZE);
+ private static TrackingStruct buildEntryTracking(ManifestEntry> entry, Long commitSnapshotId) {
+ TrackingStruct tracking = new TrackingStruct();
+ tracking.set(0, entry.status().id());
+ tracking.set(1, entry.snapshotId());
+
+ if (entry.dataSequenceNumber() == null) {
+ Preconditions.checkState(
+ entry.snapshotId() == null || entry.snapshotId().equals(commitSnapshotId),
+ "Found unassigned sequence number for an entry from snapshot: %s",
+ entry.snapshotId());
+ Preconditions.checkState(
+ entry.status() == ManifestEntry.Status.ADDED,
+ "Only entries with status ADDED can have null sequence number");
+ // leave sequence number as null for ADDED entries (assigned at commit)
+ } else {
+ tracking.set(2, entry.dataSequenceNumber());
+ }
+
+ if (entry.fileSequenceNumber() != null) {
+ tracking.set(3, entry.fileSequenceNumber());
+ }
+
+ if (entry.file().content() == FileContent.DATA && entry.file().firstRowId() != null) {
+ tracking.set(5, entry.file().firstRowId());
+ }
+
+ return tracking;
}
+ /**
+ * Wraps a {@link ManifestEntry} for v4 manifest writing.
+ *
+ *
Implements {@link ManifestEntry} for type compatibility with {@link ManifestWriter}, and
+ * delegates {@link StructLike} to an internal {@link TrackedFileStruct} built via {@link
+ * #entryToTrackedFile}.
+ */
static class ManifestEntryWrapper>
implements ManifestEntry, StructLike {
- private final int size;
+
private final Long commitSnapshotId;
- private final DataFileWrapper> fileWrapper;
+ private final String tableLocation;
private ManifestEntry wrapped = null;
+ private TrackedFileStruct tracked = null;
- ManifestEntryWrapper(Long commitSnapshotId) {
- this.size = entrySchema(Types.StructType.of()).columns().size();
+ ManifestEntryWrapper(
+ Long commitSnapshotId, Types.StructType partitionType, String tableLocation) {
this.commitSnapshotId = commitSnapshotId;
- this.fileWrapper = new DataFileWrapper<>();
+ this.tableLocation = tableLocation;
}
public ManifestEntryWrapper wrap(ManifestEntry entry) {
this.wrapped = entry;
+ this.tracked = entryToTrackedFile(entry, commitSnapshotId, tableLocation);
return this;
}
@Override
public int size() {
- return size;
+ return tracked.size();
}
@Override
public void set(int pos, T value) {
- throw new UnsupportedOperationException("Cannot modify ManifestEntryWrapper wrapper via set");
+ throw new UnsupportedOperationException("ManifestEntryWrapper is read-only");
}
@Override
public T get(int pos, Class