Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto update the client to handle changes in number of partitions #221

Merged
merged 6 commits into from Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
134 changes: 104 additions & 30 deletions pulsar/consumer_impl.go
Expand Up @@ -41,9 +41,12 @@ type acker interface {
}

type consumer struct {
client *client
options ConsumerOptions
consumers []*partitionConsumer
sync.Mutex
topic string
client *client
options ConsumerOptions
consumers []*partitionConsumer
consumerName string

// channel used to deliver message to clients
messageCh chan ConsumerMessage
Expand All @@ -52,6 +55,7 @@ type consumer struct {
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
ticker *time.Ticker

log *log.Entry
}
Expand Down Expand Up @@ -118,9 +122,11 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
}

func internalTopicSubscribe(client *client, options ConsumerOptions, topic string,
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter) (*consumer, error) {

consumer := &consumer{
topic: topic,
client: client,
options: options,
messageCh: messageCh,
Expand All @@ -130,56 +136,108 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
log: log.WithField("topic", topic),
}

partitions, err := client.TopicPartitions(topic)
if options.Name != "" {
consumer.consumerName = options.Name
} else {
consumer.consumerName = generateRandomName()
}

err := consumer.internalTopicSubscribeToPartitions()
if err != nil {
return nil, err
}

numPartitions := len(partitions)
consumer.consumers = make([]*partitionConsumer, numPartitions)
// set up timer to monitor for new partitions being added
duration := options.AutoDiscoveryPeriod
if duration <= 0 {
duration = defaultAutoDiscoveryDuration
}
consumer.ticker = time.NewTicker(duration)

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only start this if the topic is partitioned?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that TopicPartitions() just returns a list, so we don't necessarily know whether it's 1 partition or no partitions. In any case, if the topic is not partitioned, it would not be possible to create a partitioned topic with the same name.

for range consumer.ticker.C {
consumer.log.Debug("Auto discovering new partitions")
consumer.internalTopicSubscribeToPartitions()
}
}()

return consumer, nil
}

func (c *consumer) internalTopicSubscribeToPartitions() error {
merlimat marked this conversation as resolved.
Show resolved Hide resolved
partitions, err := c.client.TopicPartitions(c.topic)
if err != nil {
return err
}

oldNumPartitions := 0
newNumPartitions := len(partitions)

c.Lock()
oldConsumers := c.consumers
c.Unlock()

if oldConsumers != nil {
oldNumPartitions = len(oldConsumers)
if oldNumPartitions == newNumPartitions {
c.log.Debug("Number of partitions in topic has not changed")
return nil
}

c.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
}

consumers := make([]*partitionConsumer, newNumPartitions)

// Copy over the existing consumer instances
for i := 0; i < oldNumPartitions; i++ {
consumers[i] = oldConsumers[i]
}

type ConsumerError struct {
err error
partition int
consumer *partitionConsumer
}

consumerName := options.Name
if consumerName == "" {
consumerName = generateRandomName()
}
receiverQueueSize := c.options.ReceiverQueueSize
metadata := c.options.Properties

receiverQueueSize := options.ReceiverQueueSize
metadata := options.Properties
partitionsToAdd := newNumPartitions - oldNumPartitions
var wg sync.WaitGroup
ch := make(chan ConsumerError, numPartitions)
wg.Add(numPartitions)
for partitionIdx, partitionTopic := range partitions {
ch := make(chan ConsumerError, partitionsToAdd)
wg.Add(partitionsToAdd)

for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
partitionTopic := partitions[partitionIdx]

go func(idx int, pt string) {
defer wg.Done()

var nackRedeliveryDelay time.Duration
if options.NackRedeliveryDelay == 0 {
if c.options.NackRedeliveryDelay == 0 {
nackRedeliveryDelay = defaultNackRedeliveryDelay
} else {
nackRedeliveryDelay = options.NackRedeliveryDelay
nackRedeliveryDelay = c.options.NackRedeliveryDelay
}
opts := &partitionConsumerOpts{
topic: pt,
consumerName: consumerName,
subscription: options.SubscriptionName,
subscriptionType: options.Type,
subscriptionInitPos: options.SubscriptionInitialPosition,
consumerName: c.consumerName,
subscription: c.options.SubscriptionName,
subscriptionType: c.options.Type,
subscriptionInitPos: c.options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
metadata: metadata,
replicateSubscriptionState: options.ReplicateSubscriptionState,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: nil,
subscriptionMode: durable,
readCompacted: options.ReadCompacted,
readCompacted: c.options.ReadCompacted,
}
cons, err := newPartitionConsumer(consumer, client, opts, messageCh, dlq)
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq)
ch <- ConsumerError{
err: err,
partition: idx,
Expand All @@ -197,34 +255,41 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
if ce.err != nil {
err = ce.err
} else {
consumer.consumers[ce.partition] = ce.consumer
consumers[ce.partition] = ce.consumer
}
}

if err != nil {
// Since there were some failures,
// cleanup all the partitions that succeeded in creating the consumer
for _, c := range consumer.consumers {
for _, c := range consumers {
if c != nil {
c.Close()
}
}
return nil, err
return err
}

return consumer, nil
c.Lock()
c.consumers = consumers
c.Unlock()

return nil
}

func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) {
return internalTopicSubscribe(client, options, topic, messageCh, dlqRouter)
return newInternalConsumer(client, options, topic, messageCh, dlqRouter)
}

// Subscription reports the subscription name taken from the options this
// consumer was created with.
func (c *consumer) Subscription() string {
	name := c.options.SubscriptionName
	return name
}

func (c *consumer) Unsubscribe() error {
c.Lock()
defer c.Unlock()

var errMsg string
for _, consumer := range c.consumers {
if err := consumer.Unsubscribe(); err != nil {
Expand Down Expand Up @@ -298,6 +363,9 @@ func (c *consumer) NackID(msgID MessageID) {

func (c *consumer) Close() {
c.closeOnce.Do(func() {
c.Lock()
defer c.Unlock()

var wg sync.WaitGroup
for i := range c.consumers {
wg.Add(1)
Expand All @@ -308,12 +376,16 @@ func (c *consumer) Close() {
}
wg.Wait()
close(c.closeCh)
c.ticker.Stop()
c.client.handlers.Del(c)
c.dlq.close()
})
}

func (c *consumer) Seek(msgID MessageID) error {
c.Lock()
defer c.Unlock()

if len(c.consumers) > 1 {
return errors.New("for partition topic, seek command should perform on the individual partitions")
}
Expand All @@ -327,6 +399,8 @@ func (c *consumer) Seek(msgID MessageID) error {
}

func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
if len(c.consumers) > 1 {
return errors.New("for partition topic, seek command should perform on the individual partitions")
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_regex.go
Expand Up @@ -237,7 +237,7 @@ func (c *regexConsumer) monitor() {
case <-c.closeCh:
return
case <-c.ticker.C:
log.Debug("Auto discovering topics")
c.log.Debug("Auto discovering topics")
if !c.closed() {
c.discover()
}
Expand Down Expand Up @@ -361,7 +361,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
for _, t := range topics {
go func(topic string) {
defer wg.Done()
c, err := internalTopicSubscribe(c, opts, topic, ch, dlq)
c, err := newInternalConsumer(c, opts, topic, ch, dlq)
consumerErrorCh <- consumerError{
err: err,
topic: topic,
Expand Down
71 changes: 69 additions & 2 deletions pulsar/consumer_test.go
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"fmt"
"log"
"net/http"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -326,7 +328,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
topic := "persistent://public/default/testGetPartitions"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"

makeHTTPCall(t, testURL, "64")
makeHTTPCall(t, http.MethodPut, testURL, "64")

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Expand Down Expand Up @@ -410,7 +412,7 @@ func TestConsumerShared(t *testing.T) {
topic := "persistent://public/default/testMultiPartitionConsumerShared"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"

makeHTTPCall(t, testURL, "3")
makeHTTPCall(t, http.MethodPut, testURL, "3")

sub := "sub-shared-1"
consumer1, err := client.Subscribe(ConsumerOptions{
Expand Down Expand Up @@ -1207,3 +1209,68 @@ func TestGetDeliveryCount(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, uint32(3), msg.RedeliveryCount())
}

// TestConsumerAddTopicPartitions checks that a consumer subscribed to a
// partitioned topic also receives messages from partitions that are added
// after the subscription was created.
//
// The topic is created with initialPartitions partitions through the admin
// REST endpoint and then grown to finalPartitions. One message is routed to
// every partition (the message key carries the target partition index) and
// the consumer is expected to receive all of them.
func TestConsumerAddTopicPartitions(t *testing.T) {
	const (
		initialPartitions = 3
		finalPartitions   = 10
	)

	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	// assert.* only marks the test as failed; bail out explicitly so a nil
	// client is never dereferenced below.
	if !assert.Nil(t, err) {
		return
	}
	defer client.Close()

	topic := newTopicName()
	testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
	makeHTTPCall(t, http.MethodPut, testURL, strconv.Itoa(initialPartitions))

	// Shorten the producer-side discovery interval for this test only and put
	// the previous value back afterwards so other tests are not affected.
	prevInterval := partitionsAutoDiscoveryInterval
	partitionsAutoDiscoveryInterval = 100 * time.Millisecond
	defer func() { partitionsAutoDiscoveryInterval = prevInterval }()

	// create producer
	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topic,
		MessageRouter: func(msg *ProducerMessage, topicMetadata TopicMetadata) int {
			// The message key will contain the partition id where to route
			i, err := strconv.Atoi(msg.Key)
			assert.NoError(t, err)
			return i
		},
	})
	if !assert.Nil(t, err) {
		return
	}
	defer producer.Close()

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:               topic,
		SubscriptionName:    "my-sub",
		AutoDiscoveryPeriod: 100 * time.Millisecond,
	})
	if !assert.Nil(t, err) {
		return
	}
	defer consumer.Close()

	// Increase number of partitions to finalPartitions
	makeHTTPCall(t, http.MethodPost, testURL, strconv.Itoa(finalPartitions))

	// Wait for the producer/consumers to pick up the change
	time.Sleep(1 * time.Second)

	// Bound the whole send/receive phase so that a partition that was never
	// discovered fails the test instead of blocking it forever.
	// NOTE(review): assumes Send and Receive return once ctx expires — confirm.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Publish messages ensuring that they will go to all the partitions
	for i := 0; i < finalPartitions; i++ {
		_, err := producer.Send(ctx, &ProducerMessage{
			Key:     strconv.Itoa(i),
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		assert.Nil(t, err)
	}

	msgs := make([]string, 0, finalPartitions)

	for i := 0; i < finalPartitions; i++ {
		msg, err := consumer.Receive(ctx)
		// msg is not usable when Receive fails; stop before touching it.
		if !assert.Nil(t, err) {
			return
		}
		msgs = append(msgs, string(msg.Payload()))

		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
			msg.ID(), string(msg.Payload()))

		consumer.Ack(msg)
	}

	// testify expects (expected, actual) in that order.
	assert.Equal(t, finalPartitions, len(msgs))
}