From 48316b072f12b80be278df73ac1c655d8c101d26 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Mon, 25 May 2026 01:55:09 +0800 Subject: [PATCH 1/3] [core] Add deletion-vectors.merge-on-read config for DV table L0 visibility In shared dataset scenarios with dedicated compaction jobs, DV tables skip level 0 files by default. This adds a config option to allow reading uncompacted data via merge-on-read, trading read performance for data freshness. --- docs/generated/core_configuration.html | 6 + .../java/org/apache/paimon/CoreOptions.java | 16 +- .../paimon/schema/SchemaValidation.java | 7 + .../paimon/schema/SchemaValidationTest.java | 11 ++ .../table/DeletionVectorsMergeOnReadTest.java | 145 ++++++++++++++++++ .../paimon/flink/DeletionVectorITCase.java | 32 ++++ 6 files changed, 216 insertions(+), 1 deletion(-) create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/DeletionVectorsMergeOnReadTest.java diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index 5d608dba9ed1..481369245597 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -512,6 +512,12 @@ Boolean Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided. + +
deletion-vectors.merge-on-read
+ false + Boolean + When deletion vectors are enabled, uncompacted files are not visible by default. Set this to true to enable merge-on-read, which makes uncompacted data visible at the cost of read performance. This option only affects batch scan visibility of DV level-0 files, it does not change streaming scan or changelog behavior. +
deletion-vectors.modifiable
false diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 9de20d19cdb5..fecd9fa5c142 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1879,6 +1879,17 @@ public InlineElement getDescription() { .defaultValue(MemorySize.ofMebiBytes(2)) .withDescription("The target size of deletion vector index file."); + public static final ConfigOption DELETION_VECTORS_MERGE_ON_READ = + key("deletion-vectors.merge-on-read") + .booleanType() + .defaultValue(false) + .withDescription( + "When deletion vectors are enabled, uncompacted files are not visible by default. " + + "Set this to true to enable merge-on-read, which makes uncompacted data " + + "visible at the cost of read performance. " + + "This option only affects batch scan visibility of DV level-0 files, " + + "it does not change streaming scan or changelog behavior."); + public static final ConfigOption DELETION_VECTOR_BITMAP64 = key("deletion-vectors.bitmap64") .booleanType() @@ -3649,7 +3660,10 @@ public boolean forceLookup() { } public boolean batchScanSkipLevel0() { - return deletionVectorsEnabled() || mergeEngine() == FIRST_ROW; + if (deletionVectorsEnabled()) { + return !options.get(DELETION_VECTORS_MERGE_ON_READ); + } + return mergeEngine() == FIRST_ROW; } public MemorySize dvIndexFileTargetSize() { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index b703caaf9b6c..b0a7a762988f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -620,6 +620,13 @@ private static void validateForDeletionVectors(CoreOptions options) { !options.mergeEngine().equals(MergeEngine.FIRST_ROW), "First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine."); } + + checkArgument( + !(options.toConfiguration().get(CoreOptions.DELETION_VECTORS_MERGE_ON_READ) + && options.visibilityCallbackEnabled()), + "Cannot enable deletion-vectors.merge-on-read together with visibility-callback.enabled. " + + "The former makes DV level-0 files visible at read time, " + + "while the latter waits for compaction before returning commits."); } private static void validateSequenceField(TableSchema schema, CoreOptions options) { diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 89796948f295..679848ac4771 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -508,4 +508,15 @@ public void testFileFormatPerLevelAcceptsCompatibleSchema() { validateTableSchema( new TableSchema(1, fields, 10, emptyList(), singletonList("k"), options, "")); } + + @Test + public void testMergeOnReadConflictWithVisibilityCallback() { + Map options = new HashMap<>(); + options.put("deletion-vectors.enabled", "true"); + options.put("deletion-vectors.merge-on-read", "true"); + options.put("visibility-callback.enabled", "true"); + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .hasMessageContaining( + "Cannot enable deletion-vectors.merge-on-read together with visibility-callback.enabled"); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DeletionVectorsMergeOnReadTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DeletionVectorsMergeOnReadTest.java new file mode 100644 index 000000000000..e3b99523a25a --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/DeletionVectorsMergeOnReadTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link CoreOptions#DELETION_VECTORS_MERGE_ON_READ}. */ +public class DeletionVectorsMergeOnReadTest { + + @TempDir java.nio.file.Path tempDir; + + private Catalog catalog; + private final Identifier identifier = Identifier.create("my_db", "T"); + + @BeforeEach + public void before() throws Exception { + Options options = new Options(); + options.set("warehouse", new Path(tempDir.toString() + "/warehouse").toUri().toString()); + catalog = CatalogFactory.createCatalog(CatalogContext.create(options)); + catalog.createDatabase("my_db", true); + } + + private FileStoreTable createTable(boolean mergeOnRead) throws Exception { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("k", DataTypes.INT()); + schemaBuilder.column("v", DataTypes.INT()); + schemaBuilder.primaryKey("k"); + schemaBuilder.option("bucket", "1"); + schemaBuilder.option("deletion-vectors.enabled", "true"); + schemaBuilder.option("write-only", "true"); + if (mergeOnRead) { + schemaBuilder.option("deletion-vectors.merge-on-read", "true"); + } + catalog.createTable(identifier, schemaBuilder.build(), true); + return (FileStoreTable) catalog.getTable(identifier); + } + + private void writeCommit(FileStoreTable table, GenericRow... rows) throws Exception { + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + BatchTableWrite write = writeBuilder.newWrite(); + for (GenericRow row : rows) { + write.write(row); + } + writeBuilder.newCommit().commit(write.prepareCommit()); + write.close(); + } + + private List query(FileStoreTable table) throws Exception { + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + List result = new ArrayList<>(); + readBuilder + .newRead() + .createReader(plan) + .forEachRemaining(row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1)))); + return result; + } + + @Test + public void testDefaultSkipsLevel0() throws Exception { + FileStoreTable table = createTable(false); + + writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20)); + writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30)); + + // write-only mode, no compaction, all files at level 0 + // default DV mode skips level 0 — only the first commit's compacted data is visible + // since no compaction has run, no data should be visible from level > 0 + List result = query(table); + assertThat(result).isEmpty(); + } + + @Test + public void testMergeOnReadReadsLevel0() throws Exception { + FileStoreTable table = createTable(true); + + writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20)); + writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30)); + + // merge-on-read enabled, level 0 data is visible via MOR + List result = query(table); + assertThat(result) + .containsExactlyInAnyOrder( + GenericRow.of(1, 11), GenericRow.of(2, 20), GenericRow.of(3, 30)); + } + + @Test + public void testMergeOnReadWithQueryHint() throws Exception { + FileStoreTable table = createTable(false); + + writeCommit(table, GenericRow.of(1, 10), GenericRow.of(2, 20)); + writeCommit(table, GenericRow.of(1, 11), GenericRow.of(3, 30)); + + // default: no data visible (L0 skipped) + assertThat(query(table)).isEmpty(); + + // override with dynamic option to enable merge-on-read + table = + table.copy( + java.util.Collections.singletonMap( + "deletion-vectors.merge-on-read", "true")); + List result = query(table); + assertThat(result) + .containsExactlyInAnyOrder( + GenericRow.of(1, 11), GenericRow.of(2, 20), GenericRow.of(3, 30)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java index 9f24ee548dd2..3138bf0027ab 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java @@ -206,6 +206,38 @@ public void testBatchReadDVTable(String changelogProducer, boolean dvBitmap64) { Row.of(1, "111111111"), Row.of(2, "2_1"), Row.of(3, "3_1"), Row.of(4, "4")); } + @ParameterizedTest + @MethodSource("parameters1") + public void testBatchReadDVTableWithMergeOnRead(String changelogProducer, boolean dvBitmap64) { + sql( + String.format( + "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) " + + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', " + + "'deletion-vectors.bitmap64' = '%s', 'write-only' = 'true')", + changelogProducer, dvBitmap64)); + + sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')"); + + sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')"); + + sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')"); + + // without merge-on-read, level 0 data is not visible (write-only, no compaction) + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */")) + .containsExactlyInAnyOrder( + Row.of(1, "111111111"), Row.of(2, "2"), Row.of(3, "3"), Row.of(4, "4")); + + // with merge-on-read enabled, level 0 data becomes visible via MOR + assertThat( + batchSql( + "SELECT * FROM T /*+ OPTIONS('deletion-vectors.merge-on-read'='true') */")) + .containsExactlyInAnyOrder( + Row.of(1, "111111111"), + Row.of(2, "2_2"), + Row.of(3, "3_1"), + Row.of(4, "4_1")); + } + @ParameterizedTest @MethodSource("parameters1") public void testDVTableWithAggregationMergeEngine(String changelogProducer, boolean dvBitmap64) From e7fd9913e9f4b552c90c6e84f900f700fc3dd242 Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Mon, 25 May 2026 11:12:40 +0800 Subject: [PATCH 2/3] [core] Address review feedback on deletion-vectors.merge-on-read - Add deletionVectorsMergeOnRead() accessor in CoreOptions - Validate merge-on-read requires deletion-vectors.enabled - Simplify SchemaValidation to use accessor instead of raw config get - Align Flink IT test with core test (fixed bucket, consistent assertions) - Add testMergeOnReadRequiresDvEnabled validation test --- .../src/main/java/org/apache/paimon/CoreOptions.java | 6 +++++- .../java/org/apache/paimon/schema/SchemaValidation.java | 7 +++++-- .../org/apache/paimon/schema/SchemaValidationTest.java | 9 +++++++++ .../org/apache/paimon/flink/DeletionVectorITCase.java | 8 +++----- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index fecd9fa5c142..8ef13bef2045 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -3659,9 +3659,13 @@ public boolean forceLookup() { return options.get(FORCE_LOOKUP); } + public boolean deletionVectorsMergeOnRead() { + return options.get(DELETION_VECTORS_MERGE_ON_READ); + } + public boolean batchScanSkipLevel0() { if (deletionVectorsEnabled()) { - return !options.get(DELETION_VECTORS_MERGE_ON_READ); + return !deletionVectorsMergeOnRead(); } return mergeEngine() == FIRST_ROW; } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index b0a7a762988f..98ce18da4c21 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -281,6 +281,10 @@ public static void validateTableSchema(TableSchema schema) { if (options.deletionVectorsEnabled()) { validateForDeletionVectors(options); + } else { + checkArgument( + !options.deletionVectorsMergeOnRead(), + "deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true."); } // vector field names must point to vector type @@ -622,8 +626,7 @@ private static void validateForDeletionVectors(CoreOptions options) { } checkArgument( - !(options.toConfiguration().get(CoreOptions.DELETION_VECTORS_MERGE_ON_READ) - && options.visibilityCallbackEnabled()), + !(options.deletionVectorsMergeOnRead() && options.visibilityCallbackEnabled()), "Cannot enable deletion-vectors.merge-on-read together with visibility-callback.enabled. " + "The former makes DV level-0 files visible at read time, " + "while the latter waits for compaction before returning commits."); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index 679848ac4771..c7fdf42397cb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -519,4 +519,13 @@ public void testMergeOnReadConflictWithVisibilityCallback() { .hasMessageContaining( "Cannot enable deletion-vectors.merge-on-read together with visibility-callback.enabled"); } + + @Test + public void testMergeOnReadRequiresDvEnabled() { + Map options = new HashMap<>(); + options.put("deletion-vectors.merge-on-read", "true"); + assertThatThrownBy(() -> validateTableSchemaExec(options)) + .hasMessageContaining( + "deletion-vectors.merge-on-read requires deletion-vectors.enabled to be true"); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java index 3138bf0027ab..f42f83a28eab 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java @@ -213,7 +213,7 @@ public void testBatchReadDVTableWithMergeOnRead(String changelogProducer, boolea String.format( "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) " + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', " - + "'deletion-vectors.bitmap64' = '%s', 'write-only' = 'true')", + + "'deletion-vectors.bitmap64' = '%s', 'write-only' = 'true', 'bucket' = '1')", changelogProducer, dvBitmap64)); sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')"); @@ -222,10 +222,8 @@ public void testBatchReadDVTableWithMergeOnRead(String changelogProducer, boolea sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')"); - // without merge-on-read, level 0 data is not visible (write-only, no compaction) - assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='3') */")) - .containsExactlyInAnyOrder( - Row.of(1, "111111111"), Row.of(2, "2"), Row.of(3, "3"), Row.of(4, "4")); + // write-only with fixed bucket, all files at level 0, not visible without merge-on-read + assertThat(batchSql("SELECT * FROM T")).isEmpty(); // with merge-on-read enabled, level 0 data becomes visible via MOR assertThat( From 832b3d31293054742058005a8280c7796d0d794b Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Mon, 25 May 2026 17:26:13 +0800 Subject: [PATCH 3/3] [core] Allow merge-on-read to coexist with visibility-callback Remove the mutual exclusion check between deletion-vectors.merge-on-read and visibility-callback.enabled. The two features are not fundamentally conflicting: merge-on-read controls read-side L0 file visibility, while visibility-callback is a commit callback that waits for compaction. They can meaningfully coexist especially in postpone bucket scenarios. --- .../paimon/schema/SchemaValidation.java | 6 ---- .../paimon/schema/SchemaValidationTest.java | 33 ++++++++++++++++--- .../spark/sql/VisibilityCallbackTest.scala | 4 ++- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 98ce18da4c21..4d84e21fc141 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -624,12 +624,6 @@ private static void validateForDeletionVectors(CoreOptions options) { !options.mergeEngine().equals(MergeEngine.FIRST_ROW), "First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine."); } - - checkArgument( - !(options.deletionVectorsMergeOnRead() && options.visibilityCallbackEnabled()), - "Cannot enable deletion-vectors.merge-on-read together with visibility-callback.enabled. " - + "The former makes DV level-0 files visible at read time, " - + "while the latter waits for compaction before returning commits."); } private static void validateSequenceField(TableSchema schema, CoreOptions options) { diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java index c7fdf42397cb..c3a79d91fdf1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java @@ -510,14 +510,39 @@ public void testFileFormatPerLevelAcceptsCompatibleSchema() { } @Test - public void testMergeOnReadConflictWithVisibilityCallback() { + public void testMergeOnReadCoexistsWithVisibilityCallback() { Map options = new HashMap<>(); options.put("deletion-vectors.enabled", "true"); options.put("deletion-vectors.merge-on-read", "true"); options.put("visibility-callback.enabled", "true"); - assertThatThrownBy(() -> validateTableSchemaExec(options)) - .hasMessageContaining( - "Cannot enable deletion-vectors.merge-on-read together with visibility-callback.enabled"); + assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException(); + } + + @Test + public void testMergeOnReadCoexistsWithVisibilityCallbackAndPostponeBucket() { + List fields = + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.INT()), + new DataField(2, "f2", DataTypes.INT()), + new DataField(3, "f3", DataTypes.STRING())); + Map options = new HashMap<>(); + options.put("deletion-vectors.enabled", "true"); + options.put("deletion-vectors.merge-on-read", "true"); + options.put("visibility-callback.enabled", "true"); + options.put(BUCKET.key(), String.valueOf(-2)); + assertThatCode( + () -> + validateTableSchema( + new TableSchema( + 1, + fields, + 10, + singletonList("f0"), + singletonList("f1"), + options, + ""))) + .doesNotThrowAnyException(); } @Test diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala index dc636e3c9991..7c49262a03a4 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VisibilityCallbackTest.scala @@ -29,17 +29,19 @@ import scala.concurrent.duration.DurationInt class VisibilityCallbackTest extends PaimonSparkTestBase { - Seq((true, false), (false, true)).foreach { + Seq((true, false), (false, true), (true, true)).foreach { case (dv, postpone) => test(s"Visibility callback with deletion-vectors $dv and postpone-bucket $postpone") { withTable("T") { val bucket = if (postpone) -2 else 1 + val mergeOnRead = if (dv) "true" else "false" sql(s""" |CREATE TABLE T (id INT, name STRING) |TBLPROPERTIES ( | 'bucket' = '$bucket', | 'primary-key' = 'id', | 'deletion-vectors.enabled' = '$dv', + | 'deletion-vectors.merge-on-read' = '$mergeOnRead', | 'write-only' = 'true' |) |""".stripMargin)