Initial checkin of PublishMessage/Publish#9515
Initial checkin of PublishMessage/Publish#9515flowchartsman wants to merge 1 commit intoapache:masterfrom
Conversation
|
This is one possible way this could be implemented, sticking to the standards of Go Pulsar functions by sending the message async and not returning error/messageID to the user. The other possible route would be like my original suggestion, and then all sends to topics that are not the output topic would be synchronous. FEEDBACK REQUESTED |
| producer.SendAsync(ctx, &message, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) { | ||
| if err != nil { | ||
| goInstance.stats.incrTotalSysExceptions(err) | ||
| log.Fatal(err) |
There was a problem hiding this comment.
Do we want to exit the process here or instead return an error to allow users to handle delivering failures?
|
|
||
| producer := fctx.NewOutputMessage(publishTopic) | ||
| msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{ | ||
| log.Printf("publishing to additional topic %s", publishTopic) |
There was a problem hiding this comment.
is this for debugging purpose and will be removed when checked in?
There was a problem hiding this comment.
This is just the example, so it's only there to show that it's doing something. I wasn't sure what to put here. Recommendations?
| Publish func(topic string, payload []byte) | ||
| PublishMessage func(topic string, message pulsar.ProducerMessage) |
There was a problem hiding this comment.
Can you give little more control to the caller with a context and return an error?
- Something like func(ctx Context, topic string, payload []byte), since you are creating and passing a context to the lower level anyway.
- Return an error instead of log.Fatal crash the process.
There was a problem hiding this comment.
I originally wrote it that way, but returning an error is not how the rest of the framework works. If there's an error, it dies, relying on metrics/backpressure to alert and remediate. Why should this be any different for multiple output topics? If you look at the conceptual documents for multi-topic dispatch (with python examples), they are not treated any differently as a unit. To me, this suggests they should be treated (and automatically tracked with metrics) the same way, otherwise the user should just create a regular consumer/producer with the regular golang SDK, not a pulsar function.
| // topic for output message | ||
| // topic for output message (DEPRECATED) | ||
| func (c *FunctionContext) NewOutputMessage(topicName string) pulsar.Producer { | ||
| log.Warn("NewOutputMessage is deprecated, please use Publish() or PublishMsg()") |
There was a problem hiding this comment.
We should add a new function like NewProducer(topicName string) (pulsar.Producer, error) or with any proper name. the reason is we need to remain this function exists, and let user know with the log.Warn if they still want this.
There was a problem hiding this comment.
Normally I would agree, but as I've discussed elsewhere, the pulsar function SDK API is relatively lightweight by design. The more knobs you give the user to tweak at runtime, the further you get from the control of the process the SDK appears to be designed for. At that point, it is not much different from a regular consumer/producer with deployment, some configuration and some logging taken care of.
| Publish func(topic string, payload []byte) | ||
| PublishMessage func(topic string, message pulsar.ProducerMessage) |
|
The pr had no activity for 30 days, mark with Stale label. |
|
@flowchartsman:Thanks for your contribution. For this PR, do we need to update docs? |
|
Closed as the development of the Go client has been permanently moved to https://github.com/apache/pulsar-client-go. |
|
Oops. It's about the Go Functions, not the Go client. But still, there're several conflicts so please feel free to resubmit the patch. |
fixes #9512
Motivation
#9512
Modifications
similar to #9512, but errors are fatal, as with normal message handling. (FEEDBACK REQUESTED))
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
Documentation