diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 6a526a82ef..ba7d24d00c 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -41,9 +41,12 @@ type acker interface {
 }
 
 type consumer struct {
-	client    *client
-	options   ConsumerOptions
-	consumers []*partitionConsumer
+	sync.Mutex
+	topic        string
+	client       *client
+	options      ConsumerOptions
+	consumers    []*partitionConsumer
+	consumerName string
 
 	// channel used to deliver message to clients
 	messageCh chan ConsumerMessage
@@ -52,6 +55,7 @@ type consumer struct {
 	closeOnce sync.Once
 	closeCh   chan struct{}
 	errorCh   chan error
+	ticker    *time.Ticker
 
 	log *log.Entry
 }
@@ -118,9 +122,11 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 	return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
 }
 
-func internalTopicSubscribe(client *client, options ConsumerOptions, topic string,
+func newInternalConsumer(client *client, options ConsumerOptions, topic string,
 	messageCh chan ConsumerMessage, dlq *dlqRouter) (*consumer, error) {
+
 	consumer := &consumer{
+		topic:     topic,
 		client:    client,
 		options:   options,
 		messageCh: messageCh,
@@ -130,13 +136,65 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin
 		log:       log.WithField("topic", topic),
 	}
 
-	partitions, err := client.TopicPartitions(topic)
+	if options.Name != "" {
+		consumer.consumerName = options.Name
+	} else {
+		consumer.consumerName = generateRandomName()
+	}
+
+	err := consumer.internalTopicSubscribeToPartitions()
 	if err != nil {
 		return nil, err
 	}
 
-	numPartitions := len(partitions)
-	consumer.consumers = make([]*partitionConsumer, numPartitions)
+	// set up timer to monitor for new partitions being added
+	duration := options.AutoDiscoveryPeriod
+	if duration <= 0 {
+		duration = defaultAutoDiscoveryDuration
+	}
+	consumer.ticker = time.NewTicker(duration)
+
+	go func() {
+		for range consumer.ticker.C {
+			consumer.log.Debug("Auto discovering new partitions")
+			consumer.internalTopicSubscribeToPartitions()
+		}
+	}()
+
+	return consumer, nil
+}
+
+func (c *consumer) internalTopicSubscribeToPartitions() error {
+	partitions, err := c.client.TopicPartitions(c.topic)
+	if err != nil {
+		return err
+	}
+
+	oldNumPartitions := 0
+	newNumPartitions := len(partitions)
+
+	c.Lock()
+	defer c.Unlock()
+	oldConsumers := c.consumers
+
+	if oldConsumers != nil {
+		oldNumPartitions = len(oldConsumers)
+		if oldNumPartitions == newNumPartitions {
+			c.log.Debug("Number of partitions in topic has not changed")
+			return nil
+		}
+
+		c.log.WithField("old_partitions", oldNumPartitions).
+			WithField("new_partitions", newNumPartitions).
+ Info("Changed number of partitions in topic") + } + + c.consumers = make([]*partitionConsumer, newNumPartitions) + + // Copy over the existing consumer instances + for i := 0; i < oldNumPartitions; i++ { + c.consumers[i] = oldConsumers[i] + } type ConsumerError struct { err error @@ -144,42 +202,42 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin consumer *partitionConsumer } - consumerName := options.Name - if consumerName == "" { - consumerName = generateRandomName() - } + receiverQueueSize := c.options.ReceiverQueueSize + metadata := c.options.Properties - receiverQueueSize := options.ReceiverQueueSize - metadata := options.Properties + partitionsToAdd := newNumPartitions - oldNumPartitions var wg sync.WaitGroup - ch := make(chan ConsumerError, numPartitions) - wg.Add(numPartitions) - for partitionIdx, partitionTopic := range partitions { + ch := make(chan ConsumerError, partitionsToAdd) + wg.Add(partitionsToAdd) + + for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ { + partitionTopic := partitions[partitionIdx] + go func(idx int, pt string) { defer wg.Done() var nackRedeliveryDelay time.Duration - if options.NackRedeliveryDelay == 0 { + if c.options.NackRedeliveryDelay == 0 { nackRedeliveryDelay = defaultNackRedeliveryDelay } else { - nackRedeliveryDelay = options.NackRedeliveryDelay + nackRedeliveryDelay = c.options.NackRedeliveryDelay } opts := &partitionConsumerOpts{ topic: pt, - consumerName: consumerName, - subscription: options.SubscriptionName, - subscriptionType: options.Type, - subscriptionInitPos: options.SubscriptionInitialPosition, + consumerName: c.consumerName, + subscription: c.options.SubscriptionName, + subscriptionType: c.options.Type, + subscriptionInitPos: c.options.SubscriptionInitialPosition, partitionIdx: idx, receiverQueueSize: receiverQueueSize, nackRedeliveryDelay: nackRedeliveryDelay, metadata: metadata, - replicateSubscriptionState: options.ReplicateSubscriptionState, + replicateSubscriptionState: c.options.ReplicateSubscriptionState, startMessageID: nil, subscriptionMode: durable, - readCompacted: options.ReadCompacted, + readCompacted: c.options.ReadCompacted, } - cons, err := newPartitionConsumer(consumer, client, opts, messageCh, dlq) + cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq) ch <- ConsumerError{ err: err, partition: idx, @@ -197,27 +255,27 @@ func internalTopicSubscribe(client *client, options ConsumerOptions, topic strin if ce.err != nil { err = ce.err } else { - consumer.consumers[ce.partition] = ce.consumer + c.consumers[ce.partition] = ce.consumer } } if err != nil { // Since there were some failures, // cleanup all the partitions that succeeded in creating the consumer - for _, c := range consumer.consumers { + for _, c := range c.consumers { if c != nil { c.Close() } } - return nil, err + return err } - return consumer, nil + return nil } func topicSubscribe(client *client, options ConsumerOptions, topic string, messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) { - return internalTopicSubscribe(client, options, topic, messageCh, dlqRouter) + return newInternalConsumer(client, options, topic, messageCh, dlqRouter) } func (c *consumer) Subscription() string { @@ -225,6 +283,9 @@ func (c *consumer) Subscription() string { } func (c *consumer) Unsubscribe() error { + c.Lock() + defer c.Unlock() + var errMsg string for _, consumer := range c.consumers { if err := consumer.Unsubscribe(); err != nil { @@ -298,6 +359,9 @@ func (c 
*consumer) NackID(msgID MessageID) { func (c *consumer) Close() { c.closeOnce.Do(func() { + c.Lock() + defer c.Unlock() + var wg sync.WaitGroup for i := range c.consumers { wg.Add(1) @@ -308,12 +372,16 @@ func (c *consumer) Close() { } wg.Wait() close(c.closeCh) + c.ticker.Stop() c.client.handlers.Del(c) c.dlq.close() }) } func (c *consumer) Seek(msgID MessageID) error { + c.Lock() + defer c.Unlock() + if len(c.consumers) > 1 { return errors.New("for partition topic, seek command should perform on the individual partitions") } @@ -327,6 +395,8 @@ func (c *consumer) Seek(msgID MessageID) error { } func (c *consumer) SeekByTime(time time.Time) error { + c.Lock() + defer c.Unlock() if len(c.consumers) > 1 { return errors.New("for partition topic, seek command should perform on the individual partitions") } diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 6610b55a56..00cbb88c3d 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -237,7 +237,7 @@ func (c *regexConsumer) monitor() { case <-c.closeCh: return case <-c.ticker.C: - log.Debug("Auto discovering topics") + c.log.Debug("Auto discovering topics") if !c.closed() { c.discover() } @@ -361,7 +361,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum for _, t := range topics { go func(topic string) { defer wg.Done() - c, err := internalTopicSubscribe(c, opts, topic, ch, dlq) + c, err := newInternalConsumer(c, opts, topic, ch, dlq) consumerErrorCh <- consumerError{ err: err, topic: topic, diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 05fab875b3..6007c451c7 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -21,6 +21,8 @@ import ( "context" "fmt" "log" + "net/http" + "strconv" "testing" "time" @@ -326,7 +328,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { topic := "persistent://public/default/testGetPartitions" testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions" - makeHTTPCall(t, testURL, "64") + makeHTTPCall(t, http.MethodPut, testURL, "64") // create producer producer, err := client.CreateProducer(ProducerOptions{ @@ -410,7 +412,7 @@ func TestConsumerShared(t *testing.T) { topic := "persistent://public/default/testMultiPartitionConsumerShared" testURL := adminURL + "/" + "admin/v2/persistent/public/default/testMultiPartitionConsumerShared/partitions" - makeHTTPCall(t, testURL, "3") + makeHTTPCall(t, http.MethodPut, testURL, "3") sub := "sub-shared-1" consumer1, err := client.Subscribe(ConsumerOptions{ @@ -1207,3 +1209,68 @@ func TestGetDeliveryCount(t *testing.T) { assert.Nil(t, err) assert.Equal(t, uint32(3), msg.RedeliveryCount()) } + +func TestConsumerAddTopicPartitions(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions" + makeHTTPCall(t, http.MethodPut, testURL, "3") + + // create producer + partitionsAutoDiscoveryInterval = 100 * time.Millisecond + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + MessageRouter: func(msg *ProducerMessage, topicMetadata TopicMetadata) int { + // The message key will contain the partition id where to route + i, err := strconv.Atoi(msg.Key) + assert.NoError(t, err) + return i + }, + }) + assert.Nil(t, err) + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: 
"my-sub", + AutoDiscoveryPeriod: 100 * time.Millisecond, + }) + assert.Nil(t, err) + defer consumer.Close() + + // Increase number of partitions to 10 + makeHTTPCall(t, http.MethodPost, testURL, "10") + + // Wait for the producer/consumers to pick up the change + time.Sleep(1 * time.Second) + + // Publish messages ensuring that they will go to all the partitions + ctx := context.Background() + for i := 0; i < 10; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Key: fmt.Sprintf("%d", i), + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.Nil(t, err) + } + + msgs := make([]string, 0) + + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + msgs = append(msgs, string(msg.Payload())) + + fmt.Printf("Received message msgId: %#v -- content: '%s'\n", + msg.ID(), string(msg.Payload())) + + consumer.Ack(msg) + } + + assert.Equal(t, len(msgs), 10) +} diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 67d0360b56..1404045b58 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -19,20 +19,32 @@ package pulsar import ( "context" + "sync" + "sync/atomic" "time" + log "github.com/sirupsen/logrus" + "github.com/apache/pulsar-client-go/pulsar/internal" ) type producer struct { + sync.Mutex client *client + options *ProducerOptions topic string producers []Producer + numPartitions uint32 messageRouter func(*ProducerMessage, TopicMetadata) int + ticker *time.Ticker + + log *log.Entry } const defaultBatchingMaxPublishDelay = 10 * time.Millisecond +var partitionsAutoDiscoveryInterval = 1 * time.Minute + func getHashingFunction(s HashingScheme) func(string) uint32 { switch s { case JavaStringHash: @@ -50,8 +62,10 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { } p := &producer{ - topic: options.Topic, - client: client, + options: options, + topic: options.Topic, + client: client, + log: log.WithField("topic", options.Topic), } var batchingMaxPublishDelay time.Duration @@ -73,13 +87,54 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { p.messageRouter = options.MessageRouter } - partitions, err := client.TopicPartitions(options.Topic) + err := p.internalCreatePartitionsProducers() if err != nil { return nil, err } - numPartitions := len(partitions) - p.producers = make([]Producer, numPartitions) + p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval) + + go func() { + for range p.ticker.C { + p.log.Debug("Auto discovering new partitions") + p.internalCreatePartitionsProducers() + } + }() + + return p, nil +} + +func (p *producer) internalCreatePartitionsProducers() error { + partitions, err := p.client.TopicPartitions(p.topic) + if err != nil { + return err + } + + oldNumPartitions := 0 + newNumPartitions := len(partitions) + + p.Lock() + defer p.Unlock() + oldProducers := p.producers + + if oldProducers != nil { + oldNumPartitions = len(oldProducers) + if oldNumPartitions == newNumPartitions { + p.log.Debug("Number of partitions in topic has not changed") + return nil + } + + p.log.WithField("old_partitions", oldNumPartitions). + WithField("new_partitions", newNumPartitions). 
+ Info("Changed number of partitions in topic") + } + + p.producers = make([]Producer, newNumPartitions) + + // Copy over the existing consumer instances + for i := 0; i < oldNumPartitions; i++ { + p.producers[i] = oldProducers[i] + } type ProducerError struct { partition int @@ -87,11 +142,14 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { err error } - c := make(chan ProducerError, numPartitions) + partitionsToAdd := newNumPartitions - oldNumPartitions + c := make(chan ProducerError, partitionsToAdd) + + for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ { + partition := partitions[partitionIdx] - for partitionIdx, partition := range partitions { go func(partitionIdx int, partition string) { - prod, e := newPartitionProducer(client, partition, options, partitionIdx) + prod, e := newPartitionProducer(p.client, partition, p.options, partitionIdx) c <- ProducerError{ partition: partitionIdx, prod: prod, @@ -100,7 +158,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { }(partitionIdx, partition) } - for i := 0; i < numPartitions; i++ { + for i := 0; i < partitionsToAdd; i++ { pe, ok := <-c if ok { if pe.err != nil { @@ -118,10 +176,11 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { producer.Close() } } - return nil, err + return err } - return p, nil + atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers))) + return nil } func (p *producer) Topic() string { @@ -129,25 +188,39 @@ func (p *producer) Topic() string { } func (p *producer) Name() string { + p.Lock() + defer p.Unlock() + return p.producers[0].Name() } func (p *producer) NumPartitions() uint32 { - return uint32(len(p.producers)) + return atomic.LoadUint32(&p.numPartitions) } func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) { + p.Lock() partition := p.messageRouter(msg, p) - return p.producers[partition].Send(ctx, msg) + pp := p.producers[partition] + p.Unlock() + + return pp.Send(ctx, msg) } func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error)) { + p.Lock() partition := p.messageRouter(msg, p) - p.producers[partition].SendAsync(ctx, msg, callback) + pp := p.producers[partition] + p.Unlock() + + pp.SendAsync(ctx, msg, callback) } func (p *producer) LastSequenceID() int64 { + p.Lock() + defer p.Unlock() + var maxSeq int64 = -1 for _, pp := range p.producers { s := pp.LastSequenceID() @@ -159,6 +232,9 @@ func (p *producer) LastSequenceID() int64 { } func (p *producer) Flush() error { + p.Lock() + defer p.Unlock() + for _, pp := range p.producers { if err := pp.Flush(); err != nil { return err @@ -169,6 +245,9 @@ func (p *producer) Flush() error { } func (p *producer) Close() { + p.Lock() + defer p.Unlock() + for _, pp := range p.producers { pp.Close() } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index a45d8cefa4..2910427eec 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -20,6 +20,7 @@ package pulsar import ( "context" "fmt" + "net/http" "strconv" "sync" "testing" @@ -368,7 +369,7 @@ func TestFlushInPartitionedProducer(t *testing.T) { // call admin api to make it partitioned url := adminURL + "/" + "admin/v2/persistent/" + topicName + "/partitions" - makeHTTPCall(t, url, "5") + makeHTTPCall(t, http.MethodPut, url, "5") numberOfPartitions := 5 numOfMessages := 10 @@ -446,7 +447,7 @@ func TestRoundRobinRouterPartitionedProducer(t *testing.T) { // 
call admin api to make it partitioned url := adminURL + "/" + "admin/v2/persistent/" + topicName + "/partitions" - makeHTTPCall(t, url, strconv.Itoa(numberOfPartitions)) + makeHTTPCall(t, http.MethodPut, url, strconv.Itoa(numberOfPartitions)) numOfMessages := 10 ctx := context.Background() diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go index 9d54f20030..3471bca5e6 100644 --- a/pulsar/test_helper.go +++ b/pulsar/test_helper.go @@ -118,10 +118,10 @@ func httpDo(method string, requestPath string, in interface{}, out interface{}) return nil } -func makeHTTPCall(t *testing.T, url string, body string) { +func makeHTTPCall(t *testing.T, method string, url string, body string) { client := http.Client{} - req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(body)) + req, err := http.NewRequest(method, url, strings.NewReader(body)) if err != nil { t.Fatal(err) }
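For reference, a minimal usage sketch of the consumer-side auto-discovery this patch wires up. This is not part of the diff; the broker URL, topic name, and subscription name are placeholders, and AutoDiscoveryPeriod falls back to the client's default interval when left unset.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Placeholder service URL; any reachable Pulsar broker works.
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// With this patch, the consumer re-checks the topic's partition count in the
	// background and subscribes to partitions added after startup;
	// AutoDiscoveryPeriod controls how often that check runs.
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:               "my-partitioned-topic", // placeholder topic
		SubscriptionName:    "my-sub",
		AutoDiscoveryPeriod: 30 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Messages from newly added partitions arrive on the same Receive loop.
	msg, err := consumer.Receive(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("got: %s", string(msg.Payload()))
	consumer.Ack(msg)
}
```

On the producer side, the equivalent re-check runs on the package-level partitionsAutoDiscoveryInterval (one minute by default in this patch) rather than a per-producer option.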