Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,12 @@
<td>Boolean</td>
<td>Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.</td>
</tr>
<tr>
<td><h5>deletion-vectors.merge-on-read</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When deletion vectors are enabled, uncompacted files are not visible by default. Set this to true to enable merge-on-read, which makes uncompacted data visible at the cost of read performance. This option only affects batch scan visibility of DV level-0 files, it does not change streaming scan or changelog behavior.</td>
</tr>
<tr>
<td><h5>deletion-vectors.modifiable</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
20 changes: 19 additions & 1 deletion paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1879,6 +1879,17 @@ public InlineElement getDescription() {
.defaultValue(MemorySize.ofMebiBytes(2))
.withDescription("The target size of deletion vector index file.");

public static final ConfigOption<Boolean> DELETION_VECTORS_MERGE_ON_READ =
key("deletion-vectors.merge-on-read")
.booleanType()
.defaultValue(false)
.withDescription(
"When deletion vectors are enabled, uncompacted files are not visible by default. "
+ "Set this to true to enable merge-on-read, which makes uncompacted data "
+ "visible at the cost of read performance. "
+ "This option only affects batch scan visibility of DV level-0 files, "
+ "it does not change streaming scan or changelog behavior.");

public static final ConfigOption<Boolean> DELETION_VECTOR_BITMAP64 =
key("deletion-vectors.bitmap64")
.booleanType()
Expand Down Expand Up @@ -3648,8 +3659,15 @@ public boolean forceLookup() {
return options.get(FORCE_LOOKUP);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: add a helper accessor method

For consistency with other boolean options (e.g., deletionVectorsEnabled(), visibilityCallbackEnabled()), consider adding a dedicated accessor method:

public boolean deletionVectorsMergeOnRead() {
    return options.get(DELETION_VECTORS_MERGE_ON_READ);
}

Then batchScanSkipLevel0() can call deletionVectorsMergeOnRead() instead of directly accessing the config, and SchemaValidation can also use options.deletionVectorsMergeOnRead() instead of options.toConfiguration().get(CoreOptions.DELETION_VECTORS_MERGE_ON_READ).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, added deletionVectorsMergeOnRead() and updated both callers.

}

public boolean deletionVectorsMergeOnRead() {
return options.get(DELETION_VECTORS_MERGE_ON_READ);
}

public boolean batchScanSkipLevel0() {
return deletionVectorsEnabled() || mergeEngine() == FIRST_ROW;
if (deletionVectorsEnabled()) {
return !deletionVectorsMergeOnRead();
}
return mergeEngine() == FIRST_ROW;
}

public MemorySize dvIndexFileTargetSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ public static void validateTableSchema(TableSchema schema) {

if (options.deletionVectorsEnabled()) {
validateForDeletionVectors(options);
} else {
checkArgument(
!options.deletionVectorsMergeOnRead(),
"deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true.");
}

// vector field names must point to vector type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,4 +508,49 @@ public void testFileFormatPerLevelAcceptsCompatibleSchema() {
validateTableSchema(
new TableSchema(1, fields, 10, emptyList(), singletonList("k"), options, ""));
}

@Test
public void testMergeOnReadCoexistsWithVisibilityCallback() {
Map<String, String> options = new HashMap<>();
options.put("deletion-vectors.enabled", "true");
options.put("deletion-vectors.merge-on-read", "true");
options.put("visibility-callback.enabled", "true");
assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException();
}

@Test
public void testMergeOnReadCoexistsWithVisibilityCallbackAndPostponeBucket() {
List<DataField> fields =
Arrays.asList(
new DataField(0, "f0", DataTypes.INT()),
new DataField(1, "f1", DataTypes.INT()),
new DataField(2, "f2", DataTypes.INT()),
new DataField(3, "f3", DataTypes.STRING()));
Map<String, String> options = new HashMap<>();
options.put("deletion-vectors.enabled", "true");
options.put("deletion-vectors.merge-on-read", "true");
options.put("visibility-callback.enabled", "true");
options.put(BUCKET.key(), String.valueOf(-2));
assertThatCode(
() ->
validateTableSchema(
new TableSchema(
1,
fields,
10,
singletonList("f0"),
singletonList("f1"),
options,
"")))
.doesNotThrowAnyException();
}

@Test
public void testMergeOnReadRequiresDvEnabled() {
Map<String, String> options = new HashMap<>();
options.put("deletion-vectors.merge-on-read", "true");
assertThatThrownBy(() -> validateTableSchemaExec(options))
.hasMessageContaining(
"deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataTypes;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link CoreOptions#DELETION_VECTORS_MERGE_ON_READ}. */
public class DeletionVectorsMergeOnReadTest {

@TempDir java.nio.file.Path tempDir;

private Catalog catalog;
private final Identifier identifier = Identifier.create("my_db", "T");

@BeforeEach
public void before() throws Exception {
Options options = new Options();
options.set("warehouse", new Path(tempDir.toString() + "/warehouse").toUri().toString());
catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
catalog.createDatabase("my_db", true);
}

private FileStoreTable createTable(boolean mergeOnRead) throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("k", DataTypes.INT());
schemaBuilder.column("v", DataTypes.INT());
schemaBuilder.primaryKey("k");
schemaBuilder.option("bucket", "1");
schemaBuilder.option("deletion-vectors.enabled", "true");
schemaBuilder.option("write-only", "true");
if (mergeOnRead) {
schemaBuilder.option("deletion-vectors.merge-on-read", "true");
}
catalog.createTable(identifier, schemaBuilder.build(), true);
return (FileStoreTable) catalog.getTable(identifier);
}

private void writeCommit(FileStoreTable table, GenericRow... rows) throws Exception {
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
BatchTableWrite write = writeBuilder.newWrite();
for (GenericRow row : rows) {
write.write(row);
}
writeBuilder.newCommit().commit(write.prepareCommit());
write.close();
}

private List<GenericRow> query(FileStoreTable table) throws Exception {
ReadBuilder readBuilder = table.newReadBuilder();
TableScan.Plan plan = readBuilder.newScan().plan();
List<GenericRow> result = new ArrayList<>();
readBuilder
.newRead()
.createReader(plan)
.forEachRemaining(row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1))));
return result;
}

@Test
public void testDefaultSkipsLevel0() throws Exception {
FileStoreTable table = createTable(false);

writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20));
writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30));

// write-only mode, no compaction, all files at level 0
// default DV mode skips level 0 — only the first commit's compacted data is visible
// since no compaction has run, no data should be visible from level > 0
List<GenericRow> result = query(table);
assertThat(result).isEmpty();
}

@Test
public void testMergeOnReadReadsLevel0() throws Exception {
FileStoreTable table = createTable(true);

writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20));
writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30));

// merge-on-read enabled, level 0 data is visible via MOR
List<GenericRow> result = query(table);
assertThat(result)
.containsExactlyInAnyOrder(
GenericRow.of(1, 11), GenericRow.of(2, 20), GenericRow.of(3, 30));
}

@Test
public void testMergeOnReadWithQueryHint() throws Exception {
FileStoreTable table = createTable(false);

writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20));
writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30));

// default: no data visible (L0 skipped)
assertThat(query(table)).isEmpty();

// override with dynamic option to enable merge-on-read
table =
table.copy(
java.util.Collections.singletonMap(
"deletion-vectors.merge-on-read", "true"));
List<GenericRow> result = query(table);
assertThat(result)
.containsExactlyInAnyOrder(
GenericRow.of(1, 11), GenericRow.of(2, 20), GenericRow.of(3, 30));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,36 @@ public void testBatchReadDVTable(String changelogProducer, boolean dvBitmap64) {
Row.of(1, "111111111"), Row.of(2, "2_1"), Row.of(3, "3_1"), Row.of(4, "4"));
}

@ParameterizedTest
@MethodSource("parameters1")
public void testBatchReadDVTableWithMergeOnRead(String changelogProducer, boolean dvBitmap64) {
sql(
String.format(
"CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) "
+ "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', "
+ "'deletion-vectors.bitmap64' = '%s', 'write-only' = 'true', 'bucket' = '1')",
changelogProducer, dvBitmap64));

sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')");

sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistency with core test testDefaultSkipsLevel0

The core-level test DeletionVectorsMergeOnReadTest#testDefaultSkipsLevel0 asserts an empty result when reading without merge-on-read (all files at level 0, none visible). But this Flink IT test asserts the first commit's data is visible without merge-on-read.

This difference is likely because the core test explicitly sets bucket=1 (fixed bucket) while this Flink test doesn't (defaults to dynamic bucket mode, which may place initial files differently). Could be worth:

  1. Adding a brief comment explaining why the first commit's data is visible here (e.g., files from the first snapshot are at level > 0 in this config), or
  2. Aligning the two tests to use the same bucket configuration for consistency.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — added 'bucket' = '1' to align with the core test, and changed the assertion to isEmpty() for consistency.

sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");

// write-only with fixed bucket, all files at level 0, not visible without merge-on-read
assertThat(batchSql("SELECT * FROM T")).isEmpty();

// with merge-on-read enabled, level 0 data becomes visible via MOR
assertThat(
batchSql(
"SELECT * FROM T /*+ OPTIONS('deletion-vectors.merge-on-read'='true') */"))
.containsExactlyInAnyOrder(
Row.of(1, "111111111"),
Row.of(2, "2_2"),
Row.of(3, "3_1"),
Row.of(4, "4_1"));
}

@ParameterizedTest
@MethodSource("parameters1")
public void testDVTableWithAggregationMergeEngine(String changelogProducer, boolean dvBitmap64)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@ import scala.concurrent.duration.DurationInt

class VisibilityCallbackTest extends PaimonSparkTestBase {

Seq((true, false), (false, true)).foreach {
Seq((true, false), (false, true), (true, true)).foreach {
case (dv, postpone) =>
test(s"Visibility callback with deletion-vectors $dv and postpone-bucket $postpone") {
withTable("T") {
val bucket = if (postpone) -2 else 1
val mergeOnRead = if (dv) "true" else "false"
sql(s"""
|CREATE TABLE T (id INT, name STRING)
|TBLPROPERTIES (
| 'bucket' = '$bucket',
| 'primary-key' = 'id',
| 'deletion-vectors.enabled' = '$dv',
| 'deletion-vectors.merge-on-read' = '$mergeOnRead',
| 'write-only' = 'true'
|)
|""".stripMargin)
Expand Down
Loading