diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/RetentionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/RetentionUtils.java
new file mode 100644
index 000000000000..d004b04c65a0
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/RetentionUtils.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Static helpers that decide whether a segment has aged out of a table's retention window.
+ */
+public class RetentionUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RetentionUtils.class);
+
+  private RetentionUtils() {
+  }
+
+  /**
+   * Applies the end-time comparison, with optional creation-time fallback, that {@code TimeRetentionStrategy} uses
+   * for completed segments. Segment completion status is NOT inspected here — callers must pass only completed
+   * segments (DONE or UPLOADED).
+   * <ul>
+   *   <li>Valid end time: expired when {@code currentTimeMs - endTimeMs > retentionMs}.</li>
+   *   <li>Invalid end time, {@code useCreationTimeFallback} set and creation time valid: expired when
+   *       {@code currentTimeMs - creationTimeMs > retentionMs}.</li>
+   *   <li>Anything else: not expired (fail-open); a warning is logged.</li>
+   * </ul>
+   *
+   * @param tableNameWithType table name with type suffix; only used in log messages
+   * @param segmentZKMetadata metadata of the segment being evaluated
+   * @param retentionMs retention period in milliseconds (expected to be positive)
+   * @param currentTimeMs reference "now" in milliseconds against which the segment age is measured
+   * @param useCreationTimeFallback whether an invalid end time may be substituted by the creation time
+   *                                (should match {@code controller.retentionManager.enableCreationTimeFallback})
+   * @return {@code true} when the segment is beyond the retention boundary, {@code false} otherwise
+   */
+  public static boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata, long retentionMs,
+      long currentTimeMs, boolean useCreationTimeFallback) {
+    String segmentName = segmentZKMetadata.getSegmentName();
+    long endTimeMs = segmentZKMetadata.getEndTimeMs();
+    if (TimeUtils.timeValueInValidRange(endTimeMs)) {
+      return currentTimeMs - endTimeMs > retentionMs;
+    }
+    if (!useCreationTimeFallback) {
+      LOGGER.warn("Segment: {} of table: {} has invalid end time in millis: {}. "
+          + "Creation time fallback is disabled", segmentName, tableNameWithType, endTimeMs);
+      return false;
+    }
+    long creationTimeMs = segmentZKMetadata.getCreationTime();
+    if (!TimeUtils.timeValueInValidRange(creationTimeMs)) {
+      LOGGER.warn("Segment: {} of table: {} has invalid end time: {} and invalid creation time: {}. "
+          + "Cannot determine retention, skipping", segmentName, tableNameWithType, endTimeMs, creationTimeMs);
+      return false;
+    }
+    LOGGER.debug("Segment: {} of table: {} has invalid end time: {}. Using creation time: {} as fallback",
+        segmentName, tableNameWithType, endTimeMs, creationTimeMs);
+    return currentTimeMs - creationTimeMs > retentionMs;
+  }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/RetentionUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/RetentionUtilsTest.java
new file mode 100644
index 000000000000..08ce605f5ffd
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/RetentionUtilsTest.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class RetentionUtilsTest {
+
+  private static final String TABLE_NAME = "testTable_REALTIME";
+  private static final long ONE_DAY_MS = TimeUnit.DAYS.toMillis(1);
+  private static final long RETENTION_MS = TimeUnit.DAYS.toMillis(7);
+
+  private static SegmentZKMetadata makeSegment(long endTimeMs) {
+    SegmentZKMetadata metadata = new SegmentZKMetadata("seg");
+    metadata.setEndTime(endTimeMs);
+    metadata.setTimeUnit(TimeUnit.MILLISECONDS);
+    return metadata;
+  }
+
+  private static SegmentZKMetadata makeSegmentWithCreationTime(long endTimeMs, long creationTimeMs) {
+    // Same millisecond-based segment as makeSegment(), with the creation time stamped on afterwards
+    // so the creation-time fallback path has something to evaluate.
+    SegmentZKMetadata metadata = makeSegment(endTimeMs);
+    metadata.setCreationTime(creationTimeMs);
+    return metadata;
+  }
+
+  @Test
+  public void testExpiredSegmentIsPurgeable() {
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata tenDaysOld = makeSegment(nowMs - 10 * ONE_DAY_MS);
+    assertTrue(RetentionUtils.isPurgeable(TABLE_NAME, tenDaysOld, RETENTION_MS, nowMs, false));
+  }
+
+  @Test
+  public void testRecentSegmentIsNotPurgeable() {
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata twoDaysOld = makeSegment(nowMs - 2 * ONE_DAY_MS);
+    assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, twoDaysOld, RETENTION_MS, nowMs, false));
+  }
+
+  @Test
+  public void testExactBoundaryIsNotPurgeable() {
+    // The comparison is strictly greater-than, so an age of exactly RETENTION_MS must be retained
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata onBoundary = makeSegment(nowMs - RETENTION_MS);
+    assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, onBoundary, RETENTION_MS, nowMs, false));
+  }
+
+  @Test
+  public void testOneMsPastBoundaryIsPurgeable() {
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata justPastBoundary = makeSegment(nowMs - RETENTION_MS - 1);
+    assertTrue(RetentionUtils.isPurgeable(TABLE_NAME, justPastBoundary, RETENTION_MS, nowMs, false));
+  }
+
+  @Test
+  public void testInvalidEndTimeIsNotPurgeable() {
+    // -1 stands in for an invalid end time; with the creation-time fallback disabled the segment
+    // is expected to be kept.
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata invalidEndTime = makeSegment(-1);
+    assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, invalidEndTime, RETENTION_MS, nowMs, false));
+  }
+
+  @Test
+  public void testFarFutureEndTimeIsNotPurgeable() {
+    long nowMs = System.currentTimeMillis();
+    long twoHundredYearsMs = TimeUnit.DAYS.toMillis(365 * 200L);
+    SegmentZKMetadata farFuture = makeSegment(nowMs + twoHundredYearsMs);
+    assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, farFuture, RETENTION_MS, nowMs, false));
+  }
+
+  @Test
+  public void testInvalidEndTimeFallbackDisabledIsNotPurgeable() {
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata metadata = makeSegmentWithCreationTime(-1, nowMs - 10 * ONE_DAY_MS);
+    assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, metadata, RETENTION_MS, nowMs, false));
+  }
+
+  @Test
+  public void testInvalidEndTimeOldCreationTimeFallbackEnabledIsPurgeable() {
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata metadata = makeSegmentWithCreationTime(-1, nowMs - 10 * ONE_DAY_MS);
+    assertTrue(RetentionUtils.isPurgeable(TABLE_NAME, metadata, RETENTION_MS, nowMs, true));
+  }
+
+  @Test
+  public void testInvalidEndTimeRecentCreationTimeFallbackEnabledIsNotPurgeable() {
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata metadata = makeSegmentWithCreationTime(-1, nowMs - 2 * ONE_DAY_MS);
+    assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, metadata, RETENTION_MS, nowMs, true));
+  }
+
+  @Test
+  public void testInvalidEndTimeInvalidCreationTimeFallbackEnabledIsNotPurgeable() {
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata metadata = makeSegmentWithCreationTime(-1, -1);
+    assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, metadata, RETENTION_MS, nowMs, true));
+  }
+
+  @Test
+  public void testValidEndTimeTakesPriorityOverCreationTimeFallback() {
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata metadata = makeSegmentWithCreationTime(nowMs - 2 * ONE_DAY_MS, nowMs - 10 * ONE_DAY_MS);
+    assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, metadata, RETENTION_MS, nowMs, true));
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index f9f0b31b3581..14f37c7235a9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.RetentionUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,30 +53,8 @@ public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZK
return false; // Incomplete segments don't have final end time and should not be purged
}
- String segmentName = segmentZKMetadata.getSegmentName();
- long endTimeMs = segmentZKMetadata.getEndTimeMs();
-
- // If end time is valid, use it directly
- if (TimeUtils.timeValueInValidRange(endTimeMs)) {
- return System.currentTimeMillis() - endTimeMs > _retentionMs;
- }
-
- long creationTimeMs = segmentZKMetadata.getCreationTime();
-
- if (_useCreationTimeFallback && TimeUtils.timeValueInValidRange(creationTimeMs)) {
- LOGGER.debug("Segment: {} of table: {} has invalid end time: {}. Using creation time: {} as fallback",
- segmentName, tableNameWithType, endTimeMs, creationTimeMs);
- return System.currentTimeMillis() - creationTimeMs > _retentionMs;
- }
-
- if (_useCreationTimeFallback) {
- LOGGER.warn("Segment: {} of table: {} has invalid end time: {} and invalid creation time: {}. "
- + "Cannot determine retention, skipping", segmentName, tableNameWithType, endTimeMs, creationTimeMs);
- } else {
- LOGGER.warn("Segment: {} of table: {} has invalid end time in millis: {}. "
- + "Creation time fallback is disabled", segmentName, tableNameWithType, endTimeMs);
- }
- return false;
+ return RetentionUtils.isPurgeable(tableNameWithType, segmentZKMetadata, _retentionMs,
+ System.currentTimeMillis(), _useCreationTimeFallback);
}
@Override
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
index 396a91fc4774..b96b9a569908 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixAdmin;
@@ -35,16 +36,20 @@
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.auth.AuthProviderUtils;
import org.apache.pinot.common.auth.NullAuthProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
+import org.apache.pinot.common.utils.RetentionUtils;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.config.InstanceUtils;
+import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -57,6 +62,7 @@
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.Enablement;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
@@ -85,9 +91,33 @@ static MinionConstants.ValidDocIdsConsensusMode parseValidDocIdsConsensusMode(St
*/
public static final String ALLOW_METADATA_PUSH_WITH_LOCAL_FS = "allowMetadataPushWithLocalFs";
+ /**
+ * Task config key for an optional safety margin subtracted from retention when filtering segments.
+ * Value is a period string (e.g. "1h", "30m"). Segments where {@code (now - endTime) > (retention - buffer)}
+ * are excluded from task generation. This is a table-level config — not per-merge-level — because retention
+ * itself is table-level.
+ *
+ * Currently used by {@code MergeRollupTask} and {@code UpsertCompactMergeTask}. Not applied to
+ * {@code UpsertCompactionTask} (legacy single-segment compaction, being superseded by UpsertCompactMergeTask).
+ */
+ public static final String RETENTION_EXPIRY_BUFFER_PERIOD_KEY = "retentionExpiryBufferPeriod";
+
private MinionTaskUtils() {
}
+  /**
+   * Looks up the creation-time fallback flag in the Helix cluster config — the same key that
+   * {@code RetentionManager} reacts to via {@code onChange()} — falling back to the controller default when unset.
+   */
+  public static boolean isCreationTimeFallbackEnabled(ClusterInfoAccessor clusterInfoAccessor) {
+    String configuredValue = clusterInfoAccessor.getClusterConfig(
+        ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK);
+    if (configuredValue == null) {
+      return ControllerConf.ControllerPeriodicTasksConf.DEFAULT_ENABLE_RETENTION_CREATION_TIME_FALLBACK;
+    }
+    return Boolean.parseBoolean(configuredValue);
+  }
+
/**
* Resolves the AuthProvider to use for Minion tasks.
* Priority order:
@@ -451,4 +481,112 @@ public static ValidDocIdsType getValidDocIdsType(UpsertConfig upsertConfig, Map<
}
return validDocIdsType;
}
+
+  /**
+   * Filters out segments that are past (or near) the table's retention period. This prevents task generators from
+   * selecting segments that RetentionManager may delete before the task executor downloads them.
+   *
+   * Uses the same retention logic as {@code TimeRetentionStrategy}: a segment is considered expired if
+   * {@code currentTimeMs - endTimeMs > effectiveRetentionMs}, where effectiveRetentionMs is
+   * {@code retentionMs - bufferMs}.
+   *
+   * If {@link #RETENTION_EXPIRY_BUFFER_PERIOD_KEY} is set in {@code taskConfigs}, the effective retention is reduced
+   * by that amount, excluding segments earlier — before RetentionManager actually deletes them. This is a table-level
+   * config, not a per-merge-level config, because retention itself is table-level.
+   *
+   * Note on hybrid tables: This method reads only the table-level retention config
+   * ({@code segmentsConfig.retentionTimeUnit/Value}). It does not account for hybrid retention strategies that use
+   * the offline table's time boundary. If hybrid retention is enabled (off by default), RetentionManager may use a
+   * different deletion boundary than what this method computes, so the filter may not perfectly match the controller's
+   * deletion decisions for hybrid tables.
+   *
+   * Watermark impact (MergeRollupTask): This filter runs before watermark advancement. If all segments in an
+   * early time bucket are filtered out, the watermark will advance past them permanently. This is a one-way door but
+   * is expected: those segments would be purged by RetentionManager regardless. If this is caused by a misconfigured
+   * {@code retentionExpiryBufferPeriod}, correcting the config will not recover already-skipped buckets.
+   *
+   * @apiNote Callers are expected to pass only completed segments (status DONE or UPLOADED). This method does not
+   * check segment status, unlike {@code TimeRetentionStrategy.isPurgeable()} which skips incomplete segments. All
+   * current callers ({@code UpsertCompactMergeTaskGenerator.getCandidateSegments},
+   * {@code MergeRollupTaskGenerator.getNonConsumingSegmentsZKMetadataForRealtimeTable}) already guarantee this.
+   *
+   * @param segments the candidate segments to filter (must not be null)
+   * @param tableConfig the table config containing retention settings
+   * @param taskConfigs task-level configs; may contain {@link #RETENTION_EXPIRY_BUFFER_PERIOD_KEY}.
+   *                    May be {@code null}, in which case no buffer is applied.
+   * @param currentTimeMs the current time in milliseconds (pass {@code System.currentTimeMillis()})
+   * @param useCreationTimeFallback when true, segments with invalid end times are evaluated against their
+   *                                creation time; must match
+   *                                {@code controller.retentionManager.enableCreationTimeFallback} so this
+   *                                filter stays aligned with what RetentionManager will actually delete
+   * @return a new list without the segments past effective retention; the original {@code segments} instance is
+   *         returned untouched when retention is unset, unparsable, non-positive, or not larger than the buffer
+   */
+  public static List<SegmentZKMetadata> filterSegmentsPastRetention(List<SegmentZKMetadata> segments,
+      TableConfig tableConfig, @Nullable Map<String, String> taskConfigs, long currentTimeMs,
+      boolean useCreationTimeFallback) {
+    String tableNameWithType = tableConfig.getTableName();
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+    String retentionTimeValue = validationConfig.getRetentionTimeValue();
+    if (retentionTimeUnit == null || retentionTimeValue == null) {
+      return segments;
+    }
+
+    long retentionMs;
+    try {
+      retentionMs = TimeUnit.valueOf(retentionTimeUnit.toUpperCase()).toMillis(Long.parseLong(retentionTimeValue));
+    } catch (Exception e) {
+      LOGGER.warn("Failed to parse retention config for table: {}, skipping retention filter",
+          tableNameWithType, e);
+      return segments;
+    }
+
+    if (retentionMs <= 0) {
+      LOGGER.warn("Retention is non-positive ({}ms) for table: {}, skipping retention filter",
+          retentionMs, tableNameWithType);
+      return segments;
+    }
+
+    long bufferMs = 0;
+    if (taskConfigs != null) {
+      String bufferPeriod = taskConfigs.get(RETENTION_EXPIRY_BUFFER_PERIOD_KEY);
+      if (bufferPeriod != null) {
+        try {
+          bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
+          if (bufferMs < 0) {
+            LOGGER.warn("Invalid retentionExpiryBufferPeriod '{}' for table: {}, using 0",
+                bufferPeriod, tableNameWithType);
+            bufferMs = 0;
+          }
+        } catch (Exception e) {
+          LOGGER.warn("Failed to parse retentionExpiryBufferPeriod '{}' for table: {}, using 0",
+              bufferPeriod, tableNameWithType, e);
+        }
+      }
+    }
+
+    long effectiveRetentionMs = retentionMs - bufferMs;
+    if (effectiveRetentionMs <= 0) {
+      LOGGER.warn("retentionExpiryBufferPeriod ({}ms) >= retention ({}ms) for table: {}, skipping retention filter",
+          bufferMs, retentionMs, tableNameWithType);
+      return segments;
+    }
+
+    List<SegmentZKMetadata> filtered = new ArrayList<>(segments.size());
+    int excludedCount = 0;
+    for (SegmentZKMetadata segment : segments) {
+      if (RetentionUtils.isPurgeable(tableNameWithType, segment, effectiveRetentionMs, currentTimeMs,
+          useCreationTimeFallback)) {
+        excludedCount++;
+      } else {
+        filtered.add(segment);
+      }
+    }
+    if (excludedCount > 0) {
+      LOGGER.info("Excluded {} segments past retention for table: {} (retentionMs={}, bufferMs={}, effectiveMs={})",
+          excludedCount, tableNameWithType, retentionMs, bufferMs, effectiveRetentionMs);
+    }
+    return filtered;
+  }
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 7208d5144d5f..fb6dd7c428f6 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -154,6 +154,7 @@ public String getTaskType() {
public List generateTasks(List tableConfigs) {
String taskType = MergeRollupTask.TASK_TYPE;
List pinotTaskConfigs = new ArrayList<>();
+ boolean useCreationTimeFallback = MinionTaskUtils.isCreationTimeFallbackEnabled(_clusterInfoAccessor);
for (TableConfig tableConfig : tableConfigs) {
if (!validate(tableConfig, taskType)) {
continue;
@@ -176,6 +177,8 @@ public List generateTasks(List tableConfigs) {
}
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(preSelectedSegmentsBasedOnLineage, segmentLineage);
+ Map taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
+
List preSelectedSegments = new ArrayList<>();
for (SegmentZKMetadata segment : allSegments) {
if (preSelectedSegmentsBasedOnLineage.contains(segment.getSegmentName()) && segment.getTotalDocs() > 0
@@ -183,6 +186,13 @@ public List generateTasks(List tableConfigs) {
preSelectedSegments.add(segment);
}
}
+ // Filter out segments past retention to avoid selecting segments that RetentionManager may delete before the
+ // task executor downloads them. Note: if early time buckets consist entirely of expired segments, the watermark
+ // will advance past them since they won't appear in preSelectedSegments. This is expected because those segments
+ // will be purged by RetentionManager regardless.
+ long currentTimeMs = System.currentTimeMillis();
+ preSelectedSegments = MinionTaskUtils.filterSegmentsPastRetention(preSelectedSegments, tableConfig, taskConfigs,
+ currentTimeMs, useCreationTimeFallback);
if (preSelectedSegments.isEmpty()) {
// Reset the watermark time if no segment found. This covers the case where the table is newly created or
@@ -206,7 +216,6 @@ public List generateTasks(List tableConfigs) {
});
// Sort merge levels based on bucket time period
- Map taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
Map> mergeLevelToConfigs = MergeRollupTaskUtils.getLevelToConfigMap(taskConfigs);
List>> sortedMergeLevelConfigs =
new ArrayList<>(mergeLevelToConfigs.entrySet());
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index 3d72a7c6b8c5..1b9565aa24c8 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -44,6 +44,7 @@
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -124,6 +125,7 @@ public String getTaskType() {
public List generateTasks(List tableConfigs) {
String taskType = MinionConstants.UpsertCompactMergeTask.TASK_TYPE;
List pinotTaskConfigs = new ArrayList<>();
+ boolean useCreationTimeFallback = MinionTaskUtils.isCreationTimeFallbackEnabled(_clusterInfoAccessor);
for (TableConfig tableConfig : tableConfigs) {
String tableNameWithType = tableConfig.getTableName();
@@ -147,8 +149,14 @@ public List generateTasks(List tableConfigs) {
List allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
// Get completed segments and filter out the segments based on the buffer time configuration
+ long currentTimeMs = System.currentTimeMillis();
List candidateSegments =
- getCandidateSegments(taskConfigs, allSegments, System.currentTimeMillis());
+ getCandidateSegments(taskConfigs, allSegments, currentTimeMs);
+ // Complements the bufferTimePeriod filter above: getCandidateSegments ensures freshness (don't compact
+ // segments still being written), while this filter excludes segments nearing deletion by RetentionManager.
+ candidateSegments =
+ MinionTaskUtils.filterSegmentsPastRetention(candidateSegments, tableConfig, taskConfigs, currentTimeMs,
+ useCreationTimeFallback);
if (candidateSegments.isEmpty()) {
LOGGER.info("No segments were eligible for compactMerge task for table: {}", tableNameWithType);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
index 709118ba3411..0643189f6d9e 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtilsTest.java
@@ -19,19 +19,23 @@
package org.apache.pinot.plugin.minion.tasks;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.common.utils.ServiceStatus;
+import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.common.MinionConstants;
@@ -650,4 +654,393 @@ public void testGetPushTaskConfigMETADATAPushModeWithLocalOutputDir() {
assertEquals(pushTaskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI), "http://localhost:9000");
assertEquals(pushTaskConfigs.size(), 4);
}
+
+  private static SegmentZKMetadata makeSegmentWithEndTimeMs(String name, long endTimeMs) {
+    SegmentZKMetadata metadata = new SegmentZKMetadata(name);
+    metadata.setEndTime(endTimeMs);
+    metadata.setTimeUnit(TimeUnit.MILLISECONDS);
+    return metadata;
+  }
+
+  private static SegmentZKMetadata makeSegmentWithCreationTime(String name, long endTimeMs, long creationTimeMs) {
+    // Reuse the end-time helper (millisecond time unit), then stamp the creation time on top.
+    // Tests in this class pass endTimeMs = -1 here to build a segment with an invalid end time.
+    SegmentZKMetadata metadata = makeSegmentWithEndTimeMs(name, endTimeMs);
+    metadata.setCreationTime(creationTimeMs);
+    return metadata;
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionExcludesExpiredSegments() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+
+    long nowMs = System.currentTimeMillis();
+    long oneDayMs = 86_400_000L;
+    // 7-day retention: a segment that ended 2 days ago is expected to survive, one that ended 10 days ago is not.
+    SegmentZKMetadata recentSegment = makeSegmentWithEndTimeMs("segment_recent", nowMs - 2 * oneDayMs);
+    SegmentZKMetadata expiredSegment = makeSegmentWithEndTimeMs("segment_expired", nowMs - 10 * oneDayMs);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(recentSegment, expiredSegment));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, false);
+
+    assertEquals(filtered.size(), 1);
+    assertEquals(filtered.get(0).getSegmentName(), "segment_recent");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionReturnsAllWhenNoRetention() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .build();
+
+    long nowMs = System.currentTimeMillis();
+    // No retention is configured on the table, so both the 1-year-old and the 1-day-old segment must come back.
+    SegmentZKMetadata segment1 = makeSegmentWithEndTimeMs("segment_1", nowMs - 365L * 86_400_000L);
+    SegmentZKMetadata segment2 = makeSegmentWithEndTimeMs("segment_2", nowMs - 86_400_000L);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(segment1, segment2));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, false);
+
+    assertEquals(filtered.size(), 2);
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionKeepsSegmentsWithInvalidEndTime() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+    // An end time of -1 models an invalid end time; with the creation-time fallback off the segment must be kept.
+    SegmentZKMetadata invalidTimeSegment = new SegmentZKMetadata("segment_invalid_time");
+    invalidTimeSegment.setEndTime(-1);
+    invalidTimeSegment.setTimeUnit(TimeUnit.MILLISECONDS);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(invalidTimeSegment));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, System.currentTimeMillis(), false);
+
+    assertEquals(filtered.size(), 1);
+    assertEquals(filtered.get(0).getSegmentName(), "segment_invalid_time");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionMalformedRetentionConfigReturnsAll() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("INVALID_UNIT")
+        .setRetentionTimeValue("7")
+        .build();
+    // "INVALID_UNIT" is not a usable retention time unit; the filter is expected to keep every segment.
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata segment = makeSegmentWithEndTimeMs("segment_1", nowMs - 365L * 86_400_000L);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(segment));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, false);
+
+    assertEquals(filtered.size(), 1, "Malformed unit should fall through to catch block and return original list");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionMalformedRetentionValueReturnsAll() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("abc")
+        .build();
+    // "abc" is not a numeric retention value; the filter is expected to keep every segment.
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata segment = makeSegmentWithEndTimeMs("segment_1", nowMs - 365L * 86_400_000L);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(segment));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, false);
+
+    assertEquals(filtered.size(), 1, "Malformed value should fall through to catch block and return original list");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionZeroRetentionReturnsAll() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("0")
+        .build();
+    // Retention of 0 days: the year-old segment below is still expected to be returned.
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata segment = makeSegmentWithEndTimeMs("segment_1", nowMs - 365L * 86_400_000L);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(segment));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, false);
+
+    assertEquals(filtered.size(), 1, "Zero retention should return original list unchanged");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionNegativeRetentionReturnsAll() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("-1")
+        .build();
+    // Retention of -1 days: the year-old segment below is still expected to be returned.
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata segment = makeSegmentWithEndTimeMs("segment_1", nowMs - 365L * 86_400_000L);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(segment));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, false);
+
+    assertEquals(filtered.size(), 1, "Negative retention should return original list unchanged");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionExactBoundaryIsKept() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+
+    long nowMs = System.currentTimeMillis();
+    long sevenDaysMs = 7L * 86_400_000L;
+
+    // Exact boundary: endTime is exactly 7 days ago, so (now - endTime) == retentionMs. The filter uses
+    // strict greater-than, so this segment should be kept.
+    SegmentZKMetadata boundarySegment = makeSegmentWithEndTimeMs("segment_boundary", nowMs - sevenDaysMs);
+    // 1ms past the boundary: (now - endTime) > retentionMs, so this segment should be excluded.
+    SegmentZKMetadata justExpiredSegment = makeSegmentWithEndTimeMs("segment_just_expired", nowMs - sevenDaysMs - 1);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(boundarySegment, justExpiredSegment));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, false);
+
+    assertEquals(filtered.size(), 1);
+    assertEquals(filtered.get(0).getSegmentName(), "segment_boundary",
+        "Segment at exact retention boundary should be kept (strict greater-than comparison)");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionWithBuffer() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+
+    long nowMs = System.currentTimeMillis();
+    long oneDayMs = 86_400_000L;
+
+    // 5 days old — inside the raw 7d retention but past the effective retention of 7d - 3d = 4d
+    SegmentZKMetadata borderlineSegment = makeSegmentWithEndTimeMs("segment_borderline", nowMs - 5 * oneDayMs);
+    // 2 days old — inside both the raw retention and the effective retention
+    SegmentZKMetadata recentSegment = makeSegmentWithEndTimeMs("segment_recent", nowMs - 2 * oneDayMs);
+
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(MinionTaskUtils.RETENTION_EXPIRY_BUFFER_PERIOD_KEY, "3d");
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(borderlineSegment, recentSegment));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, taskConfigs, nowMs, false);
+
+    assertEquals(filtered.size(), 1);
+    assertEquals(filtered.get(0).getSegmentName(), "segment_recent",
+        "With 3d buffer, effective retention is 4d — 5-day-old segment should be excluded");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionBufferExceedsRetentionReturnsAll() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata segment = makeSegmentWithEndTimeMs("segment_1", nowMs - 365L * 86_400_000L);
+    // The 10-day buffer is larger than the 7-day retention; the filter is expected to keep every segment.
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(MinionTaskUtils.RETENTION_EXPIRY_BUFFER_PERIOD_KEY, "10d");
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(segment));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, taskConfigs, nowMs, false);
+
+    assertEquals(filtered.size(), 1,
+        "Buffer exceeding retention should fail-open and return all segments");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionNullTaskConfigsNoBuffer() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+    // taskConfigs is passed as null below, so no buffer applies and the plain 7-day retention decides.
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata expired = makeSegmentWithEndTimeMs("segment_expired", nowMs - 10L * 86_400_000L);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(expired));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, false);
+
+    assertEquals(filtered.size(), 0, "Null taskConfigs means no buffer — expired segment should be filtered");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionMalformedBufferReturnsZeroBuffer() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+
+    long nowMs = System.currentTimeMillis();
+    long oneDayMs = 86_400_000L;
+    SegmentZKMetadata expired = makeSegmentWithEndTimeMs("segment_expired", nowMs - 10 * oneDayMs);
+    SegmentZKMetadata recent = makeSegmentWithEndTimeMs("segment_recent", nowMs - 2 * oneDayMs);
+    // "invalid_period" is not a parseable period; the buffer is expected to be treated as zero.
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(MinionTaskUtils.RETENTION_EXPIRY_BUFFER_PERIOD_KEY, "invalid_period");
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(expired, recent));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, taskConfigs, nowMs, false);
+
+    assertEquals(filtered.size(), 1,
+        "Malformed buffer should fall back to 0 — expired segment still filtered by raw retention");
+    assertEquals(filtered.get(0).getSegmentName(), "segment_recent");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionEmptyTaskConfigsNoBuffer() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+    // An empty (non-null) taskConfigs map is passed below: it carries no buffer key at all.
+    long nowMs = System.currentTimeMillis();
+    SegmentZKMetadata expired = makeSegmentWithEndTimeMs("segment_expired", nowMs - 10L * 86_400_000L);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(expired));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, new HashMap<>(), nowMs, false);
+
+    assertEquals(filtered.size(), 0,
+        "Empty taskConfigs (no buffer key) means no buffer — expired segment should be filtered");
+  }
+
+ // --- isCreationTimeFallbackEnabled tests ---
+
+  @Test
+  public void testIsCreationTimeFallbackEnabledReturnsDefaultWhenKeyAbsent() {
+    String fallbackKey = ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK;
+    ClusterInfoAccessor clusterInfoAccessor = mock(ClusterInfoAccessor.class);
+    when(clusterInfoAccessor.getClusterConfig(fallbackKey)).thenReturn(null);
+    assertFalse(MinionTaskUtils.isCreationTimeFallbackEnabled(clusterInfoAccessor));
+  }
+
+  @Test
+  public void testIsCreationTimeFallbackEnabledReturnsTrueWhenSet() {
+    String fallbackKey = ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK;
+    ClusterInfoAccessor clusterInfoAccessor = mock(ClusterInfoAccessor.class);
+    when(clusterInfoAccessor.getClusterConfig(fallbackKey)).thenReturn("true");
+    assertTrue(MinionTaskUtils.isCreationTimeFallbackEnabled(clusterInfoAccessor));
+  }
+
+  @Test
+  public void testIsCreationTimeFallbackEnabledReturnsFalseWhenExplicitlyDisabled() {
+    String fallbackKey = ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK;
+    ClusterInfoAccessor clusterInfoAccessor = mock(ClusterInfoAccessor.class);
+    when(clusterInfoAccessor.getClusterConfig(fallbackKey)).thenReturn("false");
+    assertFalse(MinionTaskUtils.isCreationTimeFallbackEnabled(clusterInfoAccessor));
+  }
+
+  @Test
+  public void testIsCreationTimeFallbackEnabledNonBooleanStringReturnsFalse() {
+    String fallbackKey = ControllerConf.ControllerPeriodicTasksConf.ENABLE_RETENTION_CREATION_TIME_FALLBACK;
+    ClusterInfoAccessor clusterInfoAccessor = mock(ClusterInfoAccessor.class);
+    when(clusterInfoAccessor.getClusterConfig(fallbackKey)).thenReturn("abc");
+    assertFalse(MinionTaskUtils.isCreationTimeFallbackEnabled(clusterInfoAccessor));
+  }
+
+ // --- filterSegmentsPastRetention with creation-time fallback tests ---
+
+  @Test
+  public void testFilterSegmentsPastRetentionFallbackEnabledFiltersInvalidEndTimeOldCreationTime() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+
+    long nowMs = System.currentTimeMillis();
+    long oneDayMs = 86_400_000L;
+    // End time -1 (invalid) with a creation time 10 days back, next to a regular segment that ended 2 days ago.
+    SegmentZKMetadata invalidEndTimeOldCreation =
+        makeSegmentWithCreationTime("segment_invalid_old", -1, nowMs - 10 * oneDayMs);
+    SegmentZKMetadata recent = makeSegmentWithEndTimeMs("segment_recent", nowMs - 2 * oneDayMs);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(invalidEndTimeOldCreation, recent));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, true);
+
+    assertEquals(filtered.size(), 1);
+    assertEquals(filtered.get(0).getSegmentName(), "segment_recent",
+        "With fallback enabled, segment with invalid end time and old creation time should be filtered");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionFallbackDisabledKeepsInvalidEndTimeOldCreationTime() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+
+    long nowMs = System.currentTimeMillis();
+    long oneDayMs = 86_400_000L;
+    // Same invalid-end-time segment as the fallback-enabled test, but the fallback flag passed below is false.
+    SegmentZKMetadata invalidEndTimeOldCreation =
+        makeSegmentWithCreationTime("segment_invalid_old", -1, nowMs - 10 * oneDayMs);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(invalidEndTimeOldCreation));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, false);
+
+    assertEquals(filtered.size(), 1,
+        "With fallback disabled, segment with invalid end time should be kept regardless of creation time");
+  }
+
+  @Test
+  public void testFilterSegmentsPastRetentionFallbackEnabledKeepsRecentCreationTime() {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName("testTable")
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("7")
+        .build();
+
+    long nowMs = System.currentTimeMillis();
+    long oneDayMs = 86_400_000L;
+    // End time -1 (invalid) but created only 2 days ago, which is inside the 7-day retention.
+    SegmentZKMetadata invalidEndTimeRecentCreation =
+        makeSegmentWithCreationTime("segment_invalid_recent", -1, nowMs - 2 * oneDayMs);
+
+    List<SegmentZKMetadata> segments = new ArrayList<>(List.of(invalidEndTimeRecentCreation));
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(segments, tableConfig, null, nowMs, true);
+
+    assertEquals(filtered.size(), 1,
+        "With fallback enabled, segment with invalid end time but recent creation time should be kept");
+  }
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index d012762ad83c..6432a6d40876 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -38,6 +38,7 @@
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
@@ -1099,6 +1100,65 @@ public void testSegmentSelectionMultiLevels() {
assertEquals(pinotTaskConfigs.size(), 0);
}
+  /**
+   * Tests that the retentionExpiryBufferPeriod config causes segments close to retention expiry to be left out of
+   * task generation. With 30d retention and a 5d buffer the effective retention is 25d, so segments older than that
+   * are expected to be excluded.
+   */
+  @Test
+  public void testRetentionExpiryBufferFiltersSegments() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put("daily.mergeType", "concat");
+    tableTaskConfigs.put("daily.bufferTimePeriod", "2d");
+    tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
+    tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
+    tableTaskConfigs.put(MinionTaskUtils.RETENTION_EXPIRY_BUFFER_PERIOD_KEY, "5d");
+    taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
+
+    TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName(TIME_COLUMN_NAME)
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("30")
+        .setTaskConfig(new TableTaskConfig(taskConfigsMap))
+        .build();
+
+    ClusterInfoAccessor mockClusterInfoProvider = mock(ClusterInfoAccessor.class);
+
+    long nowMs = System.currentTimeMillis();
+    long oneDayMs = 86_400_000L;
+
+    // Ended 4 days ago: within the effective retention (25d) and older than the merge buffer (2d)
+    String segmentName1 = "testTable__1";
+    SegmentZKMetadata metadata1 = getSegmentZKMetadata(segmentName1,
+        nowMs - 5 * oneDayMs, nowMs - 4 * oneDayMs, TimeUnit.MILLISECONDS, "download1");
+
+    // Ended 27 days ago: past the effective retention (27d > 25d)
+    String segmentName2 = "testTable__2";
+    SegmentZKMetadata metadata2 = getSegmentZKMetadata(segmentName2,
+        nowMs - 28 * oneDayMs, nowMs - 27 * oneDayMs, TimeUnit.MILLISECONDS, "download2");
+
+    when(mockClusterInfoProvider.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(
+        Lists.newArrayList(metadata1, metadata2));
+    when(mockClusterInfoProvider.getIdealState(OFFLINE_TABLE_NAME)).thenReturn(
+        getIdealState(OFFLINE_TABLE_NAME, Lists.newArrayList(segmentName1, segmentName2)));
+    when(mockClusterInfoProvider.getTaskStates(MinionConstants.MergeRollupTask.TASK_TYPE))
+        .thenReturn(new HashMap<>());
+    mockMergeRollupTaskMetadataGetterAndSetter(mockClusterInfoProvider);
+
+    MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
+    generator.init(mockClusterInfoProvider);
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+
+    assertEquals(pinotTaskConfigs.size(), 1);
+    String taskSegments = pinotTaskConfigs.get(0).getConfigs().get(MinionConstants.SEGMENT_NAME_KEY);
+    assertTrue(taskSegments.contains(segmentName1),
+        "Segment within effective retention should be included in task");
+    assertFalse(taskSegments.contains(segmentName2),
+        "Segment past effective retention (with buffer) should be filtered out");
+  }
+
private SegmentZKMetadata getSegmentZKMetadata(String segmentName, long startTime, long endTime, TimeUnit timeUnit,
String downloadURL) {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
index 29a64624ae3c..614432c3eab2 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
@@ -37,6 +37,7 @@
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
import org.apache.pinot.plugin.minion.tasks.upsertcompactmerge.UpsertCompactMergeTaskGenerator.SegmentMergerMetadata;
import org.apache.pinot.plugin.minion.tasks.upsertcompactmerge.UpsertCompactMergeTaskGenerator.SegmentSelectionResult;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -540,4 +541,63 @@ public void testSegmentListGenerationEdgeCases() {
String crcList = _taskGenerator.getSegmentCrcList(segmentList);
Assert.assertEquals(crcList, "9999");
}
+
+  @Test
+  public void testRetentionExpiryBufferFiltersCandidates() {
+    long nowMs = System.currentTimeMillis();
+    long oneDayMs = 86_400_000L;
+
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(MinionConstants.UpsertCompactMergeTask.BUFFER_TIME_PERIOD_KEY, "2d");
+    taskConfigs.put(MinionTaskUtils.RETENTION_EXPIRY_BUFFER_PERIOD_KEY, "5d");
+
+    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setSnapshot(Enablement.ENABLE);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
+        .setTableName(RAW_TABLE_NAME)
+        .setRetentionTimeUnit("DAYS")
+        .setRetentionTimeValue("30")
+        .setUpsertConfig(upsertConfig)
+        .setTaskConfig(new TableTaskConfig(
+            Map.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE, taskConfigs)))
+        .build();
+
+    // Ended 4 days ago: within the effective retention (30d - 5d = 25d) and older than the 2d buffer
+    SegmentZKMetadata recentSegment = new SegmentZKMetadata("testTable__0__0__100");
+    recentSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    recentSegment.setStartTime(nowMs - 5 * oneDayMs);
+    recentSegment.setEndTime(nowMs - 4 * oneDayMs);
+    recentSegment.setTimeUnit(TimeUnit.MILLISECONDS);
+    recentSegment.setDownloadUrl("fs://testTable__0__0__100");
+
+    // Ended 27 days ago: past the effective retention (27d > 25d) and older than the 2d buffer
+    SegmentZKMetadata oldSegment = new SegmentZKMetadata("testTable__0__1__101");
+    oldSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    oldSegment.setStartTime(nowMs - 28 * oneDayMs);
+    oldSegment.setEndTime(nowMs - 27 * oneDayMs);
+    oldSegment.setTimeUnit(TimeUnit.MILLISECONDS);
+    oldSegment.setDownloadUrl("fs://testTable__0__1__101");
+
+    // Ends right now: still within the 2d buffer period (filtered by getCandidateSegments)
+    SegmentZKMetadata freshSegment = new SegmentZKMetadata("testTable__0__2__102");
+    freshSegment.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    freshSegment.setStartTime(nowMs - oneDayMs);
+    freshSegment.setEndTime(nowMs);
+    freshSegment.setTimeUnit(TimeUnit.MILLISECONDS);
+    freshSegment.setDownloadUrl("fs://testTable__0__2__102");
+
+    List<SegmentZKMetadata> allSegments =
+        new ArrayList<>(Arrays.asList(recentSegment, oldSegment, freshSegment));
+
+    // Step 1: getCandidateSegments filters out freshSegment (within 2d buffer)
+    List<SegmentZKMetadata> candidates =
+        UpsertCompactMergeTaskGenerator.getCandidateSegments(taskConfigs, allSegments, nowMs);
+    Assert.assertEquals(candidates.size(), 2, "getCandidateSegments should keep recent + old, filter fresh");
+
+    // Step 2: filterSegmentsPastRetention with 5d buffer (effective retention = 25d) filters out oldSegment
+    List<SegmentZKMetadata> filtered =
+        MinionTaskUtils.filterSegmentsPastRetention(candidates, tableConfig, taskConfigs, nowMs, false);
+    Assert.assertEquals(filtered.size(), 1, "Retention filter with buffer should remove the old segment");
+    Assert.assertEquals(filtered.get(0).getSegmentName(), "testTable__0__0__100");
+  }
}