[Issue 242][pulsar-client-go] feature: add send timeout #249
[Issue 242][pulsar-client-go] feature: add send timeout #249lifepuzzlefun wants to merge 7 commits intoapache:masterfrom
Conversation
Changes 1. ProduceOptions add SendTimeout and SendTimeoutCheckInterval in ProducerOptions 2. BlockingQueue unexported BlockingQueue.Iterator and add methods to avoid race condition. BlockingQueue.IterateIfNonEmpty and BlockingQueue.PollIfSatisfy 3. partitionProducer add a ticker to check if pendingItem in pendingQueue has expired. Some explain on implementation: 1. if message is send as batch, will only check if the first message timeout. if first message in batch timeout. will call all message send callback with an ErrSendTimeout. 2. avoid use context to judge if send has been timeout. maybe one set different timeout on each message. this will introduce a lot complicity. and context is use for goroutine control. i think this is not suit for send timeout use case, because there is no goroutine at all.
Changes 1. ProduceOptions add SendTimeout and SendTimeoutCheckInterval in ProducerOptions 2. BlockingQueue unexported BlockingQueue.Iterator and add methods to avoid race condition. BlockingQueue.IterateIfNonEmpty and BlockingQueue.PollIfSatisfy 3. partitionProducer add a ticker to check if pendingItem in pendingQueue has expired. Some explain on implementation: 1. if message is send as batch, will only check if the first message timeout. if first message in batch timeout. will call all message send callback with an ErrSendTimeout. 2. avoid use context to judge if send has been timeout. maybe one set different timeout on each message. this will introduce a lot complicity. and context is use for goroutine control. i think this is not suit for send timeout use case, because there is no goroutine at all.
# Conflicts: # pulsar/producer.go # pulsar/producer_partition.go
# Conflicts: # pulsar/producer.go # pulsar/producer_partition.go
|
The |
|
I think the correct fix for #242 would be to also check the context before attempting to publish the message. |
|
yeah, i think it should be, but when user pass different context with different sendtimeout to Send and SendAsync , this will introduce a lot of complicity. the message is send as batch. and each message in batch with different timeout |
|
i'm confused about the above use case, need some guide on the real popuse of passing an context here. and where do you prefer to check the context? just before send or both before send and time |
|
I hope that both before send and time wait for response should check timeout.I don't want to wait send func always,because I meet this situation where pulsar broker has something wrong and send func always wait @ |
|
closed and goto #252 |
Fixes #242
Motivation
add send timeout
Modifications
ProduceOptions
add SendTimeout and SendTimeoutCheckInterval in ProducerOptions
BlockingQueue
unexported BlockingQueue.Iterator
and add methods to avoid race condition.
BlockingQueue.IterateIfNonEmpty and BlockingQueue.PollIfSatisfy
partitionProducer
add a ticker to check if pendingItem in pendingQueue has expired.
Verifying this change
This change added tests and can be verified as follows:
add some tiny test in producer_test.go
Does this pull request potentially affect one of the following parts:
If
yeswas chosen, please highlight the changesDocumentation