Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils;

import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Utility methods for evaluating segment retention eligibility.
*/
public class RetentionUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(RetentionUtils.class);

private RetentionUtils() {
}

/**
* Implements the time-comparison and creation-time fallback logic used by {@code TimeRetentionStrategy} for
* completed segments. Does NOT check segment completion status — callers must guarantee that only completed
* segments (DONE or UPLOADED) are passed in.
* <ul>
* <li>If end time is valid: expired when {@code currentTimeMs - endTimeMs > retentionMs}.</li>
* <li>If end time is invalid and {@code useCreationTimeFallback} is true and creation time is valid:
* expired when {@code currentTimeMs - creationTimeMs > retentionMs}.</li>
* <li>Otherwise: not expired (fail-open).</li>
* </ul>
*
* @param tableNameWithType table name with type suffix, used for logging
* @param segmentZKMetadata segment metadata
* @param retentionMs retention period in milliseconds (must be positive)
* @param currentTimeMs current wall-clock time in milliseconds
* @param useCreationTimeFallback when true, fall back to creation time if end time is invalid
* (must match {@code controller.retentionManager.enableCreationTimeFallback})
* @return true if the segment is past the retention boundary, false otherwise
*/
public static boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata, long retentionMs,
long currentTimeMs, boolean useCreationTimeFallback) {
String segmentName = segmentZKMetadata.getSegmentName();
long endTimeMs = segmentZKMetadata.getEndTimeMs();
if (TimeUtils.timeValueInValidRange(endTimeMs)) {
return currentTimeMs - endTimeMs > retentionMs;
}
if (useCreationTimeFallback) {
long creationTimeMs = segmentZKMetadata.getCreationTime();
if (TimeUtils.timeValueInValidRange(creationTimeMs)) {
LOGGER.debug("Segment: {} of table: {} has invalid end time: {}. Using creation time: {} as fallback",
segmentName, tableNameWithType, endTimeMs, creationTimeMs);
return currentTimeMs - creationTimeMs > retentionMs;
}
LOGGER.warn("Segment: {} of table: {} has invalid end time: {} and invalid creation time: {}. "
+ "Cannot determine retention, skipping", segmentName, tableNameWithType, endTimeMs, creationTimeMs);
} else {
LOGGER.warn("Segment: {} of table: {} has invalid end time in millis: {}. "
+ "Creation time fallback is disabled", segmentName, tableNameWithType, endTimeMs);
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils;

import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.testng.annotations.Test;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;


public class RetentionUtilsTest {

private static final String TABLE_NAME = "testTable_REALTIME";
private static final long ONE_DAY_MS = TimeUnit.DAYS.toMillis(1);
private static final long RETENTION_MS = TimeUnit.DAYS.toMillis(7);

private static SegmentZKMetadata makeSegment(long endTimeMs) {
SegmentZKMetadata segment = new SegmentZKMetadata("seg");
segment.setEndTime(endTimeMs);
segment.setTimeUnit(TimeUnit.MILLISECONDS);
return segment;
}

private static SegmentZKMetadata makeSegmentWithCreationTime(long endTimeMs, long creationTimeMs) {
SegmentZKMetadata segment = new SegmentZKMetadata("seg");
segment.setEndTime(endTimeMs);
segment.setTimeUnit(TimeUnit.MILLISECONDS);
segment.setCreationTime(creationTimeMs);
return segment;
}

@Test
public void testExpiredSegmentIsPurgeable() {
long now = System.currentTimeMillis();
SegmentZKMetadata segment = makeSegment(now - 10 * ONE_DAY_MS);
assertTrue(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, false));
}

@Test
public void testRecentSegmentIsNotPurgeable() {
long now = System.currentTimeMillis();
SegmentZKMetadata segment = makeSegment(now - 2 * ONE_DAY_MS);
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, false));
}

@Test
public void testExactBoundaryIsNotPurgeable() {
// strict greater-than: segment at exactly retentionMs old should NOT be purgeable
long now = System.currentTimeMillis();
SegmentZKMetadata segment = makeSegment(now - RETENTION_MS);
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, false));
}

@Test
public void testOneMsPastBoundaryIsPurgeable() {
long now = System.currentTimeMillis();
SegmentZKMetadata segment = makeSegment(now - RETENTION_MS - 1);
assertTrue(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, false));
}

@Test
public void testInvalidEndTimeIsNotPurgeable() {
long now = System.currentTimeMillis();
SegmentZKMetadata segment = new SegmentZKMetadata("seg");
segment.setEndTime(-1);
segment.setTimeUnit(TimeUnit.MILLISECONDS);
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, false));
}

@Test
public void testFarFutureEndTimeIsNotPurgeable() {
long now = System.currentTimeMillis();
long farFuture = now + TimeUnit.DAYS.toMillis(365 * 200L);
SegmentZKMetadata segment = makeSegment(farFuture);
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, false));
}

@Test
public void testInvalidEndTimeFallbackDisabledIsNotPurgeable() {
long now = System.currentTimeMillis();
SegmentZKMetadata segment = makeSegmentWithCreationTime(-1, now - 10 * ONE_DAY_MS);
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, false));
}

@Test
public void testInvalidEndTimeOldCreationTimeFallbackEnabledIsPurgeable() {
long now = System.currentTimeMillis();
SegmentZKMetadata segment = makeSegmentWithCreationTime(-1, now - 10 * ONE_DAY_MS);
assertTrue(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, true));
}

@Test
public void testInvalidEndTimeRecentCreationTimeFallbackEnabledIsNotPurgeable() {
long now = System.currentTimeMillis();
SegmentZKMetadata segment = makeSegmentWithCreationTime(-1, now - 2 * ONE_DAY_MS);
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, true));
}

@Test
public void testInvalidEndTimeInvalidCreationTimeFallbackEnabledIsNotPurgeable() {
long now = System.currentTimeMillis();
SegmentZKMetadata segment = makeSegmentWithCreationTime(-1, -1);
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, true));
}

@Test
public void testValidEndTimeTakesPriorityOverCreationTimeFallback() {
long now = System.currentTimeMillis();
SegmentZKMetadata segment = makeSegmentWithCreationTime(now - 2 * ONE_DAY_MS, now - 10 * ONE_DAY_MS);
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.RetentionUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -52,30 +53,8 @@ public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZK
return false; // Incomplete segments don't have final end time and should not be purged
}

String segmentName = segmentZKMetadata.getSegmentName();
long endTimeMs = segmentZKMetadata.getEndTimeMs();

// If end time is valid, use it directly
if (TimeUtils.timeValueInValidRange(endTimeMs)) {
return System.currentTimeMillis() - endTimeMs > _retentionMs;
}

long creationTimeMs = segmentZKMetadata.getCreationTime();

if (_useCreationTimeFallback && TimeUtils.timeValueInValidRange(creationTimeMs)) {
LOGGER.debug("Segment: {} of table: {} has invalid end time: {}. Using creation time: {} as fallback",
segmentName, tableNameWithType, endTimeMs, creationTimeMs);
return System.currentTimeMillis() - creationTimeMs > _retentionMs;
}

if (_useCreationTimeFallback) {
LOGGER.warn("Segment: {} of table: {} has invalid end time: {} and invalid creation time: {}. "
+ "Cannot determine retention, skipping", segmentName, tableNameWithType, endTimeMs, creationTimeMs);
} else {
LOGGER.warn("Segment: {} of table: {} has invalid end time in millis: {}. "
+ "Creation time fallback is disabled", segmentName, tableNameWithType, endTimeMs);
}
return false;
return RetentionUtils.isPurgeable(tableNameWithType, segmentZKMetadata, _retentionMs,
System.currentTimeMillis(), _useCreationTimeFallback);
}

@Override
Expand Down
Loading
Loading