-
Notifications
You must be signed in to change notification settings - Fork 329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SAMZA-1056: Added wiring for High Level API state stores, their serdes and changelogs. #309
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks really good!
@@ -47,23 +48,6 @@ | |||
* | |||
* @return the key value store containing the state for this stream | |||
*/ | |||
KeyValueStore<K, PartialJoinMessage<M>> getState(); | |||
KeyValueStore<K, TimestampedValue<M>> getState(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks! TimestampedValue
is much nicer than PartialJoinMessage
!
if (otherMessage != null && otherMessage.getReceivedTimeMs() > now - ttlMs) { | ||
RM joinResult = thisPartialJoinFn.apply(message, otherMessage.getMessage()); | ||
if (otherMessage != null && otherMessage.getTimestamp() > now - ttlMs) { | ||
JM joinResult = thisPartialJoinFn.apply(message, otherMessage.getValue()); | ||
return Collections.singletonList(joinResult); | ||
} | ||
return Collections.emptyList(); | ||
} | ||
|
||
@Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we rely on rocksDb ttl for expiry? Instead of doing point-deletes on the store?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we configure RocksDb TTL + changelog TTL == ttlMs.
@@ -47,23 +48,6 @@ | |||
* | |||
* @return the key value store containing the state for this stream | |||
*/ | |||
KeyValueStore<K, PartialJoinMessage<M>> getState(); | |||
KeyValueStore<K, TimestampedValue<M>> getState(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be
KVStore<TimeSeriesKey<K>, V>
? Why do we need to store the timestamp as a part of the value?
You can also perhaps use the same operator for doing windowed-joins later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we need to look up by key for doing joins b/w the two streams, and timestamps are not part of the key. I don't want to use a timeline store for this version since all queries would be range queries which is not caching-friendly. We can refactor if necessary when we do other kind of joins.
@nickpan47 @jmakes FYI |
…logs. Provided join operator access to durable state stores.
a420a0f
to
e20a21a
Compare
@Override | ||
public TimestampedValue<V> fromBytes(byte[] bytes) { | ||
ByteBuffer bb = ByteBuffer.wrap(bytes); | ||
byte[] vBytes = new byte[bytes.length - 8 /* long ts bytes */]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: use a constant TIMESTAMP_LEN
here instead of 8?
} | ||
|
||
@Override | ||
public TimestampedValue<V> fromBytes(byte[] bytes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good to verify the behavior with empty vBytes
with a test!
// changelog streams. Hence we just force an unclean shutdown here to. This _should be_ OK | ||
// since the test method has already executed by the time the shutdown hook is called. The side effect is | ||
// that buffered state (e.g. changelog contents) might not be flushed correctly after the test run. | ||
configs.put("task.shutdown.ms", "100"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't we simply set it to 1
then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
approved. looks good to me.
Provided join operator access to durable state stores.