-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pulsar-client] Fix: set and return topic name on message api #11743
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lgtm
@@ -284,7 +284,7 @@ public MessageMetadata getMetadataBuilder() { | |||
|
|||
public Message<T> getMessage() { | |||
beforeSend(); | |||
return MessageImpl.create(msgMetadata, content, schema); | |||
return MessageImpl.create(msgMetadata, content, schema, producer.topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we use producer.topic
instead of producer.getTopic()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Demogorgon314 I would use producer.getTopic() in order to ease mocking.
also the producer field used to be nullable
this change is the cause of streamnative/kop#677
I am not sure that the KOP usage is correct, as it is using an internal API to build a message
if we change to producer.getTopic(), will you be able to fix the KOP issue ?
isn't there any other way of creating a Message without using the Builder ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we change to producer.getTopic()
, we are still unable fix the KoP issue, maybe we shouldn't use the TypedMessageBuilderImpl
in KoP, we can construct MessageImpl
manually or we can determine whether producer
is null or not in TypedMessageBuilderImpl#getMessage()
.
adding a null check for "producer" here smells a little, because this (the internals of TypedMessageBuilderImpl) is not a Public API, it is internal and here "producer" is always not null. if we need a MessageImpl then we need a public API to create an instance of a Message without using a Producer |
@@ -78,13 +78,14 @@ | |||
private boolean poolMessage; | |||
|
|||
// Constructor for out-going message | |||
public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, ByteBuffer payload, Schema<T> schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not modify this method, maybe it is better to add a new one, and MoP depends on this method.
https://github.com/streamnative/mop/blob/master/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/utils/PulsarMessageConverter.java#L66
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageImpl
is implementation class and it should not be used as a contract.
Yes, |
I agree with @rdhabalia @Demogorgon314 do you want to send a PR for the new MessageBuilder API ? |
Agree. I'll send a PR, but we need write a small PIP first. |
Motivation
In an application that has multiple topics and shares a single MessageRouter across all the topics then it's not possible right now, to find out the topic name in message-router when
MessageRouter::choosePartition
receives callback and application which requires topic-name to derive partition-number can not work.Modification
Set and return Topic-name in
Message::getTopicName()
API.Result
The application that requires topic name while selecting partition in MessageRouter, will be able to successfully get topic name and return the correct partition based on it,