diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 6cafa6ccdfa2..d885fee7053d 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -104,6 +104,36 @@
Boolean |
Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be manually added as separate partition operations. |
+
+ local-cache.block-size |
+ 1 mb |
+ MemorySize |
+ Block size for local cache. |
+
+
+ local-cache.dir |
+ (none) |
+ String |
+ Directory for local block cache on disk. If not configured, memory cache is used instead. |
+
+
+ local-cache.enabled |
+ false |
+ Boolean |
+ Whether to enable local block cache for file reads. If local-cache.dir is configured, disk cache is used; otherwise memory cache is used. |
+
+
+ local-cache.max-size |
+ (none) |
+ MemorySize |
+ Maximum total size of the local block cache. Unlimited by default. |
+
+
+ local-cache.whitelist |
+ "meta,global-index" |
+ String |
+ Comma-separated list of file types to cache. Supported values: meta, global-index, bucket-index, data, file-index. |
+
lock-acquire-timeout |
8 min |
@@ -146,36 +176,6 @@
Boolean |
Sync all table properties to the catalog metastore (e.g. Hive metastore, JDBC catalog store) |
-
- local-cache.block-size |
- 1 mb |
- MemorySize |
- Block size for local cache. |
-
-
- local-cache.dir |
- (none) |
- String |
- Directory for local block cache on disk. If not configured, memory cache is used instead. |
-
-
- local-cache.enabled |
- false |
- Boolean |
- Whether to enable local block cache for file reads. If local-cache.dir is configured, disk cache is used; otherwise memory cache is used. |
-
-
- local-cache.max-size |
- (none) |
- MemorySize |
- Maximum total size of the local block cache. Unlimited by default. |
-
-
- local-cache.whitelist |
- "meta,global-index" |
- String |
- Comma-separated list of file types to cache. Supported values: meta, global-index, bucket-index, data, file-index. |
-
table.type |
managed |
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 7496cbfc15c8..0b829129832d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1259,6 +1259,12 @@
Enum |
Specify the scanning behavior of the source.
Possible values:- "default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".
- "latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.
- "full": Deprecated. Same as "latest-full".
- "latest": For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "latest-full" startup mode.
- "compacted-full": For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.
- "from-timestamp": For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by "scan.timestamp-millis" but does not read new changes.
- "from-creation-timestamp": For streaming sources and batch sources, If timestamp specified by "scan.creation-time-millis" is during in the range of earliest snapshot and latest snapshot: mode is from-snapshot which snapshot is equal or later the timestamp. If timestamp is earlier than earliest snapshot or later than latest snapshot, mode is from-file-creation-time.
- "from-file-creation-time": For streaming and batch sources, consumes a snapshot and filters the data files by creation time. For streaming sources, upon first startup, and continue to read the latest changes.
- "from-snapshot": For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by "scan.snapshot-id" or "scan.tag-name" but does not read new changes.
- "from-snapshot-full": For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.
- "incremental": Read incremental changes between start and end snapshot or timestamp.
|
+
+ scan.plan-auto-tag-for-read.time-retained |
+ (none) |
+ Duration |
+ When set, a temporary tag will be auto-created during batch scan planning to protect the read snapshot from expiration. The value specifies the tag's TTL. Should be longer than the longest expected batch read duration. |
+
scan.plan-sort-partition |
false |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 176d1e9d4d47..d5a1bb7fb067 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1767,6 +1767,15 @@ public InlineElement getDescription() {
.noDefaultValue()
.withDescription("Use customized name when creating tags in Batch mode.");
+ /**
+  * Retention (TTL) of the temporary tag that batch scan planning creates to keep the planned
+  * snapshot from being expired while it is read. The option has no default value; batch scans
+  * create no tag while it is unset.
+  */
+ public static final ConfigOption SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED =
+ key("scan.plan-auto-tag-for-read.time-retained")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "When set, a temporary tag will be auto-created during batch scan planning "
+ + "to protect the read snapshot from expiration. The value specifies the tag's TTL. "
+ + "Should be longer than the longest expected batch read duration.");
+
public static final ConfigOption SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
key("snapshot.watermark-idle-timeout")
.durationType()
@@ -3550,6 +3559,11 @@ public String tagBatchCustomizedName() {
return options.get(TAG_BATCH_CUSTOMIZED_NAME);
}
+ /**
+  * Returns the configured retention of the batch-read protection tag, or {@code null} when
+  * {@code scan.plan-auto-tag-for-read.time-retained} is not set (the option has no default).
+  */
+ @Nullable
+ public Duration scanPlanAutoTagTimeRetained() {
+ return options.get(SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED);
+ }
+
public Duration snapshotWatermarkIdleTimeout() {
return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 255ff5f672fb..710f73698506 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -29,11 +29,17 @@
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
+import org.apache.paimon.tag.BatchReadTagCreator;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -53,6 +59,7 @@ public class DataTableBatchScan extends AbstractDataTableScan {
private TopN topN;
private final SchemaManager schemaManager;
+ // Name of the read-protection tag created while planning; null until a tag has been created.
+ @Nullable private String readProtectionTagName;
public DataTableBatchScan(
TableSchema schema,
@@ -103,15 +110,20 @@ protected TableScan.Plan planWithoutAuth() {
if (hasNext) {
hasNext = false;
+ StartingScanner.Result result;
Optional pushed = applyPushDownLimit();
if (pushed.isPresent()) {
- return DataFilePlan.fromResult(pushed.get());
+ result = pushed.get();
+ } else {
+ pushed = applyPushDownTopN();
+ result = pushed.orElseGet(() -> startingScanner.scan(snapshotReader));
}
- pushed = applyPushDownTopN();
- if (pushed.isPresent()) {
- return DataFilePlan.fromResult(pushed.get());
+
+ if (result instanceof ScannedResult) {
+ maybeCreateReadProtectionTag(((ScannedResult) result).currentSnapshotId());
}
- return DataFilePlan.fromResult(startingScanner.scan(snapshotReader));
+
+ return DataFilePlan.fromResult(result);
} else {
throw new EndOfScanException();
}
@@ -209,4 +221,21 @@ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubta
snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
return this;
}
+
+ /**
+  * Returns the name of the protection tag created while planning, or {@code null} when the
+  * feature is not configured, nothing has been planned yet, or the tag could not be created.
+  */
+ @Override
+ @Nullable
+ public String readProtectionTagName() {
+ return readProtectionTagName;
+ }
+
+ /**
+  * Tags {@code snapshotId} for the duration of the read when
+  * {@code scan.plan-auto-tag-for-read.time-retained} is configured; otherwise does nothing.
+  */
+ private void maybeCreateReadProtectionTag(long snapshotId) {
+     Duration retention = options().scanPlanAutoTagTimeRetained();
+     if (retention != null) {
+         SnapshotManager snapshots = snapshotReader.snapshotManager();
+         TagManager tags =
+                 new TagManager(snapshots.fileIO(), snapshots.tablePath(), snapshots.branch());
+         // createReadTag reports a failure as null, so the field is null when tagging fails.
+         this.readProtectionTagName =
+                 new BatchReadTagCreator(tags, snapshots, retention).createReadTag(snapshotId);
+     }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index c09809feb949..38278d4620be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -106,4 +106,9 @@ default InnerTableScan dropStats() {
// do nothing, should implement this if need
return this;
}
+
+ /**
+  * Name of the tag this scan created to protect the snapshot it reads, if any. The default
+  * implementation returns {@code null}, i.e. the scan reports no such tag.
+  */
+ @Nullable
+ default String readProtectionTagName() {
+ return null;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java b/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java
new file mode 100644
index 000000000000..8637709b31c4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/BatchReadTagCreator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+
+/**
+ * Creates and removes temporary tags that protect snapshots from expiration during batch reads.
+ *
+ * <p>Creation and removal are both best effort: failures are logged and never thrown to the
+ * caller, so a read is never failed because of its protection tag.
+ */
+public class BatchReadTagCreator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchReadTagCreator.class);
+
+    /** Prefix of every tag name generated by this class. */
+    public static final String BATCH_READ_TAG_PREFIX = "batch-read-";
+
+    private final TagManager tagManager;
+    private final SnapshotManager snapshotManager;
+    private final Duration timeRetained;
+
+    /**
+     * @param tagManager used to create tags, check their existence and resolve their paths
+     * @param snapshotManager used to load the snapshot to tag and to reach the file IO
+     * @param timeRetained retention given to every created tag; not read when deleting a tag
+     */
+    public BatchReadTagCreator(
+            TagManager tagManager, SnapshotManager snapshotManager, Duration timeRetained) {
+        this.tagManager = tagManager;
+        this.snapshotManager = snapshotManager;
+        this.timeRetained = timeRetained;
+    }
+
+    /**
+     * Tags the given snapshot under a freshly generated name with the configured retention.
+     *
+     * @param snapshotId id of the snapshot to protect
+     * @return the name of the created tag, or {@code null} when the snapshot cannot be loaded or
+     *     the tag cannot be created (both cases are logged as warnings)
+     */
+    @Nullable
+    public String createReadTag(long snapshotId) {
+        Snapshot snapshot;
+        try {
+            snapshot = snapshotManager.snapshot(snapshotId);
+        } catch (Exception e) {
+            LOG.warn("Failed to get snapshot {} for read protection tag.", snapshotId, e);
+            return null;
+        }
+
+        String tagName = generateTagName(snapshotId);
+        try {
+            tagManager.createTag(snapshot, tagName, timeRetained, Collections.emptyList(), true);
+            LOG.info(
+                    "Created batch read protection tag '{}' for snapshot {}.", tagName, snapshotId);
+            return tagName;
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to create batch read protection tag for snapshot {}. "
+                            + "Read will proceed without protection.",
+                    snapshotId,
+                    e);
+            return null;
+        }
+    }
+
+    /**
+     * Removes the metadata file of the given tag if the tag exists. Never throws: a tag that
+     * cannot be removed is logged and left to its TTL.
+     *
+     * @param tagName name of the tag to remove
+     */
+    public void deleteReadTag(String tagName) {
+        try {
+            if (!tagManager.tagExists(tagName)) {
+                return;
+            }
+            // Directly delete the tag metadata file instead of using TagManager.deleteTag(),
+            // which would also scan and delete unreferenced data files — too heavyweight for a
+            // read-path cleanup. Any orphan data files left behind will be reclaimed by
+            // OrphanFilesClean.
+            //
+            // delete() is used rather than deleteQuietly() so that a removal that did not happen
+            // is reported as such instead of being logged as a success.
+            if (snapshotManager.fileIO().delete(tagManager.tagPath(tagName), false)) {
+                LOG.info("Deleted batch read protection tag '{}'.", tagName);
+            } else {
+                LOG.warn(
+                        "Batch read protection tag '{}' was not deleted. "
+                                + "It will be cleaned up by TTL expiration.",
+                        tagName);
+            }
+        } catch (Exception e) {
+            LOG.warn(
+                    "Failed to delete batch read protection tag '{}'. "
+                            + "It will be cleaned up by TTL expiration.",
+                    tagName,
+                    e);
+        }
+    }
+
+    /**
+     * Tells whether a tag name carries the batch-read prefix.
+     *
+     * @param tagName tag name to test; {@code null} yields {@code false}
+     */
+    public static boolean isBatchReadTag(String tagName) {
+        return tagName != null && tagName.startsWith(BATCH_READ_TAG_PREFIX);
+    }
+
+    /** Builds {@code batch-read-<snapshotId>-<first 8 characters of a random UUID>}. */
+    private String generateTagName(long snapshotId) {
+        String uuid = UUID.randomUUID().toString().substring(0, 8);
+        return BATCH_READ_TAG_PREFIX + snapshotId + "-" + uuid;
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java
new file mode 100644
index 000000000000..4f225e2f07ea
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/tag/BatchReadTagCreatorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tag;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BatchReadTagCreator}. */
+public class BatchReadTagCreatorTest extends PrimaryKeyTableTestBase {
+
+ @Override
+ protected Options tableOptions() {
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, 1);
+ return options;
+ }
+
+ @Test
+ public void testCreateAndDeleteReadTag() throws Exception {
+ writeCommit(GenericRow.of(1, 1, 1));
+
+ SnapshotManager sm = table.snapshotManager();
+ TagManager tagManager = table.tagManager();
+ long snapshotId = sm.latestSnapshotId();
+
+ BatchReadTagCreator creator = new BatchReadTagCreator(tagManager, sm, Duration.ofHours(1));
+
+ String tagName = creator.createReadTag(snapshotId);
+ assertThat(tagName).isNotNull();
+ assertThat(tagName).startsWith(BatchReadTagCreator.BATCH_READ_TAG_PREFIX);
+ assertThat(tagManager.tagExists(tagName)).isTrue();
+
+ creator.deleteReadTag(tagName);
+ assertThat(tagManager.tagExists(tagName)).isFalse();
+ }
+
+ @Test
+ public void testIsBatchReadTag() {
+ assertThat(BatchReadTagCreator.isBatchReadTag("batch-read-1-abc12345")).isTrue();
+ assertThat(BatchReadTagCreator.isBatchReadTag("batch-read-42-xyz")).isTrue();
+ assertThat(BatchReadTagCreator.isBatchReadTag("my-tag")).isFalse();
+ assertThat(BatchReadTagCreator.isBatchReadTag("2023-07-18 11")).isFalse();
+ }
+
+ @Test
+ public void testCreateTagFailsGracefully() {
+ SnapshotManager sm = table.snapshotManager();
+ TagManager tagManager = table.tagManager();
+
+ BatchReadTagCreator creator = new BatchReadTagCreator(tagManager, sm, Duration.ofHours(1));
+
+ // snapshot 999 does not exist
+ String tagName = creator.createReadTag(999L);
+ assertThat(tagName).isNull();
+ }
+
+ @Test
+ public void testDeleteNonExistentTagIsNoOp() throws Exception {
+ writeCommit(GenericRow.of(1, 1, 1));
+
+ SnapshotManager sm = table.snapshotManager();
+ TagManager tagManager = table.tagManager();
+
+ BatchReadTagCreator creator = new BatchReadTagCreator(tagManager, sm, Duration.ofHours(1));
+
+ // should not throw
+ creator.deleteReadTag("batch-read-nonexistent-12345678");
+ }
+
+ @Test
+ public void testScanCreatesProtectionTag() throws Exception {
+ writeCommit(GenericRow.of(1, 1, 1));
+
+ Options options = new Options();
+ options.set(CoreOptions.SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED, Duration.ofHours(2));
+ FileStoreTable tableWithOption = table.copy(options.toMap());
+
+ InnerTableScan scan = tableWithOption.newScan();
+ TableScan.Plan plan = scan.plan();
+
+ assertThat(plan.splits()).isNotEmpty();
+ assertThat(scan).isInstanceOf(DataTableBatchScan.class);
+
+ DataTableBatchScan batchScan = (DataTableBatchScan) scan;
+ String tagName = batchScan.readProtectionTagName();
+ assertThat(tagName).isNotNull();
+ assertThat(tagName).startsWith(BatchReadTagCreator.BATCH_READ_TAG_PREFIX);
+
+ TagManager tagManager = table.tagManager();
+ assertThat(tagManager.tagExists(tagName)).isTrue();
+
+ // verify tag has TTL set
+ Tag tag = tagManager.getOrThrow(tagName);
+ assertThat(tag.getTagTimeRetained()).isEqualTo(Duration.ofHours(2));
+ }
+
+ @Test
+ public void testScanDoesNotCreateTagWhenDisabled() throws Exception {
+ writeCommit(GenericRow.of(1, 1, 1));
+
+ // default: option not set
+ InnerTableScan scan = table.newScan();
+ scan.plan();
+
+ assertThat(scan).isInstanceOf(DataTableBatchScan.class);
+ DataTableBatchScan batchScan = (DataTableBatchScan) scan;
+ assertThat(batchScan.readProtectionTagName()).isNull();
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 30a162192529..ff1e0bde3775 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -20,14 +20,15 @@ package org.apache.paimon.spark
import org.apache.paimon.globalindex.GlobalIndexResult
import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.predicate.PredicateBuilder
import org.apache.paimon.spark.metric.SparkMetricRegistry
-import org.apache.paimon.spark.read.{BaseScan, PaimonSupportsRuntimeFiltering}
+import org.apache.paimon.spark.read.{BaseScan, BatchReadTagCleanupListener, PaimonSupportsRuntimeFiltering}
import org.apache.paimon.spark.sources.PaimonMicroBatchStream
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable}
-import org.apache.paimon.table.source.{InnerTableScan, Split}
+import org.apache.paimon.table.source.{DataTableBatchScan, InnerTableScan, Split}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.Batch
@@ -43,15 +44,22 @@ abstract class PaimonBaseScan(table: InnerTable)
private lazy val paimonMetricsRegistry: SparkMetricRegistry = SparkMetricRegistry()
protected def getInputSplits: Array[Split] = {
- readBuilder
+ // Keep a handle on the scan: once planned, it exposes the protection tag it may have created.
+ val tableScan = readBuilder
.newScan()
.withGlobalIndexResult(evalGlobalIndexSearch())
.asInstanceOf[InnerTableScan]
.withMetricRegistry(paimonMetricsRegistry)
- .plan()
- .splits()
- .asScala
- .toArray
+
+ val planned = tableScan.plan()
+
+ // readProtectionTagName is null when no tag was created. A created tag is handed to the
+ // cleanup listener, which deletes it when a query completes.
+ for (tagName <- Option(tableScan.readProtectionTagName)) {
+   BatchReadTagCleanupListener.getOrCreate(SparkSession.active).registerCleanup(tagName, table)
+ }
+
+ planned.splits().asScala.toArray
}
private def evalGlobalIndexSearch(): GlobalIndexResult = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala
new file mode 100644
index 000000000000..988748c109ec
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BatchReadTagCleanupListener.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.table.{DataTable, Table}
+import org.apache.paimon.tag.BatchReadTagCreator
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * A Spark [[QueryExecutionListener]] that cleans up batch-read protection tags when a query
+ * completes (success or failure). Cleanup is best effort; TTL expiration serves as a safety net
+ * if it fails.
+ *
+ * NOTE(review): every query completion drains all pending tags, not only those of the query that
+ * finished. A tag registered by a query that is still running can therefore be removed early when
+ * another query observed by this listener completes first -- confirm this is acceptable.
+ */
+class BatchReadTagCleanupListener extends QueryExecutionListener with Logging {
+
+  /** Tags awaiting removal: tag name mapped to the table that owns the tag. */
+  private val pendingCleanups = new ConcurrentHashMap[String, DataTable]()
+
+  /**
+   * Schedules `tagName` for removal at the next query completion.
+   *
+   * Only a [[DataTable]] can be registered; any other kind of table is silently ignored.
+   */
+  def registerCleanup(tagName: String, table: Table): Unit = {
+    table match {
+      case dt: DataTable => pendingCleanups.put(tagName, dt)
+      case _ =>
+    }
+  }
+
+  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+    cleanupAll()
+  }
+
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+    cleanupAll()
+  }
+
+  /** Drains the pending tags and deletes each of them; one failure does not stop the others. */
+  private def cleanupAll(): Unit = {
+    val iter = pendingCleanups.entrySet().iterator()
+    while (iter.hasNext) {
+      val entry = iter.next()
+      val tagName = entry.getKey
+      val dataTable = entry.getValue
+      // Forget the entry before deleting: a failed deletion is left to the tag's TTL, not retried.
+      iter.remove()
+      try {
+        val creator = new BatchReadTagCreator(
+          dataTable.tagManager(),
+          dataTable.snapshotManager(),
+          dataTable.coreOptions().scanPlanAutoTagTimeRetained())
+        creator.deleteReadTag(tagName)
+      } catch {
+        // NonFatal lets interrupts and fatal JVM errors propagate instead of swallowing them.
+        case scala.util.control.NonFatal(e) =>
+          logWarning(
+            s"Failed to delete batch read protection tag '$tagName'. " +
+              "It will be cleaned up by TTL expiration.",
+            e)
+      }
+    }
+  }
+}
+
+object BatchReadTagCleanupListener {
+
+  // One listener per session. A listener registered through a session's `listenerManager` is
+  // only notified about queries of that session, so a single process-wide instance registered
+  // with the first session would never clean tags of queries run in any other session.
+  // Weak keys keep this registry from being the only thing that holds on to a session.
+  private val instances =
+    new java.util.WeakHashMap[SparkSession, BatchReadTagCleanupListener]()
+
+  /**
+   * Returns the cleanup listener of `spark`, creating it and registering it with the session's
+   * listener manager on first use. Safe to call from several threads.
+   */
+  def getOrCreate(spark: SparkSession): BatchReadTagCleanupListener = instances.synchronized {
+    Option(instances.get(spark)).getOrElse {
+      val listener = new BatchReadTagCleanupListener()
+      spark.listenerManager.register(listener)
+      instances.put(spark, listener)
+      listener
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala
new file mode 100644
index 000000000000..45d1118d8517
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagConcurrentExpireTest.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.options.ExpireConfig
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.source.DataTableBatchScan
+
+import java.time.Duration
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+ * Tests that simulate concurrent snapshot expiration during a batch read.
+ *
+ * Uses an append-only table with INSERT OVERWRITE to ensure old data files become unreferenced by
+ * later snapshots. Without protection: data files are deleted during expiration, read fails. With
+ * protection (auto-tag): data files are preserved, read succeeds.
+ */
+class BatchReadTagConcurrentExpireTest extends PaimonSparkTestBase {
+
+  /** Overwrites T and appends twice, so that the files of snapshot 1 are no longer referenced. */
+  private def overwriteAndAppend(): Unit = {
+    spark.sql("INSERT OVERWRITE T VALUES (3, 'v3'), (4, 'v4')")
+    spark.sql("INSERT INTO T VALUES (5, 'v5')")
+    spark.sql("INSERT INTO T VALUES (6, 'v6')")
+  }
+
+  /** Aggressively expires the snapshots of T: only one is retained, regardless of its age. */
+  private def expireAllButLatestSnapshot(): Unit = {
+    loadTable("T")
+      .newExpireSnapshots()
+      .config(
+        ExpireConfig
+          .builder()
+          .snapshotMaxDeletes(Integer.MAX_VALUE)
+          .snapshotRetainMax(1)
+          .snapshotRetainMin(1)
+          .snapshotTimeRetain(Duration.ZERO)
+          .build())
+      .expire()
+  }
+
+  /**
+   * Reads every split to the end and returns each row as "a,b". The iterator of a split is closed
+   * even when reading it fails.
+   */
+  private def readRows(
+      read: org.apache.paimon.table.source.TableRead,
+      splits: util.List[org.apache.paimon.table.source.Split]): Seq[String] = {
+    splits.asScala.toSeq.flatMap {
+      split =>
+        val iter = read.createReader(split).toCloseableIterator
+        try {
+          val rows = Seq.newBuilder[String]
+          while (iter.hasNext) {
+            val row = iter.next()
+            rows += s"${row.getInt(0)},${row.getString(1).toString}"
+          }
+          rows.result()
+        } finally {
+          iter.close()
+        }
+    }
+  }
+
+  test("Paimon: without protection, read fails after concurrent expiration") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |""".stripMargin)
+
+    // write snapshot 1
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+
+    // Step 1: plan the scan (get splits referencing snapshot 1 data files)
+    val splits = table.newScan().plan().splits()
+    assert(!splits.isEmpty)
+
+    // Step 2: OVERWRITE makes snapshot 1's files unreferenced by the new snapshot
+    overwriteAndAppend()
+
+    // Step 3: aggressively expire snapshots (keep only 1)
+    expireAllButLatestSnapshot()
+
+    // Step 4: try to read from the old splits - data files have been deleted
+    val read = table.newReadBuilder().newRead()
+    withClue("Read should fail because data files were deleted by expiration") {
+      intercept[Exception] {
+        readRows(read, splits)
+      }
+    }
+  }
+
+  test("Paimon: with protection, read succeeds after concurrent expiration") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES (
+                 | 'scan.plan-auto-tag-for-read.time-retained' = '1 h'
+                 |)
+                 |""".stripMargin)
+
+    // write snapshot 1
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+
+    // Step 1: plan the scan - this creates a protection tag automatically
+    val scan = table.newScan()
+    val splits = scan.plan().splits()
+    assert(!splits.isEmpty)
+
+    // Verify protection tag was created
+    val tagName = scan.asInstanceOf[DataTableBatchScan].readProtectionTagName
+    assert(tagName != null, "Protection tag should be created during scan planning")
+    assert(table.tagManager().tagExists(tagName))
+
+    try {
+      // Step 2: OVERWRITE makes snapshot 1's files unreferenced by new snapshot
+      overwriteAndAppend()
+
+      // Step 3: aggressively expire snapshots (keep only 1)
+      expireAllButLatestSnapshot()
+
+      // Step 4: read from old splits - should succeed because tag protects data files
+      val results = readRows(table.newReadBuilder().newRead(), splits)
+
+      assert(results.size == 2, s"Should read 2 rows but got ${results.size}")
+      assert(results.contains("1,v1"))
+      assert(results.contains("2,v2"))
+    } finally {
+      // Cleanup: delete the protection tag, also when an assertion above has failed
+      val creator = new org.apache.paimon.tag.BatchReadTagCreator(
+        table.tagManager(),
+        table.snapshotManager(),
+        Duration.ofHours(1))
+      creator.deleteReadTag(tagName)
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala
new file mode 100644
index 000000000000..afced7afeddb
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/read/BatchReadTagProtectionTest.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.tag.BatchReadTagCreator
+
+import org.apache.spark.sql.Row
+
+/** Spark-level tests for the batch-read protection tag: creation, cleanup and expiration. */
+class BatchReadTagProtectionTest extends PaimonSparkTestBase {
+
+  /** Names of the batch-read protection tags that currently exist according to `tagManager`. */
+  private def batchReadTags(tagManager: org.apache.paimon.utils.TagManager): Array[String] = {
+    tagManager.allTagNames().toArray.map(_.toString).filter(BatchReadTagCreator.isBatchReadTag)
+  }
+
+  /**
+   * Polls until no batch-read tag is left or `timeoutMs` has elapsed, and returns the tags that
+   * are still present. The cleanup listener fires asynchronously, so a single fixed sleep would
+   * be either too short on a slow machine or needlessly long on a fast one.
+   */
+  private def awaitNoBatchReadTags(
+      tagManager: org.apache.paimon.utils.TagManager,
+      timeoutMs: Long = 10000L): Array[String] = {
+    val deadline = System.nanoTime() + java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(timeoutMs)
+
+    @scala.annotation.tailrec
+    def poll(): Array[String] = {
+      val remaining = batchReadTags(tagManager)
+      if (remaining.isEmpty || deadline - System.nanoTime() <= 0) {
+        remaining
+      } else {
+        Thread.sleep(50)
+        poll()
+      }
+    }
+
+    poll()
+  }
+
+  test("Paimon: batch read creates protection tag when enabled") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES (
+                 | 'primary-key' = 'a',
+                 | 'bucket' = '1',
+                 | 'scan.plan-auto-tag-for-read.time-retained' = '1 h'
+                 |)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val tagManager = loadTable("T").tagManager()
+
+    // query triggers scan which creates a protection tag
+    checkAnswer(spark.sql("SELECT * FROM T ORDER BY a"), Row(1, "v1") :: Row(2, "v2") :: Nil)
+
+    // after the query completes the listener should clean up the tag; it fires asynchronously,
+    // so wait for the cleanup instead of assuming it has already happened
+    val batchReadTags = awaitNoBatchReadTags(tagManager)
+    assert(
+      batchReadTags.isEmpty,
+      s"Protection tags should be cleaned up, but found: ${batchReadTags.mkString(", ")}")
+  }
+
+  test("Paimon: batch read does NOT create tag when disabled") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES ('primary-key' = 'a', 'bucket' = '1')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val tagManager = loadTable("T").tagManager()
+
+    checkAnswer(spark.sql("SELECT * FROM T ORDER BY a"), Row(1, "v1") :: Row(2, "v2") :: Nil)
+
+    assert(
+      batchReadTags(tagManager).isEmpty,
+      "No protection tags should be created when feature is disabled")
+  }
+
+  test("Paimon: protection tag prevents data file deletion during expiration") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES (
+                 | 'primary-key' = 'a',
+                 | 'bucket' = '1',
+                 | 'scan.plan-auto-tag-for-read.time-retained' = '1 h',
+                 | 'snapshot.num-retained.min' = '1',
+                 | 'snapshot.num-retained.max' = '1'
+                 |)
+                 |""".stripMargin)
+
+    // create snapshot 1
+    spark.sql("INSERT INTO T VALUES (1, 'v1'), (2, 'v2')")
+
+    val table = loadTable("T")
+    val tagManager = table.tagManager()
+
+    // manually create a protection tag on snapshot 1 (simulating what scan does)
+    val creator =
+      new BatchReadTagCreator(tagManager, table.snapshotManager(), java.time.Duration.ofHours(1))
+    val tagName = creator.createReadTag(1L)
+    assert(tagName != null)
+    assert(tagManager.tagExists(tagName))
+
+    try {
+      // create more snapshots to trigger expiration of snapshot 1
+      spark.sql("INSERT INTO T VALUES (3, 'v3')")
+      spark.sql("INSERT INTO T VALUES (4, 'v4')")
+
+      // snapshot 1 may be expired now, but data files should be protected by the tag
+      // read from the tag should still work
+      checkAnswer(
+        spark.sql(s"SELECT * FROM T VERSION AS OF '$tagName' ORDER BY a"),
+        Row(1, "v1") :: Row(2, "v2") :: Nil)
+    } finally {
+      // cleanup, also when an assertion above has failed
+      creator.deleteReadTag(tagName)
+    }
+  }
+}