From 67e7f934d798f91373e61e6da307c9362951089a Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 19 May 2026 16:57:50 +0800 Subject: [PATCH 1/2] [spark] Introduce plan-auto-tag-for-read for Spark read --- .../generated/catalog_configuration.html | 60 +++---- .../generated/core_configuration.html | 6 + .../java/org/apache/paimon/CoreOptions.java | 14 ++ .../table/source/DataTableBatchScan.java | 39 ++++- .../paimon/table/source/InnerTableScan.java | 5 + .../paimon/tag/BatchReadTagCreator.java | 101 +++++++++++ .../paimon/tag/BatchReadTagCreatorTest.java | 139 +++++++++++++++ .../apache/paimon/spark/PaimonBaseScan.scala | 24 ++- .../read/BatchReadTagCleanupListener.scala | 93 ++++++++++ .../BatchReadTagConcurrentExpireTest.scala | 163 ++++++++++++++++++ .../read/BatchReadTagProtectionTest.scala | 118 +++++++++++++ 11 files changed, 719 insertions(+), 43 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala create mode 100644 paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala create mode 100644 paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index 6cafa6ccdfa2..d885fee7053d 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -104,6 +104,36 @@ Boolean Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be manually added as separate partition operations. + +
local-cache.block-size
+ 1 mb + MemorySize + Block size for local cache. + + +
local-cache.dir
+ (none) + String + Directory for local block cache on disk. If not configured, memory cache is used instead. + + +
local-cache.enabled
+ false + Boolean + Whether to enable local block cache for file reads. If local-cache.dir is configured, disk cache is used; otherwise memory cache is used. + + +
local-cache.max-size
+ (none) + MemorySize + Maximum total size of the local block cache. Unlimited by default. + + +
local-cache.whitelist
+ "meta,global-index" + String + Comma-separated list of file types to cache. Supported values: meta, global-index, bucket-index, data, file-index. +
lock-acquire-timeout
8 min @@ -146,36 +176,6 @@ Boolean Sync all table properties to the catalog metastore (e.g. Hive metastore, JDBC catalog store) - -
local-cache.block-size
- 1 mb - MemorySize - Block size for local cache. - - -
local-cache.dir
- (none) - String - Directory for local block cache on disk. If not configured, memory cache is used instead. - - -
local-cache.enabled
- false - Boolean - Whether to enable local block cache for file reads. If local-cache.dir is configured, disk cache is used; otherwise memory cache is used. - - -
local-cache.max-size
- (none) - MemorySize - Maximum total size of the local block cache. Unlimited by default. - - -
local-cache.whitelist
- "meta,global-index" - String - Comma-separated list of file types to cache. Supported values: meta, global-index, bucket-index, data, file-index. -
table.type
managed diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 7496cbfc15c8..0b829129832d 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1259,6 +1259,12 @@

Enum

Specify the scanning behavior of the source.

Possible values: + +
scan.plan-auto-tag-for-read.time-retained
+ (none) + Duration + When set, a temporary tag will be auto-created during batch scan planning to protect the read snapshot from expiration. The value specifies the tag's TTL. Should be longer than the longest expected batch read duration. +
scan.plan-sort-partition
false diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 176d1e9d4d47..d5a1bb7fb067 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1767,6 +1767,15 @@ public InlineElement getDescription() { .noDefaultValue() .withDescription("Use customized name when creating tags in Batch mode."); + public static final ConfigOption SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED = + key("scan.plan-auto-tag-for-read.time-retained") + .durationType() + .noDefaultValue() + .withDescription( + "When set, a temporary tag will be auto-created during batch scan planning " + + "to protect the read snapshot from expiration. The value specifies the tag's TTL. " + + "Should be longer than the longest expected batch read duration."); + public static final ConfigOption SNAPSHOT_WATERMARK_IDLE_TIMEOUT = key("snapshot.watermark-idle-timeout") .durationType() @@ -3550,6 +3559,11 @@ public String tagBatchCustomizedName() { return options.get(TAG_BATCH_CUSTOMIZED_NAME); } + @Nullable + public Duration scanPlanAutoTagTimeRetained() { + return options.get(SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED); + } + public Duration snapshotWatermarkIdleTimeout() { return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java index 255ff5f672fb..710f73698506 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java @@ -29,11 +29,17 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.table.source.snapshot.StartingScanner; import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult; +import org.apache.paimon.tag.BatchReadTagCreator; import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -53,6 +59,7 @@ public class DataTableBatchScan extends AbstractDataTableScan { private TopN topN; private final SchemaManager schemaManager; + @Nullable private String readProtectionTagName; public DataTableBatchScan( TableSchema schema, @@ -103,15 +110,20 @@ protected TableScan.Plan planWithoutAuth() { if (hasNext) { hasNext = false; + StartingScanner.Result result; Optional pushed = applyPushDownLimit(); if (pushed.isPresent()) { - return DataFilePlan.fromResult(pushed.get()); + result = pushed.get(); + } else { + pushed = applyPushDownTopN(); + result = pushed.orElseGet(() -> startingScanner.scan(snapshotReader)); } - pushed = applyPushDownTopN(); - if (pushed.isPresent()) { - return DataFilePlan.fromResult(pushed.get()); + + if (result instanceof ScannedResult) { + maybeCreateReadProtectionTag(((ScannedResult) result).currentSnapshotId()); } - return DataFilePlan.fromResult(startingScanner.scan(snapshotReader)); + + return DataFilePlan.fromResult(result); } else { throw new EndOfScanException(); } @@ -209,4 +221,21 @@ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubta snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks); return this; } + + @Override + @Nullable + public String readProtectionTagName() { + return readProtectionTagName; + } + + private void maybeCreateReadProtectionTag(long snapshotId) { + Duration timeRetained = options().scanPlanAutoTagTimeRetained(); + if (timeRetained == null) { + return; + } + SnapshotManager sm = snapshotReader.snapshotManager(); + TagManager tagMgr = new TagManager(sm.fileIO(), sm.tablePath(), sm.branch()); + BatchReadTagCreator creator = new BatchReadTagCreator(tagMgr, sm, timeRetained); + this.readProtectionTagName = creator.createReadTag(snapshotId); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index c09809feb949..38278d4620be 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -106,4 +106,9 @@ default InnerTableScan dropStats() { // do nothing, should implement this if need return this; } + + @Nullable + default String readProtectionTagName() { + return null; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java b/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java new file mode 100644 index 000000000000..e0c708761ecf --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Collections; +import java.util.UUID; + +/** Creates temporary tags to protect snapshots from expiration during batch reads. */ +public class BatchReadTagCreator { + + private static final Logger LOG = LoggerFactory.getLogger(BatchReadTagCreator.class); + + public static final String BATCH_READ_TAG_PREFIX = "batch-read-"; + + private final TagManager tagManager; + private final SnapshotManager snapshotManager; + private final Duration timeRetained; + + public BatchReadTagCreator( + TagManager tagManager, SnapshotManager snapshotManager, Duration timeRetained) { + this.tagManager = tagManager; + this.snapshotManager = snapshotManager; + this.timeRetained = timeRetained; + } + + @Nullable + public String createReadTag(long snapshotId) { + Snapshot snapshot; + try { + snapshot = snapshotManager.snapshot(snapshotId); + } catch (Exception e) { + LOG.warn("Failed to get snapshot {} for read protection tag.", snapshotId, e); + return null; + } + + String tagName = generateTagName(snapshotId); + try { + tagManager.createTag(snapshot, tagName, timeRetained, Collections.emptyList(), true); + LOG.info( + "Created batch read protection tag '{}' for snapshot {}.", tagName, snapshotId); + return tagName; + } catch (Exception e) { + LOG.warn( + "Failed to create batch read protection tag for snapshot {}. " + + "Read will proceed without protection.", + snapshotId, + e); + return null; + } + } + + public void deleteReadTag(String tagName) { + try { + if (tagManager.tagExists(tagName)) { + snapshotManager.fileIO().deleteQuietly(tagManager.tagPath(tagName)); + LOG.info("Deleted batch read protection tag '{}'.", tagName); + } + } catch (Exception e) { + LOG.warn( + "Failed to delete batch read protection tag '{}'. " + + "It will be cleaned up by TTL expiration.", + tagName, + e); + } + } + + public static boolean isBatchReadTag(String tagName) { + return tagName.startsWith(BATCH_READ_TAG_PREFIX); + } + + private String generateTagName(long snapshotId) { + String uuid = UUID.randomUUID().toString().substring(0, 8); + return BATCH_READ_TAG_PREFIX + snapshotId + "-" + uuid; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java new file mode 100644 index 000000000000..4f225e2f07ea --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.PrimaryKeyTableTestBase; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataTableBatchScan; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link BatchReadTagCreator}. */ +public class BatchReadTagCreatorTest extends PrimaryKeyTableTestBase { + + @Override + protected Options tableOptions() { + Options options = new Options(); + options.set(CoreOptions.BUCKET, 1); + return options; + } + + @Test + public void testCreateAndDeleteReadTag() throws Exception { + writeCommit(GenericRow.of(1, 1, 1)); + + SnapshotManager sm = table.snapshotManager(); + TagManager tagManager = table.tagManager(); + long snapshotId = sm.latestSnapshotId(); + + BatchReadTagCreator creator = new BatchReadTagCreator(tagManager, sm, Duration.ofHours(1)); + + String tagName = creator.createReadTag(snapshotId); + assertThat(tagName).isNotNull(); + assertThat(tagName).startsWith(BatchReadTagCreator.BATCH_READ_TAG_PREFIX); + assertThat(tagManager.tagExists(tagName)).isTrue(); + + creator.deleteReadTag(tagName); + assertThat(tagManager.tagExists(tagName)).isFalse(); + } + + @Test + public void testIsBatchReadTag() { + assertThat(BatchReadTagCreator.isBatchReadTag("batch-read-1-abc12345")).isTrue(); + assertThat(BatchReadTagCreator.isBatchReadTag("batch-read-42-xyz")).isTrue(); + assertThat(BatchReadTagCreator.isBatchReadTag("my-tag")).isFalse(); + assertThat(BatchReadTagCreator.isBatchReadTag("2023-07-18 11")).isFalse(); + } + + @Test + public void testCreateTagFailsGracefully() { + SnapshotManager sm = table.snapshotManager(); + TagManager tagManager = table.tagManager(); + + BatchReadTagCreator creator = new BatchReadTagCreator(tagManager, sm, Duration.ofHours(1)); + + // snapshot 999 does not exist + String tagName = creator.createReadTag(999L); + assertThat(tagName).isNull(); + } + + @Test + public void testDeleteNonExistentTagIsNoOp() throws Exception { + writeCommit(GenericRow.of(1, 1, 1)); + + SnapshotManager sm = table.snapshotManager(); + TagManager tagManager = table.tagManager(); + + BatchReadTagCreator creator = new BatchReadTagCreator(tagManager, sm, Duration.ofHours(1)); + + // should not throw + creator.deleteReadTag("batch-read-nonexistent-12345678"); + } + + @Test + public void testScanCreatesProtectionTag() throws Exception { + writeCommit(GenericRow.of(1, 1, 1)); + + Options options = new Options(); + options.set(CoreOptions.SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED, Duration.ofHours(2)); + FileStoreTable tableWithOption = table.copy(options.toMap()); + + InnerTableScan scan = tableWithOption.newScan(); + TableScan.Plan plan = scan.plan(); + + assertThat(plan.splits()).isNotEmpty(); + assertThat(scan).isInstanceOf(DataTableBatchScan.class); + + DataTableBatchScan batchScan = (DataTableBatchScan) scan; + String tagName = batchScan.readProtectionTagName(); + assertThat(tagName).isNotNull(); + assertThat(tagName).startsWith(BatchReadTagCreator.BATCH_READ_TAG_PREFIX); + + TagManager tagManager = table.tagManager(); + assertThat(tagManager.tagExists(tagName)).isTrue(); + + // verify tag has TTL set + Tag tag = tagManager.getOrThrow(tagName); + assertThat(tag.getTagTimeRetained()).isEqualTo(Duration.ofHours(2)); + } + + @Test + public void testScanDoesNotCreateTagWhenDisabled() throws Exception { + writeCommit(GenericRow.of(1, 1, 1)); + + // default: option not set + InnerTableScan scan = table.newScan(); + scan.plan(); + + assertThat(scan).isInstanceOf(DataTableBatchScan.class); + DataTableBatchScan batchScan = (DataTableBatchScan) scan; + assertThat(batchScan.readProtectionTagName()).isNull(); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala index 30a162192529..ff1e0bde3775 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala @@ -20,14 +20,15 @@ package org.apache.paimon.spark import org.apache.paimon.globalindex.GlobalIndexResult import org.apache.paimon.partition.PartitionPredicate -import org.apache.paimon.predicate.{Predicate, PredicateBuilder} +import org.apache.paimon.predicate.PredicateBuilder import org.apache.paimon.spark.metric.SparkMetricRegistry -import org.apache.paimon.spark.read.{BaseScan, PaimonSupportsRuntimeFiltering} +import org.apache.paimon.spark.read.{BaseScan, BatchReadTagCleanupListener, PaimonSupportsRuntimeFiltering} import org.apache.paimon.spark.sources.PaimonMicroBatchStream import org.apache.paimon.spark.util.OptionUtils import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable} -import org.apache.paimon.table.source.{InnerTableScan, Split} +import org.apache.paimon.table.source.{DataTableBatchScan, InnerTableScan, Split} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric} import org.apache.spark.sql.connector.read.Batch @@ -43,15 +44,22 @@ abstract class PaimonBaseScan(table: InnerTable) private lazy val paimonMetricsRegistry: SparkMetricRegistry = SparkMetricRegistry() protected def getInputSplits: Array[Split] = { - readBuilder + val scan = readBuilder .newScan() .withGlobalIndexResult(evalGlobalIndexSearch()) .asInstanceOf[InnerTableScan] .withMetricRegistry(paimonMetricsRegistry) - .plan() - .splits() - .asScala - .toArray + + val plan = scan.plan() + + Option(scan.readProtectionTagName).foreach { + name => + BatchReadTagCleanupListener + .getOrCreate(SparkSession.active) + .registerCleanup(name, table) + } + + plan.splits().asScala.toArray } private def evalGlobalIndexSearch(): GlobalIndexResult = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala new file mode 100644 index 000000000000..988748c109ec --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.read + +import org.apache.paimon.table.{DataTable, Table} +import org.apache.paimon.tag.BatchReadTagCreator + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +import java.util.concurrent.ConcurrentHashMap + +/** + * A Spark [[QueryExecutionListener]] that cleans up batch-read protection tags when a query + * completes (success or failure). TTL expiration serves as a safety net if this cleanup fails. + */ +class BatchReadTagCleanupListener extends QueryExecutionListener with Logging { + + private val pendingCleanups = new ConcurrentHashMap[String, DataTable]() + + def registerCleanup(tagName: String, table: Table): Unit = { + table match { + case dt: DataTable => pendingCleanups.put(tagName, dt) + case _ => + } + } + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + cleanupAll() + } + + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + cleanupAll() + } + + private def cleanupAll(): Unit = { + val iter = pendingCleanups.entrySet().iterator() + while (iter.hasNext) { + val entry = iter.next() + val tagName = entry.getKey + val dataTable = entry.getValue + iter.remove() + try { + val creator = new BatchReadTagCreator( + dataTable.tagManager(), + dataTable.snapshotManager(), + dataTable.coreOptions().scanPlanAutoTagTimeRetained()) + creator.deleteReadTag(tagName) + } catch { + case e: Exception => + logWarning( + s"Failed to delete batch read protection tag '$tagName'. " + + "It will be cleaned up by TTL expiration.", + e) + } + } + } +} + +object BatchReadTagCleanupListener { + + @volatile private var instance: BatchReadTagCleanupListener = _ + + def getOrCreate(spark: SparkSession): BatchReadTagCleanupListener = { + if (instance == null) { + synchronized { + if (instance == null) { + instance = new BatchReadTagCleanupListener() + spark.listenerManager.register(instance) + } + } + } + instance + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala new file mode 100644 index 000000000000..45d1118d8517 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.read + +import org.apache.paimon.options.ExpireConfig +import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.table.source.DataTableBatchScan + +import java.time.Duration +import java.util + +import scala.collection.JavaConverters._ + +/** + * Tests that simulate concurrent snapshot expiration during a batch read. + * + * Uses an append-only table with INSERT OVERWRITE to ensure old data files become unreferenced by + * later snapshots. Without protection: data files are deleted during expiration, read fails. With + * protection (auto-tag): data files are preserved, read succeeds. + */ +class BatchReadTagConcurrentExpireTest extends PaimonSparkTestBase { + + test("Paimon: without protection, read fails after concurrent expiration") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |""".stripMargin) + + // write snapshot 1 + spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')") + + val table = loadTable("T") + + // Step 1: plan the scan (get splits referencing snapshot 1 data files) + val scan = table.newScan() + val plan = scan.plan() + val splits = plan.splits() + assert(!splits.isEmpty) + + // Step 2: OVERWRITE makes snapshot 1's files unreferenced by the new snapshot + spark.sql("INSERT OVERWRITE T VALUES (3, 'v3'), (4, 'v4')") + spark.sql("INSERT INTO T VALUES (5, 'v5')") + spark.sql("INSERT INTO T VALUES (6, 'v6')") + + // Step 3: aggressively expire snapshots (keep only 1) + val reloadedTable = loadTable("T") + reloadedTable + .newExpireSnapshots() + .config( + ExpireConfig + .builder() + .snapshotMaxDeletes(Integer.MAX_VALUE) + .snapshotRetainMax(1) + .snapshotRetainMin(1) + .snapshotTimeRetain(Duration.ZERO) + .build()) + .expire() + + // Step 4: try to read from the old splits - data files have been deleted + val read = table.newReadBuilder().newRead() + var readFailed = false + try { + val readers = splits.asScala.map(split => read.createReader(split)) + readers.foreach { + reader => + val iter = reader.toCloseableIterator + while (iter.hasNext) iter.next() + iter.close() + } + } catch { + case _: Exception => + readFailed = true + } + + assert(readFailed, "Read should fail because data files were deleted by expiration") + } + + test("Paimon: with protection, read succeeds after concurrent expiration") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ( + | 'scan.plan-auto-tag-for-read.time-retained' = '1 h' + |) + |""".stripMargin) + + // write snapshot 1 + spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')") + + val table = loadTable("T") + + // Step 1: plan the scan - this creates a protection tag automatically + val scan = table.newScan() + val plan = scan.plan() + val splits = plan.splits() + assert(!splits.isEmpty) + + // Verify protection tag was created + val batchScan = scan.asInstanceOf[DataTableBatchScan] + val tagName = batchScan.readProtectionTagName + assert(tagName != null, "Protection tag should be created during scan planning") + assert(table.tagManager().tagExists(tagName)) + + // Step 2: OVERWRITE makes snapshot 1's files unreferenced by new snapshot + spark.sql("INSERT OVERWRITE T VALUES (3, 'v3'), (4, 'v4')") + spark.sql("INSERT INTO T VALUES (5, 'v5')") + spark.sql("INSERT INTO T VALUES (6, 'v6')") + + // Step 3: aggressively expire snapshots (keep only 1) + val reloadedTable = loadTable("T") + reloadedTable + .newExpireSnapshots() + .config( + ExpireConfig + .builder() + .snapshotMaxDeletes(Integer.MAX_VALUE) + .snapshotRetainMax(1) + .snapshotRetainMin(1) + .snapshotTimeRetain(Duration.ZERO) + .build()) + .expire() + + // Step 4: read from old splits - should succeed because tag protects data files + val read = table.newReadBuilder().newRead() + val results = new util.ArrayList[String]() + splits.asScala.foreach { + split => + val reader = read.createReader(split) + val iter = reader.toCloseableIterator + while (iter.hasNext) { + val row = iter.next() + results.add(s"${row.getInt(0)},${row.getString(1).toString}") + } + iter.close() + } + + assert(results.size() == 2, s"Should read 2 rows but got ${results.size()}") + assert(results.contains("1,v1")) + assert(results.contains("2,v2")) + + // Cleanup: delete the protection tag + val snapshotManager = table.snapshotManager() + val creator = new org.apache.paimon.tag.BatchReadTagCreator( + table.tagManager(), + snapshotManager, + Duration.ofHours(1)) + creator.deleteReadTag(tagName) + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala new file mode 100644 index 000000000000..afced7afeddb --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.read + +import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.tag.BatchReadTagCreator + +import org.apache.spark.sql.Row + +class BatchReadTagProtectionTest extends PaimonSparkTestBase { + + test("Paimon: batch read creates protection tag when enabled") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ( + | 'primary-key' = 'a', + | 'bucket' = '1', + | 'scan.plan-auto-tag-for-read.time-retained' = '1 h' + |) + |""".stripMargin) + + spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')") + + val table = loadTable("T") + val tagManager = table.tagManager() + + // query triggers scan which creates a protection tag + checkAnswer(spark.sql("SELECT * FROM T ORDER BY a"), Row(1, "v1") :: Row(2, "v2") :: Nil) + + // after query completes, listener should have cleaned up the tag + // give it a moment since listener fires asynchronously + Thread.sleep(500) + + val remainingTags = tagManager.allTagNames() + val batchReadTags = remainingTags.toArray + .map(_.toString) + .filter(BatchReadTagCreator.isBatchReadTag) + assert( + batchReadTags.isEmpty, + s"Protection tags should be cleaned up, but found: ${batchReadTags.mkString(", ")}") + } + + test("Paimon: batch read does NOT create tag when disabled") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key' = 'a', 'bucket' = '1') + |""".stripMargin) + + spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')") + + val table = loadTable("T") + val tagManager = table.tagManager() + + checkAnswer(spark.sql("SELECT * FROM T ORDER BY a"), Row(1, "v1") :: Row(2, "v2") :: Nil) + + val remainingTags = tagManager.allTagNames() + val batchReadTags = remainingTags.toArray + .map(_.toString) + .filter(BatchReadTagCreator.isBatchReadTag) + assert(batchReadTags.isEmpty, "No protection tags should be created when feature is disabled") + } + + test("Paimon: protection tag prevents data file deletion during expiration") { + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ( + | 'primary-key' = 'a', + | 'bucket' = '1', + | 'scan.plan-auto-tag-for-read.time-retained' = '1 h', + | 'snapshot.num-retained.min' = '1', + | 'snapshot.num-retained.max' = '1' + |) + |""".stripMargin) + + // create snapshot 1 + spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')") + + val table = loadTable("T") + val tagManager = table.tagManager() + val snapshotManager = table.snapshotManager() + + // manually create a protection tag on snapshot 1 (simulating what scan does) + val creator = + new BatchReadTagCreator(tagManager, snapshotManager, java.time.Duration.ofHours(1)) + val tagName = creator.createReadTag(1L) + assert(tagName != null) + assert(tagManager.tagExists(tagName)) + + // create more snapshots to trigger expiration of snapshot 1 + spark.sql("INSERT INTO T VALUES (3, 'v3')") + spark.sql("INSERT INTO T VALUES (4, 'v4')") + + // snapshot 1 may be expired now, but data files should be protected by the tag + // read from the tag should still work + checkAnswer( + spark.sql(s"SELECT * FROM T VERSION AS OF '$tagName' ORDER BY a"), + Row(1, "v1") :: Row(2, "v2") :: Nil) + + // cleanup + creator.deleteReadTag(tagName) + } +} From 1d4a46123ea8a4e7835d4a801a90524eb96011ed Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 19 May 2026 19:59:48 +0800 Subject: [PATCH 2/2] add comments --- .../main/java/org/apache/paimon/tag/BatchReadTagCreator.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java b/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java index e0c708761ecf..8637709b31c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java @@ -78,6 +78,10 @@ public String createReadTag(long snapshotId) { public void deleteReadTag(String tagName) { try { if (tagManager.tagExists(tagName)) { + // Directly delete the tag metadata file instead of using TagManager.deleteTag(), + // which would also scan and delete unreferenced data files — too heavyweight for a + // read-path cleanup. Any orphan data files left behind will be reclaimed by + // OrphanFilesClean. snapshotManager.fileIO().deleteQuietly(tagManager.tagPath(tagName)); LOG.info("Deleted batch read protection tag '{}'.", tagName); }