Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[9/N][Emit final] Emit final for session window aggregations #12204

Merged
Merged
Expand Up @@ -181,11 +181,6 @@ public void process(final Record<KIn, VIn> record) {
store.remove(session.key);

maybeForwardUpdate(session.key, session.value, null);
/*
tupleForwarder.maybeForward(
record.withKey(session.key)
.withValue(new Change<>(null, session.value)));
*/
}
}

Expand All @@ -194,11 +189,6 @@ public void process(final Record<KIn, VIn> record) {
store.put(sessionKey, agg);

maybeForwardUpdate(sessionKey, null, agg);
/*
tupleForwarder.maybeForward(
record.withKey(sessionKey)
.withValue(new Change<>(agg, null)));
*/
}

maybeForwardFinalResult(record, windowCloseTime);
Expand Down Expand Up @@ -254,7 +244,7 @@ private boolean shouldEmitFinal(final long windowCloseTime) {
}

private long emitRangeLowerBound() {
return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ? 0L : Math.max(0L, lastEmitWindowCloseTime);
return Math.max(0L, lastEmitWindowCloseTime);
}

private long emitRangeUpperBound(final long windowCloseTime) {
Expand All @@ -264,7 +254,9 @@ private long emitRangeUpperBound(final long windowCloseTime) {
}

private boolean shouldRangeFetch(final long emitRangeLowerBound, final long emitRangeUpperBound) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one minor bug I detected in the latest commit.

return emitRangeUpperBound > emitRangeLowerBound;
// since a session window could be a single point (i.e. [t, t]),
// we need to range fetch and emit even if the upper and lower bound are the same
return emitRangeUpperBound >= emitRangeLowerBound;
}

private void fetchAndEmit(final Record<KIn, VIn> record,
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import java.time.Instant;
import java.util.List;

abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {
Expand Down Expand Up @@ -259,6 +260,12 @@ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
}

@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final Instant earliestSessionEndTime,
final Instant latestSessionEndTime) {
return wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime);
}

@Override
public void remove(final Windowed<K> sessionKey) {
wrapped().remove(sessionKey);
Expand Down
Expand Up @@ -25,6 +25,8 @@
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;

import java.time.Instant;

import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;

/**
Expand Down Expand Up @@ -95,6 +97,12 @@ public byte[] fetchSession(final Bytes key, final long earliestSessionEndTime, f
return wrapped().fetchSession(key, earliestSessionEndTime, latestSessionStartTime);
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
final Instant latestSessionEndTime) {
return wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime);
}

@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes key) {
return wrapped().backwardFetch(key);
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;

import java.time.Instant;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -346,6 +347,18 @@ public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
time);
}

@Override
public KeyValueIterator<Windowed<K>, V> findSessions(final Instant earliestSessionEndTime,
final Instant latestSessionEndTime) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().findSessions(earliestSessionEndTime, latestSessionEndTime),
fetchSensor,
streamsMetrics,
serdes::keyFrom,
serdes::valueFrom,
time);
}

@Override
public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom,
final K keyTo,
Expand Down
Expand Up @@ -203,8 +203,9 @@ public static void writeBinary(final ByteBuffer buf,
final long endTime) {
buf.putLong(endTime);
buf.putLong(startTime);
if (key != null)
if (key != null) {
buf.put(key.get());
}
}

public static Bytes toBinary(final Bytes key,
Expand Down
Expand Up @@ -39,7 +39,7 @@
*/
public class RocksDBTimeOrderedSessionSegmentedBytesStore extends AbstractRocksDBTimeOrderedSegmentedBytesStore {

private class SessionKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
private class SessionKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) {
super(indexIterator);
}
Expand Down Expand Up @@ -76,10 +76,10 @@ public KeyValueIterator<Bytes, byte[]> fetchSessions(final long earliestSessionE
final long latestSessionEndTime) {
final List<KeyValueSegment> searchSpace = segments.segments(earliestSessionEndTime, latestSessionEndTime, true);

// here we use both as lower range since we the search boundaries are both on the session time end;
// hence effectively our search range is [0 earliestSE] - [0, latestSE]
// here we want [0, latestSE, FF] as the upper bound to cover any possible keys,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the other minor bug I detected in the latest commit.

// but since we can only get upper bound based on timestamps, we use a slight larger upper bound as [0, latestSE+1]
final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(null, earliestSessionEndTime);
final Bytes binaryTo = baseKeySchema.lowerRangeFixedSize(null, latestSessionEndTime);
final Bytes binaryTo = baseKeySchema.lowerRangeFixedSize(null, latestSessionEndTime + 1);

return new SegmentIterator<>(
searchSpace.iterator(),
Expand All @@ -90,9 +90,9 @@ public KeyValueIterator<Bytes, byte[]> fetchSessions(final long earliestSessionE
final Windowed<Bytes> windowedKey = TimeFirstSessionKeySchema.from(bytes);
final long endTime = windowedKey.window().end();

if (endTime <= latestSessionEndTime && endTime >= earliestSessionEndTime)
if (endTime <= latestSessionEndTime && endTime >= earliestSessionEndTime) {
return true;

}
iterator.next();
}
return false;
Expand Down
Expand Up @@ -54,24 +54,18 @@ private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, b
return inner;
}

if (!inner.persistent()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the other change I made in order to work around the current tricky situation, since in-memory stores are always "time ordered" as well, we stripe the caching if the inner store is not persistent. cc @mjsax

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we strip the caching, this applies to eager emitting, right? -- So it would be a behavioral change? Do we want to piggy-back such a change into this KIP? Sounds "risky"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I will move this logic into the earlier stage during the topology building phase for the moment. Also cc @lihaosky who would do similar things for sliding windows.

return inner;
}

// do not enable cache if the underlying store is time ordered
if (isTimeOrderedStore(inner)) {
if (storeSupplier instanceof RocksDbTimeOrderedSessionBytesStoreSupplier) {
return inner;
}

return new CachingSessionStore(inner, storeSupplier.segmentIntervalMs());
}

private boolean isTimeOrderedStore(final StateStore stateStore) {
if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
return true;
}
if (stateStore instanceof WrappedStateStore) {
return isTimeOrderedStore(((WrappedStateStore) stateStore).wrapped());
}
return false;
}

private SessionStore<Bytes, byte[]> maybeWrapLogging(final SessionStore<Bytes, byte[]> inner) {
if (!enableLogging) {
return inner;
Expand Down