Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HOTFIX: Revert "KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…" #12745

Merged
merged 1 commit into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
protected final AbstractSegments<S> segments;
protected final KeySchema baseKeySchema;
protected final Optional<KeySchema> indexKeySchema;
private final long retentionPeriod;


protected ProcessorContext context;
Expand All @@ -67,44 +66,36 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
AbstractDualSchemaRocksDBSegmentedBytesStore(final String name,
final KeySchema baseKeySchema,
final Optional<KeySchema> indexKeySchema,
final AbstractSegments<S> segments,
final long retentionPeriod) {
final AbstractSegments<S> segments) {
this.name = name;
this.baseKeySchema = baseKeySchema;
this.indexKeySchema = indexKeySchema;
this.segments = segments;
this.retentionPeriod = retentionPeriod;
}

@Override
public KeyValueIterator<Bytes, byte[]> all() {

final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);

final List<S> searchSpace = segments.allSegments(true);
final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
final Bytes from = baseKeySchema.lowerRange(null, 0);
final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);

return new SegmentIterator<>(
searchSpace.iterator(),
baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true),
baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true),
from,
to,
true);
}

@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {

final long actualFrom = getActualFrom(0, baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);

final List<S> searchSpace = segments.allSegments(false);
final Bytes from = baseKeySchema.lowerRange(null, actualFrom);
final Bytes from = baseKeySchema.lowerRange(null, 0);
final Bytes to = baseKeySchema.upperRange(null, Long.MAX_VALUE);

return new SegmentIterator<>(
searchSpace.iterator(),
baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false),
baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false),
from,
to,
false);
Expand All @@ -128,15 +119,6 @@ public void remove(final Bytes rawBaseKey) {

abstract protected KeyValue<Bytes, byte[]> getIndexKeyValue(final Bytes baseKey, final byte[] baseValue);

// isTimeFirstWindowSchema true implies ON_WINDOW_CLOSE semantics. There's an edge case
// when retentionPeriod = grace Period. If we add 1, then actualFrom > to which would
// lead to no records being returned.
protected long getActualFrom(final long from, final boolean isTimeFirstWindowSchema) {
return isTimeFirstWindowSchema ? Math.max(from, observedStreamTime - retentionPeriod) :
Math.max(from, observedStreamTime - retentionPeriod + 1);

}

// For testing
void putIndex(final Bytes indexKey, final byte[] value) {
if (!hasIndex()) {
Expand Down Expand Up @@ -209,24 +191,7 @@ public void put(final Bytes rawBaseKey,

@Override
public byte[] get(final Bytes rawKey) {
final long timestampFromRawKey = baseKeySchema.segmentTimestamp(rawKey);
// check if timestamp is expired

if (baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
if (timestampFromRawKey < observedStreamTime - retentionPeriod) {
LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
rawKey.toString(), timestampFromRawKey, observedStreamTime - retentionPeriod);
return null;
}
} else {
if (timestampFromRawKey < observedStreamTime - retentionPeriod + 1) {
LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
rawKey.toString(), timestampFromRawKey, observedStreamTime - retentionPeriod + 1);
return null;
}
}

final S segment = segments.getSegmentForTimestamp(timestampFromRawKey);
final S segment = segments.getSegmentForTimestamp(baseKeySchema.segmentTimestamp(rawKey));
if (segment == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
private final String name;
private final AbstractSegments<S> segments;
private final String metricScope;
private final long retentionPeriod;
private final KeySchema keySchema;

private ProcessorContext context;
Expand All @@ -66,12 +65,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se

AbstractRocksDBSegmentedBytesStore(final String name,
final String metricScope,
final long retentionPeriod,
final KeySchema keySchema,
final AbstractSegments<S> segments) {
this.name = name;
this.metricScope = metricScope;
this.retentionPeriod = retentionPeriod;
this.keySchema = keySchema;
this.segments = segments;
}
Expand All @@ -94,30 +91,19 @@ KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
final long to,
final boolean forward) {
final long actualFrom = getActualFrom(from);
final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);

if (keySchema instanceof WindowKeySchema && to < actualFrom) {
LOG.debug("Returning no records for key {} as to ({}) < actualFrom ({}) ", key.toString(), to, actualFrom);
return KeyValueIterators.emptyIterator();
}

final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward);

final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, actualFrom);
final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(key, key, actualFrom, to, forward),
keySchema.hasNextCondition(key, key, from, to, forward),
binaryFrom,
binaryTo,
forward);
}

private long getActualFrom(final long from) {
return Math.max(from, observedStreamTime - retentionPeriod + 1);
}

@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
final Bytes keyTo,
Expand Down Expand Up @@ -147,48 +133,38 @@ KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom,
return KeyValueIterators.emptyIterator();
}

final long actualFrom = getActualFrom(from);
final List<S> searchSpace = keySchema.segmentsToSearch(segments, from, to, forward);

if (keySchema instanceof WindowKeySchema && to < actualFrom) {
LOG.debug("Returning no records for keys {}/{} as to ({}) < actualFrom ({}) ", keyFrom, keyTo, to, actualFrom);
return KeyValueIterators.emptyIterator();
}

final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, to, forward);

final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, actualFrom);
final Bytes binaryFrom = keyFrom == null ? null : keySchema.lowerRange(keyFrom, from);
final Bytes binaryTo = keyTo == null ? null : keySchema.upperRange(keyTo, to);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(keyFrom, keyTo, actualFrom, to, forward),
keySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
binaryFrom,
binaryTo,
forward);
}

@Override
public KeyValueIterator<Bytes, byte[]> all() {
final long actualFrom = getActualFrom(0);
final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, true);
final List<S> searchSpace = segments.allSegments(true);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true),
keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, true),
null,
null,
true);
}

@Override
public KeyValueIterator<Bytes, byte[]> backwardAll() {
final long actualFrom = getActualFrom(0);

final List<S> searchSpace = keySchema.segmentsToSearch(segments, actualFrom, Long.MAX_VALUE, false);
final List<S> searchSpace = segments.allSegments(false);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false),
keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE, false),
null,
null,
false);
Expand All @@ -197,18 +173,11 @@ public KeyValueIterator<Bytes, byte[]> backwardAll() {
@Override
public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
final long actualFrom = getActualFrom(timeFrom);

if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom);
return KeyValueIterators.emptyIterator();
}

final List<S> searchSpace = segments.segments(actualFrom, timeTo, true);
final List<S> searchSpace = segments.segments(timeFrom, timeTo, true);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, actualFrom, timeTo, true),
keySchema.hasNextCondition(null, null, timeFrom, timeTo, true),
null,
null,
true);
Expand All @@ -217,18 +186,11 @@ public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,
@Override
public KeyValueIterator<Bytes, byte[]> backwardFetchAll(final long timeFrom,
final long timeTo) {
final long actualFrom = getActualFrom(timeFrom);

if (keySchema instanceof WindowKeySchema && timeTo < actualFrom) {
LOG.debug("Returning no records for as timeTo ({}) < actualFrom ({}) ", timeTo, actualFrom);
return KeyValueIterators.emptyIterator();
}

final List<S> searchSpace = segments.segments(actualFrom, timeTo, false);
final List<S> searchSpace = segments.segments(timeFrom, timeTo, false);

return new SegmentIterator<>(
searchSpace.iterator(),
keySchema.hasNextCondition(null, null, actualFrom, timeTo, false),
keySchema.hasNextCondition(null, null, timeFrom, timeTo, false),
null,
null,
false);
Expand Down Expand Up @@ -272,14 +234,7 @@ public void put(final Bytes key,

@Override
public byte[] get(final Bytes key) {
final long timestampFromKey = keySchema.segmentTimestamp(key);
// check if timestamp is expired
if (timestampFromKey < observedStreamTime - retentionPeriod + 1) {
LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})",
key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1);
return null;
}
final S segment = segments.getSegmentForTimestamp(timestampFromKey);
final S segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
if (segment == null) {
return null;
}
Expand Down