reader subscribe all partitioned topics#705
reader subscribe all partitioned topics#705GPrabhudas wants to merge 4 commits intoapache:masterfrom
Conversation
|
cc @cckellogg PTAL |
…nto reader-subscribe-all-partitions
| // Name returns the name of consumer. | ||
| Name() string | ||
|
|
||
| // lastDequeuedMsg used for setting last dequeued msg id by internal partition consumers |
There was a problem hiding this comment.
I don't think we should be adding private methods to the public interface. I think these should be removed. Also, it seems these are only relevant to the reader implementation so I think these helper functions should live within the reader files.
| NackBackoffPolicy NackBackoffPolicy | ||
|
|
||
| // startMessageID internally used by multitopic-reader | ||
| startMessageID MessageID |
There was a problem hiding this comment.
The ConsumerOptions is a public interface so I don't think we should add package private fields here. Let's remove these.
| } | ||
|
|
||
| func (pc *partitionConsumer) messagesInQueue() int { | ||
| return len(pc.queueCh) |
There was a problem hiding this comment.
If there are multi go routines consuming and producing to this channel this value might not be useful after being read.
| c.handlers.Add(reader) | ||
| return reader, nil | ||
| if len(topics) <= 1 { | ||
| reader, err := newReader(c, options) |
There was a problem hiding this comment.
What's the different between a reader and a multi reader? It seems like they should be the same but just consume a different number of partitions?
Initial changes to address reader topic not found issue: #553
Changes: