Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 30 additions & 30 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,36 @@
<td>Boolean</td>
<td>Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be manually added as separate partition operations.</td>
</tr>
<tr>
<td><h5>local-cache.block-size</h5></td>
<td style="word-wrap: break-word;">1 mb</td>
<td>MemorySize</td>
<td>Block size for local cache.</td>
</tr>
<tr>
<td><h5>local-cache.dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Directory for local block cache on disk. If not configured, memory cache is used instead.</td>
</tr>
<tr>
<td><h5>local-cache.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable local block cache for file reads. If local-cache.dir is configured, disk cache is used; otherwise memory cache is used.</td>
</tr>
<tr>
<td><h5>local-cache.max-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
<td>Maximum total size of the local block cache. Unlimited by default.</td>
</tr>
<tr>
<td><h5>local-cache.whitelist</h5></td>
<td style="word-wrap: break-word;">"meta,global-index"</td>
<td>String</td>
<td>Comma-separated list of file types to cache. Supported values: meta, global-index, bucket-index, data, file-index.</td>
</tr>
<tr>
<td><h5>lock-acquire-timeout</h5></td>
<td style="word-wrap: break-word;">8 min</td>
Expand Down Expand Up @@ -146,36 +176,6 @@
<td>Boolean</td>
<td>Sync all table properties to the catalog metastore (e.g. Hive metastore, JDBC catalog store)</td>
</tr>
<tr>
<td><h5>local-cache.block-size</h5></td>
<td style="word-wrap: break-word;">1 mb</td>
<td>MemorySize</td>
<td>Block size for local cache.</td>
</tr>
<tr>
<td><h5>local-cache.dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Directory for local block cache on disk. If not configured, memory cache is used instead.</td>
</tr>
<tr>
<td><h5>local-cache.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable local block cache for file reads. If local-cache.dir is configured, disk cache is used; otherwise memory cache is used.</td>
</tr>
<tr>
<td><h5>local-cache.max-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
<td>Maximum total size of the local block cache. Unlimited by default.</td>
</tr>
<tr>
<td><h5>local-cache.whitelist</h5></td>
<td style="word-wrap: break-word;">"meta,global-index"</td>
<td>String</td>
<td>Comma-separated list of file types to cache. Supported values: meta, global-index, bucket-index, data, file-index.</td>
</tr>
<tr>
<td><h5>table.type</h5></td>
<td style="word-wrap: break-word;">managed</td>
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,12 @@
<td><p>Enum</p></td>
<td>Specify the scanning behavior of the source.<br /><br />Possible values:<ul><li>"default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".</li><li>"latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.</li><li>"full": Deprecated. Same as "latest-full".</li><li>"latest": For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "latest-full" startup mode.</li><li>"compacted-full": For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.</li><li>"from-timestamp": For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by "scan.timestamp-millis" but does not read new changes.</li><li>"from-creation-timestamp": For streaming sources and batch sources, If timestamp specified by "scan.creation-time-millis" is during in the range of earliest snapshot and latest snapshot: mode is from-snapshot which snapshot is equal or later the timestamp. If timestamp is earlier than earliest snapshot or later than latest snapshot, mode is from-file-creation-time.</li><li>"from-file-creation-time": For streaming and batch sources, consumes a snapshot and filters the data files by creation time. For streaming sources, upon first startup, and continue to read the latest changes.</li><li>"from-snapshot": For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by "scan.snapshot-id" or "scan.tag-name" but does not read new changes.</li><li>"from-snapshot-full": For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.</li><li>"incremental": Read incremental changes between start and end snapshot or timestamp.</li></ul></td>
</tr>
<tr>
<td><h5>scan.plan-auto-tag-for-read.time-retained</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>When set, a temporary tag will be auto-created during batch scan planning to protect the read snapshot from expiration. The value specifies the tag's TTL. Should be longer than the longest expected batch read duration.</td>
</tr>
<tr>
<td><h5>scan.plan-sort-partition</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
14 changes: 14 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,15 @@ public InlineElement getDescription() {
.noDefaultValue()
.withDescription("Use customized name when creating tags in Batch mode.");

public static final ConfigOption<Duration> SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED =
key("scan.plan-auto-tag-for-read.time-retained")
.durationType()
.noDefaultValue()
.withDescription(
"When set, a temporary tag will be auto-created during batch scan planning "
+ "to protect the read snapshot from expiration. The value specifies the tag's TTL. "
+ "Should be longer than the longest expected batch read duration.");

public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
key("snapshot.watermark-idle-timeout")
.durationType()
Expand Down Expand Up @@ -3550,6 +3559,11 @@ public String tagBatchCustomizedName() {
return options.get(TAG_BATCH_CUSTOMIZED_NAME);
}

@Nullable
public Duration scanPlanAutoTagTimeRetained() {
return options.get(SCAN_PLAN_AUTO_TAG_FOR_READ_TIME_RETAINED);
}

public Duration snapshotWatermarkIdleTimeout() {
return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,17 @@
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
import org.apache.paimon.tag.BatchReadTagCreator;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand All @@ -53,6 +59,7 @@ public class DataTableBatchScan extends AbstractDataTableScan {
private TopN topN;

private final SchemaManager schemaManager;
@Nullable private String readProtectionTagName;

public DataTableBatchScan(
TableSchema schema,
Expand Down Expand Up @@ -103,15 +110,20 @@ protected TableScan.Plan planWithoutAuth() {

if (hasNext) {
hasNext = false;
StartingScanner.Result result;
Optional<StartingScanner.Result> pushed = applyPushDownLimit();
if (pushed.isPresent()) {
return DataFilePlan.fromResult(pushed.get());
result = pushed.get();
} else {
pushed = applyPushDownTopN();
result = pushed.orElseGet(() -> startingScanner.scan(snapshotReader));
}
pushed = applyPushDownTopN();
if (pushed.isPresent()) {
return DataFilePlan.fromResult(pushed.get());

if (result instanceof ScannedResult) {
maybeCreateReadProtectionTag(((ScannedResult) result).currentSnapshotId());
}
return DataFilePlan.fromResult(startingScanner.scan(snapshotReader));

return DataFilePlan.fromResult(result);
} else {
throw new EndOfScanException();
}
Expand Down Expand Up @@ -209,4 +221,21 @@ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubta
snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
return this;
}

@Override
@Nullable
public String readProtectionTagName() {
return readProtectionTagName;
}

private void maybeCreateReadProtectionTag(long snapshotId) {
Duration timeRetained = options().scanPlanAutoTagTimeRetained();
if (timeRetained == null) {
return;
}
SnapshotManager sm = snapshotReader.snapshotManager();
TagManager tagMgr = new TagManager(sm.fileIO(), sm.tablePath(), sm.branch());
BatchReadTagCreator creator = new BatchReadTagCreator(tagMgr, sm, timeRetained);
this.readProtectionTagName = creator.createReadTag(snapshotId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,9 @@ default InnerTableScan dropStats() {
// do nothing, should implement this if need
return this;
}

@Nullable
default String readProtectionTagName() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.tag;

import org.apache.paimon.Snapshot;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collections;
import java.util.UUID;

/** Creates temporary tags to protect snapshots from expiration during batch reads. */
public class BatchReadTagCreator {

private static final Logger LOG = LoggerFactory.getLogger(BatchReadTagCreator.class);

public static final String BATCH_READ_TAG_PREFIX = "batch-read-";

private final TagManager tagManager;
private final SnapshotManager snapshotManager;
private final Duration timeRetained;

public BatchReadTagCreator(
TagManager tagManager, SnapshotManager snapshotManager, Duration timeRetained) {
this.tagManager = tagManager;
this.snapshotManager = snapshotManager;
this.timeRetained = timeRetained;
}

@Nullable
public String createReadTag(long snapshotId) {
Snapshot snapshot;
try {
snapshot = snapshotManager.snapshot(snapshotId);
} catch (Exception e) {
LOG.warn("Failed to get snapshot {} for read protection tag.", snapshotId, e);
return null;
}

String tagName = generateTagName(snapshotId);
try {
tagManager.createTag(snapshot, tagName, timeRetained, Collections.emptyList(), true);
LOG.info(
"Created batch read protection tag '{}' for snapshot {}.", tagName, snapshotId);
return tagName;
} catch (Exception e) {
LOG.warn(
"Failed to create batch read protection tag for snapshot {}. "
+ "Read will proceed without protection.",
snapshotId,
e);
return null;
}
}

public void deleteReadTag(String tagName) {
try {
if (tagManager.tagExists(tagName)) {
// Directly delete the tag metadata file instead of using TagManager.deleteTag(),
// which would also scan and delete unreferenced data files — too heavyweight for a
// read-path cleanup. Any orphan data files left behind will be reclaimed by
// OrphanFilesClean.
snapshotManager.fileIO().deleteQuietly(tagManager.tagPath(tagName));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentionally deleting only the tag meta file instead of calling table.deleteTag(tagName) to
keep read cleanup lightweight

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Directly delete the tag metadata file instead of using TagManager.deleteTag(), which would also scan and delete unreferenced data files — too heavyweight for a read-path cleanup. Any orphan data files left behind will be reclaimed by OrphanFilesClean.

Commented in 1d4a461

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

LOG.info("Deleted batch read protection tag '{}'.", tagName);
}
} catch (Exception e) {
LOG.warn(
"Failed to delete batch read protection tag '{}'. "
+ "It will be cleaned up by TTL expiration.",
tagName,
e);
}
}

public static boolean isBatchReadTag(String tagName) {
return tagName.startsWith(BATCH_READ_TAG_PREFIX);
}

private String generateTagName(long snapshotId) {
String uuid = UUID.randomUUID().toString().substring(0, 8);
return BATCH_READ_TAG_PREFIX + snapshotId + "-" + uuid;
}
}
Loading
Loading