Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not allocate MessageIDs on the heap #319

Merged
merged 1 commit into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ var ErrConsumerClosed = errors.New("consumer closed")
const defaultNackRedeliveryDelay = 1 * time.Minute

type acker interface {
AckID(id *messageID)
NackID(id *messageID)
AckID(id messageID)
NackID(id messageID)
}

type consumer struct {
Expand Down Expand Up @@ -235,7 +235,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
nackRedeliveryDelay: nackRedeliveryDelay,
metadata: metadata,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: nil,
startMessageID: messageID{},
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
}
Expand Down Expand Up @@ -452,19 +452,19 @@ func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_I
return pb.CommandSubscribe_Latest
}

func (c *consumer) messageID(msgID MessageID) (*messageID, bool) {
mid, ok := msgID.(*messageID)
func (c *consumer) messageID(msgID MessageID) (messageID, bool) {
mid, ok := msgID.(messageID)
if !ok {
c.log.Warnf("invalid message id type")
return nil, false
c.log.Warnf("invalid message id type %T", msgID)
return messageID{}, false
}

partition := int(mid.partitionIdx)
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return nil, false
return messageID{}, false
}

return mid, true
Expand Down
8 changes: 4 additions & 4 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ func (c *multiTopicConsumer) Ack(msg Message) {

// Ack the consumption of a single message, identified by its MessageID
func (c *multiTopicConsumer) AckID(msgID MessageID) {
mid, ok := msgID.(*messageID)
mid, ok := msgID.(messageID)
if !ok {
c.log.Warnf("invalid message id type")
c.log.Warnf("invalid message id type %T", msgID)
return
}

Expand All @@ -136,9 +136,9 @@ func (c *multiTopicConsumer) Nack(msg Message) {
}

func (c *multiTopicConsumer) NackID(msgID MessageID) {
mid, ok := msgID.(*messageID)
mid, ok := msgID.(messageID)
if !ok {
c.log.Warnf("invalid message id type")
c.log.Warnf("invalid message id type %T", msgID)
return
}

Expand Down
67 changes: 33 additions & 34 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type partitionConsumerOpts struct {
nackRedeliveryDelay time.Duration
metadata map[string]string
replicateSubscriptionState bool
startMessageID *messageID
startMessageID messageID
startMessageIDInclusive bool
subscriptionMode subscriptionMode
readCompacted bool
Expand Down Expand Up @@ -94,13 +94,13 @@ type partitionConsumer struct {
// the size of the queue channel for buffering messages
queueSize int32
queueCh chan []*message
startMessageID *messageID
lastDequeuedMsg *messageID
startMessageID messageID
lastDequeuedMsg messageID

eventsCh chan interface{}
connectedCh chan struct{}
closeCh chan struct{}
clearQueueCh chan func(id *messageID)
clearQueueCh chan func(id messageID)

nackTracker *negativeAcksTracker
dlq *dlqRouter
Expand Down Expand Up @@ -128,7 +128,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
connectedCh: make(chan struct{}),
messageCh: messageCh,
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id *messageID)),
clearQueueCh: make(chan func(id messageID)),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
log: log.WithField("topic", options.topic),
Expand Down Expand Up @@ -192,7 +192,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
pc.state = consumerClosed
}

func (pc *partitionConsumer) getLastMessageID() (*messageID, error) {
func (pc *partitionConsumer) getLastMessageID() (messageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req

Expand Down Expand Up @@ -220,16 +220,16 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
}
}

func (pc *partitionConsumer) AckID(msgID *messageID) {
if msgID != nil && msgID.ack() {
func (pc *partitionConsumer) AckID(msgID messageID) {
if !msgID.IsZero() && msgID.ack() {
req := &ackRequest{
msgID: msgID,
}
pc.eventsCh <- req
}
}

func (pc *partitionConsumer) NackID(msgID *messageID) {
func (pc *partitionConsumer) NackID(msgID messageID) {
pc.nackTracker.Add(msgID)
}

Expand Down Expand Up @@ -268,7 +268,7 @@ func (pc *partitionConsumer) Close() {
<-req.doneCh
}

func (pc *partitionConsumer) Seek(msgID *messageID) error {
func (pc *partitionConsumer) Seek(msgID messageID) error {
req := &seekRequest{
doneCh: make(chan struct{}),
msgID: msgID,
Expand Down Expand Up @@ -450,8 +450,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
return nil
}

func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *messageID) bool {
if pc.startMessageID == nil {
func (pc *partitionConsumer) messageShouldBeDiscarded(msgID messageID) bool {
if pc.startMessageID.IsZero() {
return false
}

Expand Down Expand Up @@ -571,16 +571,16 @@ func (pc *partitionConsumer) dispatcher() {
case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by sending a
// special nil message to the channel so we know when to stop dropping messages
var nextMessageInQueue *messageID
var nextMessageInQueue messageID
go func() {
pc.queueCh <- nil
}()
for m := range pc.queueCh {
// the queue has been drained
if m == nil {
break
} else if nextMessageInQueue == nil {
nextMessageInQueue = m[0].msgID.(*messageID)
} else if nextMessageInQueue.IsZero() {
nextMessageInQueue = m[0].msgID.(messageID)
}
}

Expand All @@ -590,7 +590,7 @@ func (pc *partitionConsumer) dispatcher() {
}

type ackRequest struct {
msgID *messageID
msgID messageID
}

type unsubscribeRequest struct {
Expand All @@ -608,13 +608,13 @@ type redeliveryRequest struct {

type getLastMsgIDRequest struct {
doneCh chan struct{}
msgID *messageID
msgID messageID
err error
}

type seekRequest struct {
doneCh chan struct{}
msgID *messageID
msgID messageID
err error
}

Expand Down Expand Up @@ -794,15 +794,15 @@ func (pc *partitionConsumer) grabConn() error {
}
}

func (pc *partitionConsumer) clearQueueAndGetNextMessage() *messageID {
func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
if pc.state != consumerReady {
return nil
return messageID{}
}
wg := &sync.WaitGroup{}
wg.Add(1)
var msgID *messageID
var msgID messageID

pc.clearQueueCh <- func(id *messageID) {
pc.clearQueueCh <- func(id messageID) {
msgID = id
wg.Done()
}
Expand All @@ -815,12 +815,12 @@ func (pc *partitionConsumer) clearQueueAndGetNextMessage() *messageID {
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
* not seen by the application
*/
func (pc *partitionConsumer) clearReceiverQueue() *messageID {
func (pc *partitionConsumer) clearReceiverQueue() messageID {
nextMessageInQueue := pc.clearQueueAndGetNextMessage()

if nextMessageInQueue != nil {
if !nextMessageInQueue.IsZero() {
return getPreviousMessage(nextMessageInQueue)
} else if pc.lastDequeuedMsg != nil {
} else if !pc.lastDequeuedMsg.IsZero() {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return pc.lastDequeuedMsg
Expand All @@ -830,9 +830,9 @@ func (pc *partitionConsumer) clearReceiverQueue() *messageID {
}
}

func getPreviousMessage(mid *messageID) *messageID {
func getPreviousMessage(mid messageID) messageID {
if mid.batchIdx >= 0 {
return &messageID{
return messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID,
batchIdx: mid.batchIdx - 1,
Expand All @@ -841,7 +841,7 @@ func getPreviousMessage(mid *messageID) *messageID {
}

// Get on previous message in previous entry
return &messageID{
return messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID - 1,
batchIdx: mid.batchIdx,
Expand Down Expand Up @@ -901,8 +901,8 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}

func convertToMessageIDData(msgID *messageID) *pb.MessageIdData {
if msgID == nil {
func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
if msgID.IsZero() {
return nil
}

Expand All @@ -912,16 +912,15 @@ func convertToMessageIDData(msgID *messageID) *pb.MessageIdData {
}
}

func convertToMessageID(id *pb.MessageIdData) *messageID {
func convertToMessageID(id *pb.MessageIdData) messageID {
if id == nil {
return nil
return messageID{}
}

msgID := &messageID{
msgID := messageID{
ledgerID: int64(*id.LedgerId),
entryID: int64(*id.EntryId),
}

if id.BatchIndex != nil {
msgID.batchIdx = *id.BatchIndex
}
Expand Down
14 changes: 7 additions & 7 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.Nil(t, m.ID().(*messageID).tracker)
assert.Nil(t, m.ID().(messageID).tracker)
}

// ack the message id
pc.AckID(messages[0].msgID.(*messageID))
pc.AckID(messages[0].msgID.(messageID))

select {
case <-eventsCh:
Expand All @@ -73,11 +73,11 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.Nil(t, m.ID().(*messageID).tracker)
assert.Nil(t, m.ID().(messageID).tracker)
}

// ack the message id
pc.AckID(messages[0].msgID.(*messageID))
pc.AckID(messages[0].msgID.(messageID))

select {
case <-eventsCh:
Expand All @@ -102,12 +102,12 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.NotNil(t, m.ID().(*messageID).tracker)
assert.NotNil(t, m.ID().(messageID).tracker)
}

// ack all message ids except the last one
for i := 0; i < 9; i++ {
pc.AckID(messages[i].msgID.(*messageID))
pc.AckID(messages[i].msgID.(messageID))
}

select {
Expand All @@ -117,7 +117,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}

// ack last message
pc.AckID(messages[9].msgID.(*messageID))
pc.AckID(messages[9].msgID.(messageID))

select {
case <-eventsCh:
Expand Down
8 changes: 4 additions & 4 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func (c *regexConsumer) Ack(msg Message) {

// Ack the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) {
mid, ok := msgID.(*messageID)
mid, ok := msgID.(messageID)
if !ok {
c.log.Warnf("invalid message id type")
c.log.Warnf("invalid message id type %T", msgID)
return
}

Expand All @@ -180,9 +180,9 @@ func (c *regexConsumer) Nack(msg Message) {
}

func (c *regexConsumer) NackID(msgID MessageID) {
mid, ok := msgID.(*messageID)
mid, ok := msgID.(messageID)
if !ok {
c.log.Warnf("invalid message id type")
c.log.Warnf("invalid message id type %T", msgID)
return
}

Expand Down
Loading