
Make StreamMessage generic and a bug fix #9544

Merged
merged 1 commit into apache:master on Oct 9, 2022

Conversation

vvivekiyer
Contributor

@vvivekiyer vvivekiyer commented Oct 6, 2022

With PR #9224, StreamMessage can only accept the byte[] datatype for values. However, the MessageBatch interface is generic and lets users implement a class with a custom type. For example, LinkedIn uses MessageBatch<IndexedRecord> for its Kafka client. Keeping StreamMessage generic will avoid unnecessary serializing and deserializing for such users.
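A minimal sketch of the generic wrapper, for context (the field set mirrors the constructor discussed in the review below, but the accessors and exact shape are assumptions, not the verbatim diff):

```java
import org.apache.pinot.spi.stream.StreamMessageMetadata;

// Sketch only -- the class in the actual diff may differ.
public class StreamMessage<T> {
  private final byte[] _key;                     // the message key stays raw bytes
  private final T _value;                        // payload in the consumer's native type
  private final StreamMessageMetadata _metadata; // may be null (see the NPE fix below)
  private final int _length;

  public StreamMessage(byte[] key, T value, StreamMessageMetadata metadata, int length) {
    _key = key;
    _value = value;
    _metadata = metadata;
    _length = length;
  }

  public byte[] getKey() { return _key; }
  public T getValue() { return _value; }
  public StreamMessageMetadata getMetadata() { return _metadata; }
  public int getLength() { return _length; }
}
```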

Also fixed an NPE in StreamDataDecoderImpl.java that occurred when the metadata header is null.
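A hedged sketch of the guard (the accessor names follow Pinot's StreamMessageMetadata and GenericRow, but the surrounding method and the header-key prefix are assumptions, not the verbatim patch):

```java
// Assumed shape of the fix in StreamDataDecoderImpl: only copy headers into
// the row when the metadata and its header holder are actually present.
private static void addHeaders(StreamMessage<?> message, GenericRow row) {
  StreamMessageMetadata metadata = message.getMetadata();
  if (metadata == null || metadata.getHeaders() == null) {
    return; // this path previously dereferenced a null header holder -> NPE
  }
  metadata.getHeaders().getFieldToValueMap()
      .forEach((key, value) -> row.putValue("__header$" + key, value)); // prefix is illustrative
}
```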

Added tests to StreamMessageTest.java. Also tested the changes on a cluster.

@vvivekiyer vvivekiyer marked this pull request as ready for review October 6, 2022 19:15
@vvivekiyer
Contributor Author

@sajjad-moradi and @mcvsubbu please review.

@vvivekiyer vvivekiyer changed the title Make StreamMessage generic Make StreamMessage generic and a bug fix Oct 6, 2022
@vvivekiyer
Contributor Author

Tagging @navina to take a look as well.

_key = key;
_value = value;
_metadata = metadata;
_length = length;

Could you please update the javadoc for the new field?

@navina
Contributor

navina commented Oct 7, 2022

@vvivekiyer I had intentionally tried to stay away from using generics in this new interface, as it makes the code hard to maintain. Moreover, I wanted to make sure that the StreamConsumer contract did not include deserializing the incoming payload.

LinkedIn uses MessageBatch<IndexedRecord> for its Kafka client. Keeping StreamMessage generic will avoid unnecessary type conversion (which could involve serializing and deserializing) for such users.

I am curious why LinkedIn does this. Isn't it expensive to deserialize every record before the deserialized payload is actually needed? The consumer's contract should not involve deserializing the payload. Can you please explain why this is useful?
I also don't understand the point about "unnecessary type conversion" here. Can you please elaborate?

Using generics forces the segment manager implementation to deal with raw usage of parameterized classes (due to type erasure) and makes the code hard to read and maintain.
Besides, StreamMessage is meant to abstract the entire incoming payload. Using only the type of the record's "value" in this generic class seems prohibitive.

Soliciting feedback from @npawar / @Jackie-Jiang / @kishoreg here.

Thanks!

Also fixed an NPE in StreamDataDecoderImpl.java that occurred when the metadata header is null.

Thank you for fixing this! I recently noticed this in my testing.

@navina
Contributor

navina commented Oct 7, 2022

Another question: if LinkedIn is using MessageBatch<IndexedRecord>, does that mean LinkedIn is also using its own implementation of the Kafka consumer and not the OSS plugin?

@vvivekiyer
Contributor Author

vvivekiyer commented Oct 7, 2022

@navina I've tried to answer the questions below. Please take a look and let me know what you think.

If LinkedIn is using MessageBatch<IndexedRecord>, does that mean LinkedIn is also using its own implementation of the Kafka consumer and not the OSS plugin?

Yes. LinkedIn has a custom Kafka consumer implementation.

I am curious why LinkedIn does this. Isn't it expensive to deserialize every record before the deserialized payload is actually needed? The consumer's contract should not involve deserializing the payload. Can you please explain why this is useful?

LinkedIn's Kafka consumer directly fetches the deserialized payload. AFAIK, LinkedIn's Kafka deployment has a schema registry where the payload's schema is registered, so they provide (optimized) deserialization and do not allow clients to implement their own. @sajjad-moradi to add more details, if any.

Just to give more clarity about LinkedIn's custom implementation of these interfaces (a sketch follows the list):

  • LiKafkaMessageBatch implements MessageBatch<IndexedRecord>
  • LiKafkaConsumer extends PartitionLevelConsumer
  • LiKafkaDecoder implements StreamMessageDecoder<IndexedRecord>
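Roughly, the relationships look like this. Everything below is illustrative: the Pinot SPI types are replaced with simplified stand-ins (the real MessageBatch, StreamMessageDecoder, and GenericRow have more methods), and LinkedIn's actual classes are internal:

```java
import java.util.List;
import org.apache.avro.generic.IndexedRecord;

// Simplified stand-ins for the Pinot SPI types (not the real signatures):
class GenericRow {}

interface MessageBatch<T> {
  int getMessageCount();
  T getMessageAtIndex(int index);
}

interface StreamMessageDecoder<T> {
  GenericRow decode(T payload, GenericRow destination);
}

// The LinkedIn plugin stays in IndexedRecord from fetch to decode:
class LiKafkaMessageBatch implements MessageBatch<IndexedRecord> {
  private final List<IndexedRecord> _records;

  LiKafkaMessageBatch(List<IndexedRecord> records) {
    _records = records;
  }

  @Override
  public int getMessageCount() {
    return _records.size();
  }

  @Override
  public IndexedRecord getMessageAtIndex(int index) {
    return _records.get(index);
  }
}

class LiKafkaDecoder implements StreamMessageDecoder<IndexedRecord> {
  @Override
  public GenericRow decode(IndexedRecord payload, GenericRow destination) {
    // field-by-field mapping from the Avro record is elided in this sketch
    return destination;
  }
}
```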

I also don't understand the point about "unnecessary type conversion" here. Can you please elaborate?

I meant additional serialization and deserialization. Edited the description.

Using generics forces the segment manager implementation to deal with raw usage of parameterized classes (due to type erasure) and makes the code hard to read and maintain.

As per my understanding of the code, SegmentManager only deals with GenericRow once deserialization is done. Depending on the implementation, the MessageBatch<T> and StreamMessageDecoder<T> interfaces take care of abstracting the messages and decoding; each implementation can deal with the formats it wishes. So can you please clarify what you mean by making the SegmentManager code harder to read and maintain?

Besides, StreamMessage is meant to abstract the entire incoming payload. Using only the type of the record's "value" in this generic class seems prohibitive.

I agree with this part. We can discuss and arrive at the best way to do this. But IMO, forcing StreamMessage to have a key and value of type byte[] seems counter-intuitive based on the OSS code flow. I've tried to explain the issue below.

This is my understanding of our OSS code prior to #9224:

  1. Get a batch of messages:
    MessageBatch<T> messageBatch = PartitionLevelConsumer.fetchMessage();
  2. for (message in MessageBatch<T>), decode the message
    GenericRow row = StreamMessageDecoder<T>.decode(message)

Note that MessageBatch is a generic interface because users of Pinot are free to use their custom kafka (or other) client implementations that could return messages in any format.

After #9224, the code looks as follows:

  1. Get a batch of messages:
    MessageBatch<T> messageBatch = PartitionLevelConsumer.fetchMessage();
  2. Get each message and store it in a StreamMessage wrapper. Note that StreamMessage stores values only as byte[].
    byte[] StreamMessage.value = messageBatch.getMessageValue(index)
  3. Decode StreamMessage
    GenericRow row = StreamMessageDecoder<T>.decode(StreamMessage.value())

Looking at the above, we've introduced a new step (2), where we force messages of a generic type (MessageBatch<T>) to be serialized to byte[], only to go back to working on generic types in StreamMessageDecoder<T>.
As you mentioned, the new code assumes that when messages are read from the stream consumer, they will always be in serialized form. But the existing MessageBatch<T> and StreamMessageDecoder<T> code doesn't honor that assumption. Note that the intermediate byte[] array just acts as a passthrough (it is not used anywhere other than for decoding in StreamMessageDecoder), sandwiched between two interfaces that use generic types. A sketch of the generic alternative follows.
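Illustratively (a sketch of the intent, reusing the simplified stand-ins from the earlier comment, not Pinot's actual code):

```java
// With a generic StreamMessage<T>, step (2) preserves the consumer's native
// payload type instead of forcing a byte[] round trip before the decoder.
class GenericConsumeLoop {
  static <T> void consume(MessageBatch<T> batch, StreamMessageDecoder<T> decoder) {
    for (int i = 0; i < batch.getMessageCount(); i++) {
      // (2) wrap the fetched message without changing its payload type
      StreamMessage<T> message =
          new StreamMessage<>(null, batch.getMessageAtIndex(i), null, 0);
      // (3) decode directly from T; no intermediate serialization step
      GenericRow row = decoder.decode(message.getValue(), new GenericRow());
      // row would then flow on to segment building
    }
  }
}
```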

@vvivekiyer
Contributor Author

@navina Just wanted to add that our build pipeline is broken currently. So we are looking to fix this as soon as possible.
Depending on your feedback for my above comments, we can discuss short-term and long-term fixes and take up the long-term fixes as TODOs.
cc: @mcvsubbu

@sajjad-moradi
Contributor

I'm merging this PR as the issue it fixes has broken LinkedIn's build. As Vivek mentioned, we can discuss short term & long term fixes if needed.

@sajjad-moradi sajjad-moradi merged commit 0d69ae1 into apache:master Oct 9, 2022
@navina
Contributor

navina commented Oct 10, 2022

@vvivekiyer Not sure how much value a discussion can offer if the PR has already been merged. But here is my take on this:

MessageBatch is a generic interface because users of Pinot are free to use their custom kafka (or other) client implementations that could return messages in any format.

I understand the flexibility that this generic MessageBatch provides. But we want a stronger interface contract so that developing a plugin becomes trivial and streamlined. Features we can add (a sketch of the first item follows the list):

  • Filtering records based on message metadata (benefit: avoid payload de-serialization cost)
  • Compute time to deserialize a payload
  • Streamline handling of decode failures
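As an illustration of the first item (the Predicate hook is hypothetical, not part of the current SPI; the stand-in types from the earlier sketch are reused):

```java
import java.util.List;
import java.util.function.Predicate;
import org.apache.pinot.spi.stream.StreamMessageMetadata;

// Hypothetical metadata-based filtering: messages that fail the predicate
// are skipped before decoding, so they never pay the deserialization cost.
class FilteringDecodeLoop {
  static void decodeFiltered(List<StreamMessage<byte[]>> batch,
      StreamMessageDecoder<byte[]> decoder,
      Predicate<StreamMessageMetadata> headerFilter) {
    for (StreamMessage<byte[]> message : batch) {
      StreamMessageMetadata metadata = message.getMetadata();
      if (metadata == null || !headerFilter.test(metadata)) {
        continue; // dropped before decoding: no deserialization cost paid
      }
      decoder.decode(message.getValue(), new GenericRow());
    }
  }
}
```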

As you mentioned, the new code assumes that when messages are read from the stream consumer, they will always be in serialized format. But the existing code for MessageBatch and StreamMessageDecoder doesn't honor the assumption.

Yes, that was the whole point of changing to the new code: I was trying to decouple the "decoding" of a message from the "fetching" of a message from the stream.


Just to give more clarity about LinkedIn's custom implementation of these interfaces:
LiKafkaMessageBatch implements MessageBatch<IndexedRecord>
LiKafkaConsumer extends PartitionLevelConsumer
LiKafkaDecoder implements StreamMessageDecoder<IndexedRecord>

IIRC, LinkedIn has multiple client libraries, and I am fairly certain that almost all of them, except linked-kafka-clients, allow you to customize the serializer/deserializer. Moreover, LinkedIn's Avro decoder is also a separate library that can be used in your custom implementation, with LiKafkaDecoder implementing StreamMessageDecoder<byte[]>.
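A sketch of that approach, assuming the writer schema is resolved up front (a real decoder would typically look it up in the schema registry per message; the stand-in StreamMessageDecoder and GenericRow from the earlier sketch are reused):

```java
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

// Keep the consumer contract at byte[] and deserialize inside the decoder.
class LiBytesDecoder implements StreamMessageDecoder<byte[]> {
  private final GenericDatumReader<GenericRecord> _reader;

  LiBytesDecoder(Schema schema) {
    _reader = new GenericDatumReader<>(schema);
  }

  @Override
  public GenericRow decode(byte[] payload, GenericRow destination) {
    try {
      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
      GenericRecord record = _reader.read(null, decoder);
      // mapping record fields into destination is elided in this sketch
      return destination;
    } catch (IOException e) {
      throw new RuntimeException("Failed to decode Avro payload", e);
    }
  }
}
```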

Can you help me understand why this approach cannot be used by LinkedIn's Pinot deployment?

@mayankshriv
Contributor

@navina @vvivekiyer @sajjad-moradi after going through the history, I see that more discussion is warranted here to ensure we have agreement on the right design going forward. Given that the PR was already merged and there seem to be open questions, could we get to consensus quickly? I would like to avoid a situation where the right design is not aligned with the merged PR, but the PR stays in the system long enough to make a clean fix harder.

Please let me know how I can help.

@vvivekiyer
Contributor Author

Taking this discussion to slack to get faster resolution. cc: @mayankshriv @navina @sajjad-moradi
