Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 304][Reader] fixed panic in CreateReader API using custom MessageID for ReaderOptions #305

Merged
merged 2 commits into from Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 15 additions & 2 deletions pulsar/reader_impl.go
Expand Up @@ -44,6 +44,19 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
return nil, newError(ResultInvalidConfiguration, "StartMessageID is required")
}

var startMessageID *messageID
var ok bool
if startMessageID, ok = options.StartMessageID.(*messageID); !ok {
// a custom type satisfying MessageID may not be a *messageID
// so re-create *messageID using its data
deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize())
if err != nil {
return nil, err
}
// de-serialized MessageID is a *messageID
startMessageID = deserMsgID.(*messageID)
}

subscriptionName := options.SubscriptionRolePrefix
if subscriptionName == "" {
subscriptionName = "reader"
Expand All @@ -61,7 +74,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
subscription: subscriptionName,
subscriptionType: Exclusive,
receiverQueueSize: receiverQueueSize,
startMessageID: options.StartMessageID.(*messageID),
startMessageID: startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
subscriptionMode: nonDurable,
readCompacted: options.ReadCompacted,
Expand All @@ -80,8 +93,8 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
if err != nil {
return nil, err
}
pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq)

pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq)
if err != nil {
close(reader.messageCh)
return nil, err
Expand Down
84 changes: 84 additions & 0 deletions pulsar/reader_test.go
Expand Up @@ -362,3 +362,87 @@ func TestReaderHasNext(t *testing.T) {

assert.Equal(t, 10, i)
}

type myMessageID struct {
data []byte
}

func (id *myMessageID) Serialize() []byte {
return id.data
}

func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
ctx := context.Background()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
msgIDs := [10]MessageID{}
for i := 0; i < 10; i++ {
msgID, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.NoError(t, err)
assert.NotNil(t, msgID)
msgIDs[i] = msgID
}

// custom start message ID
myStartMsgID := &myMessageID{
data: msgIDs[4].Serialize(),
}

// attempt to create reader on 5th message (not included)
var reader Reader
assert.NotPanics(t, func() {
reader, err = client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: myStartMsgID,
})
})

assert.Nil(t, err)
defer reader.Close()

// receive the remaining 5 messages
for i := 5; i < 10; i++ {
msg, err := reader.Next(context.Background())
assert.NoError(t, err)

expectMsg := fmt.Sprintf("hello-%d", i)
assert.Equal(t, []byte(expectMsg), msg.Payload())
}

// create reader on 5th message (included)
readerInclusive, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: myStartMsgID,
StartMessageIDInclusive: true,
})

assert.Nil(t, err)
defer readerInclusive.Close()

// receive the remaining 6 messages
for i := 4; i < 10; i++ {
msg, err := readerInclusive.Next(context.Background())
assert.NoError(t, err)

expectMsg := fmt.Sprintf("hello-%d", i)
assert.Equal(t, []byte(expectMsg), msg.Payload())
}
}