KAFKA-20173: Ensure Metered session-stores pass headers correctly#21957
KAFKA-20173: Ensure Metered session-stores pass headers correctly#21957mjsax merged 2 commits intoapache:trunkfrom
Conversation
Ensures that all Metered Session-stores (plain and headers) pass headers into de/serializers.
| return readHeaders(buffer); | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
We don't need these any longer -- we have the corresponding method inside the stores, and I don't think it make sense to extract them into this Utils class (also given, that we need to pass in StateSerdes...)
| } | ||
|
|
||
| protected K deserializeKey(final byte[] rawKey) { | ||
| private K deserializeKey(final byte[] rawKey) { |
There was a problem hiding this comment.
Side cleanup -- sub-classes don't use this anyway, so we can make it private and get rid of the overload that only throws.
| final class MeteredWindowedKeyValueWithHeadersIterator<K, VInner, VOuter> extends MeteredWindowedKeyValueIterator<K, VOuter> { | ||
| private final Function<byte[], VInner> deserializeValue; | ||
| private final BiFunction<byte[], Headers, K> deserializeKey; | ||
| private final Function<VInner, Headers> headersExtractor; |
There was a problem hiding this comment.
To make this iterator generic (ie, work with ValueTimestampHeaders and AggregationWithHeaders we need this helper function.
|
|
||
| @Override | ||
| protected K deserializeKey(final byte[] rawKey) { | ||
| throw new UnsupportedOperationException("MeteredTimestampedWindowStoreWithHeaders required to pass in Headers when deserializing a key."); |
There was a problem hiding this comment.
We can make the super class method private so we don't need this override any longer.
There was a problem hiding this comment.
+1 for this clean up
Actually I opened a pull request for the same change while review MeteredSessionStoreWithHeaders : #21966
Happy that we come up with the same idea :)
There is few more changes in my PR. Can you may incorporate them to current PR? Check MeteredTimestampedKeyValueStoreWithHeaders, we can make few methods private as well
| ); | ||
| } | ||
|
|
||
| private final class MeteredWindowedKeyValueWithHeadersIterator<ValueType> extends MeteredWindowedKeyValueIterator<K, ValueType> { |
There was a problem hiding this comment.
Extracted into standalone class, so we can also use it in MeteredSessionStoreWithHeaders class.
| final QueryResult<MeteredWindowStoreIterator<ValueAndTimestamp<V>>> typedQueryResult = | ||
| InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult); | ||
| queryResult = (QueryResult<R>) typedQueryResult; | ||
| queryResult = (QueryResult<R>) InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult); |
There was a problem hiding this comment.
Just some cleanup -- there is no value to have typedQueryResult.
|
|
||
| if (query instanceof WindowRangeQuery) { | ||
| final WindowRangeQuery<K, AGG> windowRangeQuery = (WindowRangeQuery<K, AGG>) query; | ||
| if (windowRangeQuery.getKey().isPresent()) { |
There was a problem hiding this comment.
Moved this check inside runRangeQuery to align with how we setup the code in other classes
| wrapped().query(rawKeyQuery, positionBound, config); | ||
| if (rawResult.isSuccess()) { | ||
| final MeteredWindowedKeyValueIterator<K, AGG> typedResult = | ||
| new MeteredWindowedKeyValueWithHeadersIterator<>( |
There was a problem hiding this comment.
Switching to a different (already existing) iterator class, to be able to handle headers correctly.
| private Bytes serializeKey(final K key) { | ||
| return Bytes.wrap(serdes.rawKey(key, internalContext.headers())); | ||
| } |
There was a problem hiding this comment.
+1 for making those methods private until they needed in sub-classes
There is a functional difference between those two method. New version doesn't check for null. Are we sure null value won't make trouble?
UPD: We should be fine
public static Bytes wrap(byte[] bytes) {
if (bytes == null)
return null;
return new Bytes(bytes);
}| return super.prepareValueSerdeForStore(valueSerde, getter); | ||
| } | ||
|
|
||
| private Bytes serializeKey(final K key, final Headers headers) { |
There was a problem hiding this comment.
Minor: Typically those methods are sitting at the end of class. See https://github.com/apache/kafka/pull/21957/changes#diff-5fd8544440131420a7b211f9a8483b610487f2a74c4e0681204a3b1949bfc5e0R540
| wrapped().query(rawKeyQuery, positionBound, config); | ||
| if (rawResult.isSuccess()) { | ||
| final MeteredWindowedKeyValueIterator<K, AGG> typedResult = | ||
| new MeteredWindowedKeyValueWithHeadersIterator<>( |
|
|
||
| @Override | ||
| protected K deserializeKey(final byte[] rawKey) { | ||
| throw new UnsupportedOperationException("MeteredTimestampedWindowStoreWithHeaders required to pass in Headers when deserializing a key."); |
There was a problem hiding this comment.
+1 for this clean up
Actually I opened a pull request for the same change while review MeteredSessionStoreWithHeaders : #21966
Happy that we come up with the same idea :)
There is few more changes in my PR. Can you may incorporate them to current PR? Check MeteredTimestampedKeyValueStoreWithHeaders, we can make few methods private as well
|
LTGM now + tests are passed |
…1957) Ensures that all Metered Session-stores (plain and headers) pass headers into de/serializers. Reviewers: Uladzislau Blok <blokv75@gmail.com>, TengYao Chi <frankvicky@apache.org>
|
Merged to |
…ache#21957) Ensures that all Metered Session-stores (plain and headers) pass headers into de/serializers. Reviewers: Uladzislau Blok <blokv75@gmail.com>, TengYao Chi <frankvicky@apache.org>
Ensures that all Metered Session-stores (plain and headers) pass headers
into de/serializers.
Reviewers: Uladzislau Blok blokv75@gmail.com, TengYao Chi
frankvicky@apache.org