KAFKA-15303: Avoid unnecessary re-serialization in FK-join #14157

Open
wants to merge 1 commit into base: trunk

Conversation

mjsax
Member

@mjsax mjsax commented Aug 7, 2023

FK-joins do a couple of unnecessary deserialization and re-serialization steps. These steps are not just an unnecessary perf hit, but can even break the join in the case of schema evolution, where the serialized byte[] might change, breaking the "record value hash comparison step" when processing a subscription response.

This PR removes all unnecessary de-/re-serialization steps and changes some FK-join processors to work on plain byte[] instead.

Note: this is intended to be a POC PR only at this point; I opened it to collect early feedback and to see if we believe it's a viable solution. -- I also think there is more unnecessary de-/serialization going on that we could remove. For example, SubscriptionReceiveProcessorSupplier seems to unnecessarily deserialize the full payload but could work with Bytes/byte[] types instead. (Not sure if there are more.) -- Last: I had the idea that we could also introduce a RawXxxStore interface that MeteredXxxStore would implement, to allow skipping the deserialization step on-read as an alternative solution. Thoughts?
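
To make the failure mode concrete, here is a minimal sketch (illustrative only, not code from this PR; it assumes the Murmur3.hash128(byte[]) helper from org.apache.kafka.streams.state.internals that the diff below also uses) contrasting the current re-serialize-then-hash step with hashing the stored raw bytes directly:

import java.util.Arrays;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.internals.Murmur3;

public class FkJoinHashSketch {

    // Hash of the value as it was originally written (raw bytes from the store/changelog).
    static long[] hashOfStoredBytes(final byte[] rawValue) {
        return rawValue == null ? null : Murmur3.hash128(rawValue);
    }

    // Current behavior: deserialize and then re-serialize before hashing. If the serializer's
    // byte layout changed in the meantime (e.g. after a schema upgrade), the two hashes differ
    // even though the logical value is unchanged, and the subscription response is dropped.
    static <V> long[] hashOfReserializedValue(final Serializer<V> serializer,
                                              final String pseudoTopic,
                                              final V value) {
        return value == null ? null : Murmur3.hash128(serializer.serialize(pseudoTopic, value));
    }

    static boolean matches(final long[] sentHash, final long[] currentHash) {
        return Arrays.equals(sentHash, currentHash);
    }
}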

import org.apache.kafka.streams.processor.api.ProcessorSupplier;

public interface KStreamAggProcessorSupplier<KIn, VIn, KAgg, VAgg> extends ProcessorSupplier<KIn, VIn, KAgg, Change<VAgg>> {

KTableValueGetterSupplier<KAgg, VAgg> view();

default KTableValueGetterSupplier<Bytes, byte[]> rawView() {
Member Author

This has a default implementation only to make the code compile -- we should remove the default to force all classes to provide a proper implementation. It seems no test fails, which means we still need to add tests for the cases that should now fail with an NPE.

.withTimestamp(kv.key.window().end())
.withHeaders(record.headers()));

try (final KeyValueIterator<Windowed<KIn>, VAgg> windowToEmit =
Member Author

Unrelated side fix to close the iterator.

import org.apache.kafka.streams.processor.api.ProcessorSupplier;

public interface KTableProcessorSupplier<KIn, VIn, KOut, VOut> extends ProcessorSupplier<KIn, Change<VIn>, KOut, Change<VOut>> {

KTableValueGetterSupplier<KOut, VOut> view();

default KTableValueGetterSupplier<Bytes, byte[]> rawView() {
Member Author

As above: this should not have a default implementation.


    final long[] currentHash = currentValueWithTimestamp == null ?
        null :
-       Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, currentValueWithTimestamp.value()));
+       //Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, currentValueWithTimestamp.value()));
+       Murmur3.hash128(currentValueWithTimestamp.value());
Member Author

This is the actual fix: we don't re-serialize the value to re-compute the hash.

@mjsax mjsax added the streams label Aug 7, 2023
public KeyValueRawStoreWrapper(final ProcessorContext<?, ?> context, final String storeName) {
try {
// first try timestamped store
timestampedStore = ((WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>) ((WrappedStateStore<KeyValueStore<Bytes, byte[]>, K, V>) context.getStateStore(storeName)).wrapped()).wrapped();
Member Author

@mjsax mjsax Aug 7, 2023

This is pretty much just hacked together right now... (Same in L67 below.)

This nasty unwrapping business resulted in the idea to add a RawXxxStore interface that MeteredXxxStore could implement, and we would just cast to RawXxxStore here instead of un-wrapping.
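
For illustration, such an interface might look roughly like the following; this is a hypothetical sketch, not code from this PR, and the name getRaw is a placeholder:

import org.apache.kafka.common.utils.Bytes;

// Hypothetical: a MeteredXxxStore could additionally implement this so that FK-join
// processors can read the stored bytes without the deserialize-on-read step and
// without unwrapping the store layers by hand.
public interface RawKeyValueStore {

    // Raw value as stored (for a timestamped store this would still include the
    // timestamp prefix, so callers need to know the encoding).
    byte[] getRaw(Bytes key);
}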

Member

Would it alternatively be sufficient to implement a WrappedKeyValueState store that correctly pulls through the get calls?

Member Author

Just cycling back to this -- not sure what the difference between RawXxxStore and WrappedKeyValueState would be? (Hope you can still remember... 😁)

@mjsax mjsax changed the title KAFKA-15030: Avoid unnecessary re-serialization in FK-join KAFKA-15303: Avoid unnecessary re-serialization in FK-join Aug 7, 2023
@Chuckame

Chuckame commented Sep 5, 2023

Hello @mjsax, do you know if there has been any progress on this fix? 🙇

@mjsax
Member Author

mjsax commented Sep 8, 2023

\cc @cadonna @ableegoldman @lucasbru @guozhangwang -- does anyone have some spare cycles to review so we can make progress?

Member

@lucasbru lucasbru left a comment

The fix makes sense to me. I have limited knowledge of the state store hierarchy, so others are better suited to judge whether this is the best way to pull the raw values through.

As I understand it, before a final review we are still missing a number of implementations of the rawView method.


@ableegoldman
Contributor

I've seen this but haven't had any time yet to give it a closer look; will try to do so in the next few weeks. Can't help but feel like this is the final straw with the store hierarchy mess though -- are we introducing too much additional complexity targeted at a specific use case (or a subset of use cases, but AFAICT this is all FKJ related)? Not "too much" as in we shouldn't do anything, but have you thought at all about whether there's a way to tackle this by just finally scrapping the whole mess and making time for that state store cleanup that's been coming for years now? Are we just digging ourselves into a deeper hole if we don't?

(full disclosure: I haven't looked at the PR for more than a minute so far, just voicing my immediate reaction to this)

@guozhangwang
Contributor

@mjsax sorry I was away and just saw this. Are you still looking for reviews? If yes I can take a peek.

@mjsax
Member Author

mjsax commented Oct 6, 2023

@lucasbru -- I can play around and see if adding this to WrappedStore would make sense instead of introducing a new interface. Hard to say without playing with the code (not sure when I will get to it, though).

@ableegoldman -- I tend to agree to some extent... But it would be a major refactoring -- I don't think this PR would be too messy though. It's (like always) a trade-off/judgment call whether it's worth just fixing the bug or not. Atm, given that I won't have time for a refactoring, I tend to be pragmatic and just fix the bug.

@guozhangwang -- yes, still looking for input on this. I don't need a full review, just some input on whether we think it's the right approach to begin with (cf. Sophie's justified comment that it will make things a little bit more "messy").

@guozhangwang
Contributor

> @guozhangwang -- yes, still looking for input on this. I don't need a full review, just some input on whether we think it's the right approach to begin with (cf. Sophie's justified comment that it will make things a little bit more "messy").

Ack, will take a look asap.

Contributor

@guozhangwang guozhangwang left a comment

@mjsax I made a pass on the PR. For this specific issue of serde in the FK-join, I tend to agree this would be a great fix. I just left a meta question regarding why the code originally used the value getter (which makes your effort a lot harder) rather than passing in its store, like other KTable processors, e.g. KTableAggregate, do.

As for the store hierarchy, what I was originally thinking is summarized here: https://issues.apache.org/jira/browse/KAFKA-13286. Of course that is pretty outdated and predates versioned tables and KeyValueStoreWrapper. So I'm trying to figure out if we can align this PR with whatever the final goal of the state store / serde refactoring tries to achieve. I spent a bit of time on it and in the end I think having a bytes store would be preferred:

  1. We'd probably need at least two types of underlying bytes stores: key-value, and list (already added for the stream-stream join).
  2. On top of the bytes stores, we can have a layer of wrapped stores where logging / caching happens; this is also where a remote store could be plugged in to replace the local store.
  3. On top of the wrapped stores, we can have a metered store where serde happens (and unlike today, serde would only happen at the metered store, i.e. we only do serde once); that also means that even if a remote store is used, metrics and serde still happen at the KS layer.

In cases like this one, we can then simply configure the upper metered store layer with a bytes serde (i.e. no serde at all), which effectively gives a bytes store with caching / logging configured. If we agree on that, then this PR could be refactored around the bytes store itself; also, as I mentioned in the inline comment, ResponseJoinProcessor can access the store directly since the store is its own.

WDYT @mjsax @ableegoldman ?
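
As one concrete reading of the last paragraph above, a store materialized with Bytes/byte[] serdes already behaves like a "no-serde" metered store while keeping caching and change-logging; a minimal sketch (illustrative only, the store name is a placeholder):

import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class RawBytesStoreSketch {

    // Builds a key-value store whose serdes are pass-through (Bytes / byte[]), so the
    // metered layer does no real serde work, while caching and change-logging are still
    // layered on top of the underlying bytes store.
    static StoreBuilder<KeyValueStore<Bytes, byte[]>> rawStoreBuilder(final String storeName) {
        final KeyValueBytesStoreSupplier supplier = Stores.persistentKeyValueStore(storeName);
        return Stores.keyValueStoreBuilder(supplier, Serdes.Bytes(), Serdes.ByteArray())
                     .withCachingEnabled()
                     .withLoggingEnabled(Collections.emptyMap());
    }
}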

private final Supplier<String> valueHashSerdePseudoTopicSupplier;
private final ValueJoiner<V, VO, VR> joiner;
private final boolean leftJoin;

public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
Contributor

I did not pay attention to the code here, but now that I'm reading it, I wonder why we have to pass in a value-getter-supplier here instead of directly passing in the store itself and using the store.get APIs? The KTableValueGetter is (or at least was) designed for a KTable object to be used by another KStream / KTable's processor as a parameter, like in a join, in order to get its values without directly accessing the underlying stores. For this case though, it is actually referencing the KTable itself, not the other "foreignKeyTable". If we just passed in the store itself, would that get rid of a lot of this wrapping and hence make the code simpler?

Member Author

I did not write this code -- not 100% sure why it was done this way. Maybe it was a case of copy & paste of existing patterns. Your point seems valid though: we don't need the getter indirection for the "subscription" store.

Might be a good idea to split out a refactoring of this part into a separate PR.
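
For reference, the direct store access suggested above would roughly mean the processor looks up its own store in init() instead of going through a value getter; an illustrative sketch only (class and store names are placeholders):

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: a processor reading its own materialized state directly rather than via a
// KTableValueGetter indirection.
public class DirectStoreAccessSketch<K, V, KOut, VOut> implements Processor<K, V, KOut, VOut> {

    private final String storeName;
    private KeyValueStore<K, V> store;

    public DirectStoreAccessSketch(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    public void init(final ProcessorContext<KOut, VOut> context) {
        store = context.getStateStore(storeName);
    }

    @Override
    public void process(final Record<K, V> record) {
        final V current = store.get(record.key());
        // ... the join / hash-comparison logic would use 'current' here ...
    }
}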

@CharlyRien

Hello.
Any news on this issue? @mjsax

@alikelleci

This issue really makes it impossible to use Kafka Streams to build our materialized views in an event-sourced system. Adding a minimal change like a primitive type to our view model will break the join. I am really hoping this PR will solve the issue and look forward to updates.

@CharlyRien

I know that I probably sound pushy, but I'm curious if there's anything we can do to continue the work you initiated, @mjsax.
I understand you don't have a lot of time and other priorities, and that's perfectly fine.
However, on our side we have a costly KStream deployment process (blue/green deployment to avoid disrupting the live flow of events), and because of this problem we are forced to go through it each time we add a new field (even though our topology has not changed at all between versions).

Thank you nonetheless for your time! 😊

@mjsax
Member Author

mjsax commented Mar 1, 2024

Thanks for the ping -- yes, it's a struggle to keep up with all the different parallel things... It's good that you keep pushing on this; it helps to keep its priority high...

As you can see from the discussion, it seems to be a rather large change we want to make, which does not help to speed things up... :(

Not sure if you would be interested in helping out yourself and doing a PR? It might still be slow on getting reviews, but maybe still faster than what we have right now. Maybe you could try to do this refactoring: #14157 (comment) -- Of course, this is all very deep inside the KS runtime and requires quite some knowledge, so not sure if you would be willing to ramp up on all of this...

@Chuckame

Chuckame commented Jun 1, 2024

Hello @mjsax, I'm working on it (or trying to 😄). What do you think about adding getRaw to the KeyValueStore interface? It would generalize efficient raw access from anywhere.

The problem I see with having the rawView is that it will only target KTable.

We could also use the WrappedStore, but that would require casting, so it does not feel that clean.
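
For illustration only, the getRaw suggestion might look something like this (a hypothetical signature, not an existing Kafka Streams API; it mirrors the RawXxxStore sketch earlier but as a default method on the store contract itself):

import org.apache.kafka.common.utils.Bytes;

// Hypothetical extension of the key-value store contract: stores that keep raw bytes
// can expose them directly; others fall back to the unsupported default.
public interface RawReadableKeyValueStore {

    default byte[] getRaw(final Bytes rawKey) {
        throw new UnsupportedOperationException("This store does not support raw reads");
    }
}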

@Chuckame

Chuckame commented Jun 4, 2024

After deep diving into the KeyValueStore and KTableValueGetter implementation layers, there is still one blocker that prevents the full fix:

All the KTableValueGetter implementations that use runtime mappers (based on the deserialized data) are not materialized, so we don't have any backing store. They need to serialize the mapped data on-the-fly to allow hashing it, which ultimately prevents using the original raw data for hashing:

  • KTableMap[Values]ValueGetter
  • KTableFilterValueGetter
  • KTableTransformValuesGetter
  • KTableKTableAbstractJoins (for ktable-ktable joins on the same key)

In other words, if just before the foreign-key join we apply an operation like join/leftJoin (same key), map, mapValues, filter or transformValues, there is no previous raw data: the value is computed on-the-fly and has no backing store.

Even if we totally revamp the raw store layer as @guozhangwang suggested, we will still have the same issue.

Idea

We could compute the hash on the original raw data, before mapping/transforming, but this would be a breaking change: the hash will be different when a user upgrades kafka-streams to this version (previously the hash was computed from the mapped value).

This change would need a new version for SubscriptionResponseWrapper (currently v0).

Pros:

  • We now have access to the original raw data, bypassing the deserialization step
  • We gain performance as we no longer deserialize -> transform -> serialize -> hash, but just hash

Cons:

  • Need a v1 for SubscriptionResponseWrapper to allow the breaking change of the hashing source (if we keep the current version, users would need to empty their stores, or all events triggered by the right side would be skipped as the hash would always be different -- which is actually the current bug)

Would you allow this breaking change @mjsax ?

@alikelleci

We recently had a production incident related to this issue. We just cannot make schema changes to our view models, which is a big problem. Therefore we decided not to use Kafka Streams to build our materialized views anymore. We still hope that this PR gets high priority and fixes the join issue so we can use Kafka Streams again.
