Auto update the client to handle changes in number of partitions (#221)
* Auto update the client to handle changes in number of partitions

* Fixed linter stuff

* Fixed locking of producer/consumer list when updating partitions

* Keep the lock during the partition update operation

* Removed empty line

* Fixed locking in producer.send()
merlimat committed Apr 14, 2020
1 parent 6e5c7d3 commit a575681
Showing 6 changed files with 269 additions and 52 deletions.
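The commit message above describes a ticker-driven loop that periodically re-checks the topic's partition count and updates the consumer/producer lists under a lock. Below is a minimal, self-contained sketch of that pattern; the names (partitionWatcher, fetchPartitionCount) are hypothetical stand-ins for illustration, not the client's actual API, which follows in the diff.

// Sketch only: a ticker periodically compares the known partition count with
// the broker's answer and, under a mutex, records the change. In the real
// client the same spot creates consumers/producers for the new partitions.
package main

import (
	"fmt"
	"sync"
	"time"
)

type partitionWatcher struct {
	sync.Mutex
	partitions int
	ticker     *time.Ticker
	closeCh    chan struct{}
}

// fetchPartitionCount stands in for a metadata lookup such as
// client.TopicPartitions(topic).
func fetchPartitionCount() int { return 5 }

func (w *partitionWatcher) checkPartitions() {
	newCount := fetchPartitionCount()

	w.Lock()
	defer w.Unlock()
	if newCount == w.partitions {
		return // number of partitions has not changed
	}
	fmt.Printf("partitions changed: %d -> %d\n", w.partitions, newCount)
	w.partitions = newCount
}

func main() {
	w := &partitionWatcher{
		partitions: 3,
		ticker:     time.NewTicker(100 * time.Millisecond),
		closeCh:    make(chan struct{}),
	}
	go func() {
		for {
			select {
			case <-w.closeCh:
				return
			case <-w.ticker.C:
				w.checkPartitions()
			}
		}
	}()

	time.Sleep(350 * time.Millisecond)
	w.ticker.Stop()
	close(w.closeCh)
}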
130 changes: 100 additions & 30 deletions pulsar/consumer_impl.go
@@ -41,9 +41,12 @@ type acker interface {
}

type consumer struct {
client *client
options ConsumerOptions
consumers []*partitionConsumer
sync.Mutex
topic string
client *client
options ConsumerOptions
consumers []*partitionConsumer
consumerName string

// channel used to deliver message to clients
messageCh chan ConsumerMessage
@@ -52,6 +55,7 @@ type consumer struct {
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
ticker *time.Ticker

log *log.Entry
}
@@ -118,9 +122,11 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
}

func internalTopicSubscribe(client *client, options ConsumerOptions, topic string,
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter) (*consumer, error) {

consumer := &consumer{
topic: topic,
client: client,
options: options,
messageCh: messageCh,
@@ -130,56 +136,108 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic string,
log: log.WithField("topic", topic),
}

partitions, err := client.TopicPartitions(topic)
if options.Name != "" {
consumer.consumerName = options.Name
} else {
consumer.consumerName = generateRandomName()
}

err := consumer.internalTopicSubscribeToPartitions()
if err != nil {
return nil, err
}

numPartitions := len(partitions)
consumer.consumers = make([]*partitionConsumer, numPartitions)
// set up timer to monitor for new partitions being added
duration := options.AutoDiscoveryPeriod
if duration <= 0 {
duration = defaultAutoDiscoveryDuration
}
consumer.ticker = time.NewTicker(duration)

go func() {
for range consumer.ticker.C {
consumer.log.Debug("Auto discovering new partitions")
consumer.internalTopicSubscribeToPartitions()
}
}()

return consumer, nil
}

func (c *consumer) internalTopicSubscribeToPartitions() error {
partitions, err := c.client.TopicPartitions(c.topic)
if err != nil {
return err
}

oldNumPartitions := 0
newNumPartitions := len(partitions)

c.Lock()
defer c.Unlock()
oldConsumers := c.consumers

if oldConsumers != nil {
oldNumPartitions = len(oldConsumers)
if oldNumPartitions == newNumPartitions {
c.log.Debug("Number of partitions in topic has not changed")
return nil
}

c.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
}

c.consumers = make([]*partitionConsumer, newNumPartitions)

// Copy over the existing consumer instances
for i := 0; i < oldNumPartitions; i++ {
c.consumers[i] = oldConsumers[i]
}

type ConsumerError struct {
err error
partition int
consumer *partitionConsumer
}

consumerName := options.Name
if consumerName == "" {
consumerName = generateRandomName()
}
receiverQueueSize := c.options.ReceiverQueueSize
metadata := c.options.Properties

receiverQueueSize := options.ReceiverQueueSize
metadata := options.Properties
partitionsToAdd := newNumPartitions - oldNumPartitions
var wg sync.WaitGroup
ch := make(chan ConsumerError, numPartitions)
wg.Add(numPartitions)
for partitionIdx, partitionTopic := range partitions {
ch := make(chan ConsumerError, partitionsToAdd)
wg.Add(partitionsToAdd)

for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
partitionTopic := partitions[partitionIdx]

go func(idx int, pt string) {
defer wg.Done()

var nackRedeliveryDelay time.Duration
if options.NackRedeliveryDelay == 0 {
if c.options.NackRedeliveryDelay == 0 {
nackRedeliveryDelay = defaultNackRedeliveryDelay
} else {
nackRedeliveryDelay = options.NackRedeliveryDelay
nackRedeliveryDelay = c.options.NackRedeliveryDelay
}
opts := &partitionConsumerOpts{
topic: pt,
consumerName: consumerName,
subscription: options.SubscriptionName,
subscriptionType: options.Type,
subscriptionInitPos: options.SubscriptionInitialPosition,
consumerName: c.consumerName,
subscription: c.options.SubscriptionName,
subscriptionType: c.options.Type,
subscriptionInitPos: c.options.SubscriptionInitialPosition,
partitionIdx: idx,
receiverQueueSize: receiverQueueSize,
nackRedeliveryDelay: nackRedeliveryDelay,
metadata: metadata,
replicateSubscriptionState: options.ReplicateSubscriptionState,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: nil,
subscriptionMode: durable,
readCompacted: options.ReadCompacted,
readCompacted: c.options.ReadCompacted,
}
cons, err := newPartitionConsumer(consumer, client, opts, messageCh, dlq)
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq)
ch <- ConsumerError{
err: err,
partition: idx,
@@ -197,34 +255,37 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic string,
if ce.err != nil {
err = ce.err
} else {
consumer.consumers[ce.partition] = ce.consumer
c.consumers[ce.partition] = ce.consumer
}
}

if err != nil {
// Since there were some failures,
// cleanup all the partitions that succeeded in creating the consumer
for _, c := range consumer.consumers {
for _, c := range c.consumers {
if c != nil {
c.Close()
}
}
return nil, err
return err
}

return consumer, nil
return nil
}

func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) {
return internalTopicSubscribe(client, options, topic, messageCh, dlqRouter)
return newInternalConsumer(client, options, topic, messageCh, dlqRouter)
}

func (c *consumer) Subscription() string {
return c.options.SubscriptionName
}

func (c *consumer) Unsubscribe() error {
c.Lock()
defer c.Unlock()

var errMsg string
for _, consumer := range c.consumers {
if err := consumer.Unsubscribe(); err != nil {
@@ -298,6 +359,9 @@ func (c *consumer) NackID(msgID MessageID) {

func (c *consumer) Close() {
c.closeOnce.Do(func() {
c.Lock()
defer c.Unlock()

var wg sync.WaitGroup
for i := range c.consumers {
wg.Add(1)
@@ -308,12 +372,16 @@ func (c *consumer) Close() {
}
wg.Wait()
close(c.closeCh)
c.ticker.Stop()
c.client.handlers.Del(c)
c.dlq.close()
})
}

func (c *consumer) Seek(msgID MessageID) error {
c.Lock()
defer c.Unlock()

if len(c.consumers) > 1 {
return errors.New("for partition topic, seek command should perform on the individual partitions")
}
@@ -327,6 +395,8 @@ func (c *consumer) Seek(msgID MessageID) error {
}

func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
if len(c.consumers) > 1 {
return errors.New("for partition topic, seek command should perform on the individual partitions")
}
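For context, this is how an application would opt into faster partition discovery through the option the new consumer code reads (AutoDiscoveryPeriod). A minimal usage sketch with placeholder broker URL and topic name, assuming the standard github.com/apache/pulsar-client-go/pulsar module path:

package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // placeholder broker URL
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// AutoDiscoveryPeriod controls how often the consumer's ticker re-checks
	// the partition count; when unset, defaultAutoDiscoveryDuration is used.
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:               "persistent://public/default/my-partitioned-topic",
		SubscriptionName:    "my-sub",
		AutoDiscoveryPeriod: 30 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	msg, err := consumer.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	consumer.Ack(msg)
}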
4 changes: 2 additions & 2 deletions pulsar/consumer_regex.go
@@ -237,7 +237,7 @@ func (c *regexConsumer) monitor() {
case <-c.closeCh:
return
case <-c.ticker.C:
log.Debug("Auto discovering topics")
c.log.Debug("Auto discovering topics")
if !c.closed() {
c.discover()
}
@@ -361,7 +361,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
for _, t := range topics {
go func(topic string) {
defer wg.Done()
c, err := internalTopicSubscribe(c, opts, topic, ch, dlq)
c, err := newInternalConsumer(c, opts, topic, ch, dlq)
consumerErrorCh <- consumerError{
err: err,
topic: topic,
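The test changes below add an explicit HTTP method to makeHTTPCall so the same helper can both create a partitioned topic (PUT) and grow it later (POST). The helper's body is not part of this diff; what follows is a hypothetical sketch of what such an admin REST call looks like, with a placeholder URL and partition counts taken from the tests:

package main

import (
	"fmt"
	"log"
	"net/http"
	"strings"
)

// setPartitions is a hypothetical stand-in for the tests' makeHTTPCall helper:
// it sends the partition count as the request body to the admin endpoint.
func setPartitions(method, url, partitions string) error {
	req, err := http.NewRequest(method, url, strings.NewReader(partitions))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		return fmt.Errorf("admin call to %s failed: %s", url, resp.Status)
	}
	return nil
}

func main() {
	url := "http://localhost:8080/admin/v2/persistent/public/default/my-topic/partitions"

	// PUT creates the partitioned topic with 3 partitions ...
	if err := setPartitions(http.MethodPut, url, "3"); err != nil {
		log.Fatal(err)
	}
	// ... and a later POST increases the partition count to 10.
	if err := setPartitions(http.MethodPost, url, "10"); err != nil {
		log.Fatal(err)
	}
}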
71 changes: 69 additions & 2 deletions pulsar/consumer_test.go
@@ -21,6 +21,8 @@ import (
"context"
"fmt"
"log"
"net/http"
"strconv"
"testing"
"time"

@@ -326,7 +328,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
topic := "persistent://public/default/testGetPartitions"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"

makeHTTPCall(t, testURL, "64")
makeHTTPCall(t, http.MethodPut, testURL, "64")

// create producer
producer, err := client.CreateProducer(ProducerOptions{
@@ -410,7 +412,7 @@ func TestConsumerShared(t *testing.T) {
topic := "persistent://public/default/testMultiPartitionConsumerShared"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions"

makeHTTPCall(t, testURL, "3")
makeHTTPCall(t, http.MethodPut, testURL, "3")

sub := "sub-shared-1"
consumer1, err := client.Subscribe(ConsumerOptions{
@@ -1207,3 +1209,68 @@ func TestGetDeliveryCount(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, uint32(3), msg.RedeliveryCount())
}

func TestConsumerAddTopicPartitions(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
makeHTTPCall(t, http.MethodPut, testURL, "3")

// create producer
partitionsAutoDiscoveryInterval = 100 * time.Millisecond
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
MessageRouter: func(msg *ProducerMessage, topicMetadata TopicMetadata) int {
// The message key will contain the partition id where to route
i, err := strconv.Atoi(msg.Key)
assert.NoError(t, err)
return i
},
})
assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
AutoDiscoveryPeriod: 100 * time.Millisecond,
})
assert.Nil(t, err)
defer consumer.Close()

// Increase number of partitions to 10
makeHTTPCall(t, http.MethodPost, testURL, "10")

// Wait for the producer/consumers to pick up the change
time.Sleep(1 * time.Second)

// Publish messages ensuring that they will go to all the partitions
ctx := context.Background()
for i := 0; i < 10; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Key: fmt.Sprintf("%d", i),
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}

msgs := make([]string, 0)

for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
msgs = append(msgs, string(msg.Payload()))

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))

consumer.Ack(msg)
}

assert.Equal(t, len(msgs), 10)
}
