diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 20dc1afe8f..157353cfa1 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -36,8 +36,8 @@ var ErrConsumerClosed = errors.New("consumer closed") const defaultNackRedeliveryDelay = 1 * time.Minute type acker interface { - AckID(id *messageID) - NackID(id *messageID) + AckID(id messageID) + NackID(id messageID) } type consumer struct { @@ -235,7 +235,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { nackRedeliveryDelay: nackRedeliveryDelay, metadata: metadata, replicateSubscriptionState: c.options.ReplicateSubscriptionState, - startMessageID: nil, + startMessageID: messageID{}, subscriptionMode: durable, readCompacted: c.options.ReadCompacted, } @@ -452,11 +452,11 @@ func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_I return pb.CommandSubscribe_Latest } -func (c *consumer) messageID(msgID MessageID) (*messageID, bool) { - mid, ok := msgID.(*messageID) +func (c *consumer) messageID(msgID MessageID) (messageID, bool) { + mid, ok := msgID.(messageID) if !ok { - c.log.Warnf("invalid message id type") - return nil, false + c.log.Warnf("invalid message id type %T", msgID) + return messageID{}, false } partition := int(mid.partitionIdx) @@ -464,7 +464,7 @@ func (c *consumer) messageID(msgID MessageID) (*messageID, bool) { if partition < 0 || partition >= len(c.consumers) { c.log.Warnf("invalid partition index %d expected a partition between [0-%d]", partition, len(c.consumers)) - return nil, false + return messageID{}, false } return mid, true diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 94e7b9de34..a3d1a8fc72 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -117,9 +117,9 @@ func (c *multiTopicConsumer) Ack(msg Message) { // Ack the consumption of a single message, identified by its MessageID func (c *multiTopicConsumer) AckID(msgID MessageID) { - mid, ok := msgID.(*messageID) + mid, ok := msgID.(messageID) if !ok { - c.log.Warnf("invalid message id type") + c.log.Warnf("invalid message id type %T", msgID) return } @@ -136,9 +136,9 @@ func (c *multiTopicConsumer) Nack(msg Message) { } func (c *multiTopicConsumer) NackID(msgID MessageID) { - mid, ok := msgID.(*messageID) + mid, ok := msgID.(messageID) if !ok { - c.log.Warnf("invalid message id type") + c.log.Warnf("invalid message id type %T", msgID) return } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index cffc2b43e2..e539e51755 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -63,7 +63,7 @@ type partitionConsumerOpts struct { nackRedeliveryDelay time.Duration metadata map[string]string replicateSubscriptionState bool - startMessageID *messageID + startMessageID messageID startMessageIDInclusive bool subscriptionMode subscriptionMode readCompacted bool @@ -94,13 +94,13 @@ type partitionConsumer struct { // the size of the queue channel for buffering messages queueSize int32 queueCh chan []*message - startMessageID *messageID - lastDequeuedMsg *messageID + startMessageID messageID + lastDequeuedMsg messageID eventsCh chan interface{} connectedCh chan struct{} closeCh chan struct{} - clearQueueCh chan func(id *messageID) + clearQueueCh chan func(id messageID) nackTracker *negativeAcksTracker dlq *dlqRouter @@ -128,7 +128,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon connectedCh: make(chan struct{}), messageCh: messageCh, closeCh: make(chan struct{}), - clearQueueCh: make(chan func(id *messageID)), + clearQueueCh: make(chan func(id messageID)), compressionProviders: make(map[pb.CompressionType]compression.Provider), dlq: dlq, log: log.WithField("topic", options.topic), @@ -192,7 +192,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) { pc.state = consumerClosed } -func (pc *partitionConsumer) getLastMessageID() (*messageID, error) { +func (pc *partitionConsumer) getLastMessageID() (messageID, error) { req := &getLastMsgIDRequest{doneCh: make(chan struct{})} pc.eventsCh <- req @@ -220,8 +220,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) } } -func (pc *partitionConsumer) AckID(msgID *messageID) { - if msgID != nil && msgID.ack() { +func (pc *partitionConsumer) AckID(msgID messageID) { + if !msgID.IsZero() && msgID.ack() { req := &ackRequest{ msgID: msgID, } @@ -229,7 +229,7 @@ func (pc *partitionConsumer) AckID(msgID *messageID) { } } -func (pc *partitionConsumer) NackID(msgID *messageID) { +func (pc *partitionConsumer) NackID(msgID messageID) { pc.nackTracker.Add(msgID) } @@ -268,7 +268,7 @@ func (pc *partitionConsumer) Close() { <-req.doneCh } -func (pc *partitionConsumer) Seek(msgID *messageID) error { +func (pc *partitionConsumer) Seek(msgID messageID) error { req := &seekRequest{ doneCh: make(chan struct{}), msgID: msgID, @@ -450,8 +450,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header return nil } -func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *messageID) bool { - if pc.startMessageID == nil { +func (pc *partitionConsumer) messageShouldBeDiscarded(msgID messageID) bool { + if pc.startMessageID.IsZero() { return false } @@ -571,7 +571,7 @@ func (pc *partitionConsumer) dispatcher() { case clearQueueCb := <-pc.clearQueueCh: // drain the message queue on any new connection by sending a // special nil message to the channel so we know when to stop dropping messages - var nextMessageInQueue *messageID + var nextMessageInQueue messageID go func() { pc.queueCh <- nil }() @@ -579,8 +579,8 @@ func (pc *partitionConsumer) dispatcher() { // the queue has been drained if m == nil { break - } else if nextMessageInQueue == nil { - nextMessageInQueue = m[0].msgID.(*messageID) + } else if nextMessageInQueue.IsZero() { + nextMessageInQueue = m[0].msgID.(messageID) } } @@ -590,7 +590,7 @@ func (pc *partitionConsumer) dispatcher() { } type ackRequest struct { - msgID *messageID + msgID messageID } type unsubscribeRequest struct { @@ -608,13 +608,13 @@ type redeliveryRequest struct { type getLastMsgIDRequest struct { doneCh chan struct{} - msgID *messageID + msgID messageID err error } type seekRequest struct { doneCh chan struct{} - msgID *messageID + msgID messageID err error } @@ -794,15 +794,15 @@ func (pc *partitionConsumer) grabConn() error { } } -func (pc *partitionConsumer) clearQueueAndGetNextMessage() *messageID { +func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID { if pc.state != consumerReady { - return nil + return messageID{} } wg := &sync.WaitGroup{} wg.Add(1) - var msgID *messageID + var msgID messageID - pc.clearQueueCh <- func(id *messageID) { + pc.clearQueueCh <- func(id messageID) { msgID = id wg.Done() } @@ -815,12 +815,12 @@ func (pc *partitionConsumer) clearQueueAndGetNextMessage() *messageID { * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was * not seen by the application */ -func (pc *partitionConsumer) clearReceiverQueue() *messageID { +func (pc *partitionConsumer) clearReceiverQueue() messageID { nextMessageInQueue := pc.clearQueueAndGetNextMessage() - if nextMessageInQueue != nil { + if !nextMessageInQueue.IsZero() { return getPreviousMessage(nextMessageInQueue) - } else if pc.lastDequeuedMsg != nil { + } else if !pc.lastDequeuedMsg.IsZero() { // If the queue was empty we need to restart from the message just after the last one that has been dequeued // in the past return pc.lastDequeuedMsg @@ -830,9 +830,9 @@ func (pc *partitionConsumer) clearReceiverQueue() *messageID { } } -func getPreviousMessage(mid *messageID) *messageID { +func getPreviousMessage(mid messageID) messageID { if mid.batchIdx >= 0 { - return &messageID{ + return messageID{ ledgerID: mid.ledgerID, entryID: mid.entryID, batchIdx: mid.batchIdx - 1, @@ -841,7 +841,7 @@ func getPreviousMessage(mid *messageID) *messageID { } // Get on previous message in previous entry - return &messageID{ + return messageID{ ledgerID: mid.ledgerID, entryID: mid.entryID - 1, batchIdx: mid.batchIdx, @@ -901,8 +901,8 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData, }) } -func convertToMessageIDData(msgID *messageID) *pb.MessageIdData { - if msgID == nil { +func convertToMessageIDData(msgID messageID) *pb.MessageIdData { + if msgID.IsZero() { return nil } @@ -912,16 +912,15 @@ func convertToMessageIDData(msgID *messageID) *pb.MessageIdData { } } -func convertToMessageID(id *pb.MessageIdData) *messageID { +func convertToMessageID(id *pb.MessageIdData) messageID { if id == nil { - return nil + return messageID{} } - msgID := &messageID{ + msgID := messageID{ ledgerID: int64(*id.LedgerId), entryID: int64(*id.EntryId), } - if id.BatchIndex != nil { msgID.batchIdx = *id.BatchIndex } diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index 5a5a94e1ab..0fcbdc5d0c 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -44,11 +44,11 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) { // ensure the tracker was set on the message id messages := <-pc.queueCh for _, m := range messages { - assert.Nil(t, m.ID().(*messageID).tracker) + assert.Nil(t, m.ID().(messageID).tracker) } // ack the message id - pc.AckID(messages[0].msgID.(*messageID)) + pc.AckID(messages[0].msgID.(messageID)) select { case <-eventsCh: @@ -73,11 +73,11 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) { // ensure the tracker was set on the message id messages := <-pc.queueCh for _, m := range messages { - assert.Nil(t, m.ID().(*messageID).tracker) + assert.Nil(t, m.ID().(messageID).tracker) } // ack the message id - pc.AckID(messages[0].msgID.(*messageID)) + pc.AckID(messages[0].msgID.(messageID)) select { case <-eventsCh: @@ -102,12 +102,12 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { // ensure the tracker was set on the message id messages := <-pc.queueCh for _, m := range messages { - assert.NotNil(t, m.ID().(*messageID).tracker) + assert.NotNil(t, m.ID().(messageID).tracker) } // ack all message ids except the last one for i := 0; i < 9; i++ { - pc.AckID(messages[i].msgID.(*messageID)) + pc.AckID(messages[i].msgID.(messageID)) } select { @@ -117,7 +117,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { } // ack last message - pc.AckID(messages[9].msgID.(*messageID)) + pc.AckID(messages[9].msgID.(messageID)) select { case <-eventsCh: diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 3d0aebe434..e83db07279 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -161,9 +161,9 @@ func (c *regexConsumer) Ack(msg Message) { // Ack the consumption of a single message, identified by its MessageID func (c *regexConsumer) AckID(msgID MessageID) { - mid, ok := msgID.(*messageID) + mid, ok := msgID.(messageID) if !ok { - c.log.Warnf("invalid message id type") + c.log.Warnf("invalid message id type %T", msgID) return } @@ -180,9 +180,9 @@ func (c *regexConsumer) Nack(msg Message) { } func (c *regexConsumer) NackID(msgID MessageID) { - mid, ok := msgID.(*messageID) + mid, ok := msgID.(messageID) if !ok { - c.log.Warnf("invalid message id type") + c.log.Warnf("invalid message id type %T", msgID) return } diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index d9574cde1e..f1a9a7c4b0 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -38,7 +38,11 @@ type messageID struct { consumer acker } -func (id *messageID) Ack() { +func (id messageID) IsZero() bool { + return id == messageID{} +} + +func (id messageID) Ack() { if id.consumer == nil { return } @@ -47,21 +51,21 @@ func (id *messageID) Ack() { } } -func (id *messageID) Nack() { +func (id messageID) Nack() { if id.consumer == nil { return } id.consumer.NackID(id) } -func (id *messageID) ack() bool { +func (id messageID) ack() bool { if id.tracker != nil && id.batchIdx > -1 { return id.tracker.ack(int(id.batchIdx)) } return true } -func (id *messageID) greater(other *messageID) bool { +func (id messageID) greater(other messageID) bool { if id.ledgerID != other.ledgerID { return id.ledgerID > other.ledgerID } @@ -73,22 +77,22 @@ func (id *messageID) greater(other *messageID) bool { return id.batchIdx > other.batchIdx } -func (id *messageID) equal(other *messageID) bool { +func (id messageID) equal(other messageID) bool { return id.ledgerID == other.ledgerID && id.entryID == other.entryID && id.batchIdx == other.batchIdx } -func (id *messageID) greaterEqual(other *messageID) bool { +func (id messageID) greaterEqual(other messageID) bool { return id.equal(other) || id.greater(other) } -func (id *messageID) Serialize() []byte { +func (id messageID) Serialize() []byte { msgID := &pb.MessageIdData{ LedgerId: proto.Uint64(uint64(id.ledgerID)), EntryId: proto.Uint64(uint64(id.entryID)), - BatchIndex: proto.Int(int(id.batchIdx)), - Partition: proto.Int(int(id.partitionIdx)), + BatchIndex: proto.Int32(id.batchIdx), + Partition: proto.Int32(id.partitionIdx), } data, _ := proto.Marshal(msgID) return data @@ -110,7 +114,7 @@ func deserializeMessageID(data []byte) (MessageID, error) { } func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32) MessageID { - return &messageID{ + return messageID{ ledgerID: ledgerID, entryID: entryID, batchIdx: batchIdx, @@ -119,8 +123,8 @@ func newMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx in } func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partitionIdx int32, - tracker *ackTracker) *messageID { - return &messageID{ + tracker *ackTracker) messageID { + return messageID{ ledgerID: ledgerID, entryID: entryID, batchIdx: batchIdx, diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go index 164cff6fc9..8c6641138d 100644 --- a/pulsar/impl_message_test.go +++ b/pulsar/impl_message_test.go @@ -31,10 +31,10 @@ func TestMessageId(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, id2) - assert.Equal(t, int64(1), id2.(*messageID).ledgerID) - assert.Equal(t, int64(2), id2.(*messageID).entryID) - assert.Equal(t, int32(3), id2.(*messageID).batchIdx) - assert.Equal(t, int32(4), id2.(*messageID).partitionIdx) + assert.Equal(t, int64(1), id2.(messageID).ledgerID) + assert.Equal(t, int64(2), id2.(messageID).entryID) + assert.Equal(t, int32(3), id2.(messageID).batchIdx) + assert.Equal(t, int32(4), id2.(messageID).partitionIdx) id, err = DeserializeMessageID(nil) assert.Error(t, err) @@ -82,7 +82,7 @@ func TestAckingMessageIDBatchOne(t *testing.T) { func TestAckingMessageIDBatchTwo(t *testing.T) { tracker := newAckTracker(2) - ids := []*messageID{ + ids := []messageID{ newTrackingMessageID(1, 1, 0, 0, tracker), newTrackingMessageID(1, 1, 1, 0, tracker), } @@ -93,7 +93,7 @@ func TestAckingMessageIDBatchTwo(t *testing.T) { // try reverse order tracker = newAckTracker(2) - ids = []*messageID{ + ids = []messageID{ newTrackingMessageID(1, 1, 0, 0, tracker), newTrackingMessageID(1, 1, 1, 0, tracker), } diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index f8878448ed..a7dc88e3f4 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -51,7 +51,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration) *negativ return t } -func (t *negativeAcksTracker) Add(msgID *messageID) { +func (t *negativeAcksTracker) Add(msgID messageID) { // Always clear up the batch index since we want to track the nack // for the entire batch batchMsgID := messageID{ diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index f7a1c50450..3930b7fbe4 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -76,13 +76,13 @@ func TestNacksTracker(t *testing.T) { nmc := newNackMockedConsumer() nacks := newNegativeAcksTracker(nmc, testNackDelay) - nacks.Add(&messageID{ + nacks.Add(messageID{ ledgerID: 1, entryID: 1, batchIdx: 1, }) - nacks.Add(&messageID{ + nacks.Add(messageID{ ledgerID: 2, entryID: 2, batchIdx: 1, @@ -107,25 +107,25 @@ func TestNacksWithBatchesTracker(t *testing.T) { nmc := newNackMockedConsumer() nacks := newNegativeAcksTracker(nmc, testNackDelay) - nacks.Add(&messageID{ + nacks.Add(messageID{ ledgerID: 1, entryID: 1, batchIdx: 1, }) - nacks.Add(&messageID{ + nacks.Add(messageID{ ledgerID: 1, entryID: 1, batchIdx: 2, }) - nacks.Add(&messageID{ + nacks.Add(messageID{ ledgerID: 1, entryID: 1, batchIdx: 3, }) - nacks.Add(&messageID{ + nacks.Add(messageID{ ledgerID: 2, entryID: 2, batchIdx: 1, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 95e4ed1c29..8d389cbf4c 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -322,7 +322,7 @@ func TestFlushInProducer(t *testing.T) { assert.Nil(t, err) msgCount++ - msgID := msg.ID().(*messageID) + msgID := msg.ID().(messageID) // Since messages are batched, they will be sharing the same ledgerId/entryId if ledgerID == -1 { ledgerID = msgID.ledgerID diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index b3399dc669..b74b35b706 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -30,7 +30,7 @@ const ( type reader struct { pc *partitionConsumer messageCh chan ConsumerMessage - lastMessageInBroker *messageID + lastMessageInBroker messageID log *log.Entry } @@ -44,17 +44,17 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, newError(ResultInvalidConfiguration, "StartMessageID is required") } - var startMessageID *messageID + var startMessageID messageID var ok bool - if startMessageID, ok = options.StartMessageID.(*messageID); !ok { - // a custom type satisfying MessageID may not be a *messageID - // so re-create *messageID using its data + if startMessageID, ok = options.StartMessageID.(messageID); !ok { + // a custom type satisfying MessageID may not be a messageID + // so re-create messageID using its data deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize()) if err != nil { return nil, err } - // de-serialized MessageID is a *messageID - startMessageID = deserMsgID.(*messageID) + // de-serialized MessageID is a messageID + startMessageID = deserMsgID.(messageID) } subscriptionName := options.SubscriptionRolePrefix @@ -118,7 +118,7 @@ func (r *reader) Next(ctx context.Context) (Message, error) { // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, // it will specify the subscription position anyway - msgID := cm.Message.ID().(*messageID) + msgID := cm.Message.ID().(messageID) r.pc.lastDequeuedMsg = msgID r.pc.AckID(msgID) return cm.Message, nil @@ -129,7 +129,7 @@ func (r *reader) Next(ctx context.Context) (Message, error) { } func (r *reader) HasNext() bool { - if r.lastMessageInBroker != nil && r.hasMoreMessages() { + if !r.lastMessageInBroker.IsZero() && r.hasMoreMessages() { return true } @@ -148,7 +148,7 @@ func (r *reader) HasNext() bool { } func (r *reader) hasMoreMessages() bool { - if r.pc.lastDequeuedMsg != nil { + if !r.pc.lastDequeuedMsg.IsZero() { return r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg) }