Skip to content

Commit

Permalink
KAFKA-14491: [17/N] Refactor segments cleanup logic
Browse files Browse the repository at this point in the history
Part of KIP-889 (versioned state stores).

AbstractSegments automatically calls the helper method to clean up expired segments as part of getOrCreateSegmentIfLive(). This works fine for windowed store implementations which call getOrCreateSegmentIfLive() exactly once per put() call, but is inefficient and difficult to reason about for the new RocksDBVersionedStore implementation (cf. #13188) which makes potentially multiple calls to getOrCreateSegmentIfLive() for different segments for a single put() call. This PR addresses this by refactoring the call to clean up expired segments out of getOrCreateSegmentIfLive(), opting to have the different segments implementations specify when cleanup should occur instead. After this PR, RocksDBVersionedStore only cleans up expired segments once per call to put().

Reviewers: Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
vcrfxia committed Mar 21, 2023
1 parent 6fae237 commit 361095a
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 9 deletions.
Expand Up @@ -79,16 +79,12 @@ public S getOrCreateSegmentIfLive(final long segmentId,
final long minLiveTimestamp = streamTime - retentionPeriod;
final long minLiveSegment = segmentId(minLiveTimestamp);

final S toReturn;
if (segmentId >= minLiveSegment) {
// The segment is live. get it, ensure it's open, and return it.
toReturn = getOrCreateSegment(segmentId, context);
return getOrCreateSegment(segmentId, context);
} else {
toReturn = null;
return null;
}

cleanupEarlierThan(minLiveSegment);
return toReturn;
}

@Override
Expand All @@ -113,8 +109,7 @@ public void openExisting(final ProcessorContext context, final long streamTime)
// ignore
}

final long minLiveSegment = segmentId(streamTime - retentionPeriod);
cleanupEarlierThan(minLiveSegment);
cleanupExpiredSegments(streamTime);
}

@Override
Expand Down Expand Up @@ -172,7 +167,8 @@ public void close() {
segments.clear();
}

private void cleanupEarlierThan(final long minLiveSegment) {
protected void cleanupExpiredSegments(final long streamTime) {
final long minLiveSegment = segmentId(streamTime - retentionPeriod);
final Iterator<Map.Entry<Long, S>> toRemove =
segments.headMap(minLiveSegment, false).entrySet().iterator();

Expand Down
Expand Up @@ -53,6 +53,15 @@ public KeyValueSegment getOrCreateSegment(final long segmentId,
}
}

@Override
public KeyValueSegment getOrCreateSegmentIfLive(final long segmentId,
                                                final ProcessorContext context,
                                                final long streamTime) {
    // Fetch the segment (creating it if it is still within the retention
    // window) before triggering cleanup, preserving the windowed stores'
    // one-cleanup-per-put behavior now that the base class no longer
    // performs cleanup itself.
    final KeyValueSegment liveSegment = super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
    cleanupExpiredSegments(streamTime);
    return liveSegment;
}

@Override
public void openExisting(final ProcessorContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
Expand Down
Expand Up @@ -57,6 +57,11 @@ public void openExisting(final ProcessorContext context, final long streamTime)
physicalStore.openDB(context.appConfigs(), context.stateDir());
}

/**
 * Removes segments whose data has expired based on the provided stream time.
 * <p>
 * This override exists solely to widen the visibility of the base-class
 * method from {@code protected} to {@code public}, so that callers outside
 * the segments hierarchy (e.g. {@code RocksDBVersionedStore#doPut}) can
 * trigger cleanup exactly once per put, rather than once per
 * {@code getOrCreateSegmentIfLive()} call.
 *
 * @param streamTime the current stream time; segments older than
 *                   {@code streamTime - retentionPeriod} are removed
 */
@Override
public void cleanupExpiredSegments(final long streamTime) {
    super.cleanupExpiredSegments(streamTime);
}

@Override
public void flush() {
physicalStore.flush();
Expand Down
Expand Up @@ -480,6 +480,8 @@ private <T extends VersionedStoreSegment> void doPut(
final byte[] value,
final long timestamp
) {
segmentStores.cleanupExpiredSegments(observedStreamTime);

// track the smallest timestamp seen so far that is larger than insertion timestamp.
// this timestamp determines, based on all segments searched so far, which segment the
// new record should be inserted into.
Expand Down
Expand Up @@ -53,6 +53,15 @@ public TimestampedSegment getOrCreateSegment(final long segmentId,
}
}

@Override
public TimestampedSegment getOrCreateSegmentIfLive(final long segmentId,
                                                   final ProcessorContext context,
                                                   final long streamTime) {
    // Fetch the segment (creating it if it is still within the retention
    // window) before triggering cleanup, preserving the windowed stores'
    // one-cleanup-per-put behavior now that the base class no longer
    // performs cleanup itself.
    final TimestampedSegment liveSegment = super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
    cleanupExpiredSegments(streamTime);
    return liveSegment;
}

@Override
public void openExisting(final ProcessorContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
Expand Down
Expand Up @@ -108,6 +108,8 @@ public void shouldCleanupSegmentsThatHaveExpired() {
final LogicalKeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(3, context, SEGMENT_INTERVAL * 3L);
final LogicalKeyValueSegment segment4 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);

segments.cleanupExpiredSegments(SEGMENT_INTERVAL * 7L);

final List<LogicalKeyValueSegment> allSegments = segments.allSegments(true);
assertEquals(2, allSegments.size());
assertEquals(segment3, allSegments.get(0));
Expand Down

0 comments on commit 361095a

Please sign in to comment.