Skip to content

Commit

Permalink
Support partition consumer receive async and fix batch logic (apache#43)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolong.ran ranxiaolong716@gmail.com

* Support batch logic for project

* add unit test case of event time

* add some unit tests case for producer

* fix error result type

* add unit test case of producer flush

* add receiver queue size test logic

* support partition consumer receive async

* add unit test case of ack timeout

* Fix consumer receiving message out of order
  • Loading branch information
wolfstudy committed Aug 14, 2019
1 parent 670da09 commit 448387d
Show file tree
Hide file tree
Showing 12 changed files with 870 additions and 190 deletions.
3 changes: 3 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ type Consumer interface {
// ReceiveAsync appends the message to the msgs channel asynchronously.
ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error

// ReceiveAsyncWithCallback returns a callback containing the message and error objects
ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error))

// Ack the consumption of a single message
Ack(Message) error

Expand Down
301 changes: 294 additions & 7 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
Expand All @@ -15,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package pulsar

Expand Down Expand Up @@ -124,6 +122,67 @@ func TestConsumerConnectError(t *testing.T) {
assert.Equal(t, err.Error(), "connection error")
}

func TestBatchMessageReceive(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topicName := "persistent://public/default/receive-batch"
subName := "subscription-name"
prefix := "msg-batch-"
ctx := context.Background()

// Enable batching on producer side
batchSize, numOfMessages := 2, 100

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
BatchingMaxMessages: uint(batchSize),
DisableBatching: false,
BlockIfQueueFull: true,
})
assert.Nil(t, err)
assert.Equal(t, topicName, producer.Topic())
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
assert.Equal(t, topicName, consumer.Topic())
count := 0

for i := 0; i < numOfMessages; i++ {
messageContent := prefix + fmt.Sprintf("%d", i)
msg := &ProducerMessage{
Payload: []byte(messageContent),
}
err := producer.Send(ctx, msg)
assert.Nil(t, err)
}

for i := 0; i < numOfMessages; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
err = consumer.Ack(msg)
assert.Nil(t, err)
count++
}

// check strategically
for i := 0; i < 3; i++ {
if count == numOfMessages {
break
}
time.Sleep(time.Second)
}
assert.Equal(t, count, numOfMessages)
}

func TestConsumerWithInvalidConf(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
Expand Down Expand Up @@ -263,7 +322,7 @@ func TestConsumerKeyShared(t *testing.T) {
assert.Nil(t, err)
}

time.Sleep(time.Second * 5)
time.Sleep(time.Second * 1)

go func() {
for i := 0; i < 10; i++ {
Expand All @@ -288,6 +347,8 @@ func TestConsumerKeyShared(t *testing.T) {
}
}
}()

time.Sleep(time.Second * 1)
}

func TestPartitionTopicsConsumerPubSub(t *testing.T) {
Expand All @@ -300,7 +361,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
topic := "persistent://public/default/testGetPartitions"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"

makeHTTPCall(t, http.MethodPut, testURL, "3")
makeHTTPCall(t, http.MethodPut, testURL, "5")

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Expand All @@ -316,9 +377,10 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
assert.Equal(t, topic+"-partition-2", topics[2])

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
ReceiverQueueSize: 10,
})
assert.Nil(t, err)
defer consumer.Close()
Expand Down Expand Up @@ -348,3 +410,228 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {

assert.Equal(t, len(msgs), 10)
}

func TestConsumer_ReceiveAsync(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topicName := "persistent://public/default/receive-async"
subName := "subscription-receive-async"
ctx := context.Background()
ch := make(chan ConsumerMessage, 10)

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
defer consumer.Close()

//send 10 messages
for i := 0; i < 10; i++ {
err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}

//receive async 10 messages
err = consumer.ReceiveAsync(ctx, ch)
assert.Nil(t, err)

payloadList := make([]string, 0, 10)

RECEIVE:
for {
select {
case cMsg, ok := <-ch:
if ok {
fmt.Printf("receive message payload is:%s\n", string(cMsg.Payload()))
assert.Equal(t, topicName, cMsg.Message.Topic())
assert.Equal(t, topicName, cMsg.Consumer.Topic())
payloadList = append(payloadList, string(cMsg.Message.Payload()))
if len(payloadList) == 10 {
break RECEIVE
}
}
continue RECEIVE
case <-ctx.Done():
t.Error("context error.")
return
}
}
}

func TestConsumerAckTimeout(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()

topic := "test-ack-timeout-topic-1"
ctx := context.Background()

// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub1",
Type: Shared,
AckTimeout: 5 * 1000,
})
assert.Nil(t, err)
defer consumer.Close()

// create consumer1
consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub2",
Type: Shared,
AckTimeout: 5 * 1000,
})
assert.Nil(t, err)
defer consumer1.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
if err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}

// consumer receive 10 messages
payloadList := make([]string, 0, 10)
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
payloadList = append(payloadList, string(msg.Payload()))

// not ack message
}
assert.Equal(t, 10, len(payloadList))

// consumer1 receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer1.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

payloadList = append(payloadList, string(msg.Payload()))

// ack half of the messages
if i%2 == 0 {
err = consumer1.Ack(msg)
assert.Nil(t, err)
}
}

// wait ack timeout
time.Sleep(6 * time.Second)

fmt.Println("start redeliver messages...")

payloadList = make([]string, 0, 10)
// consumer receive messages again
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
payloadList = append(payloadList, string(msg.Payload()))

// ack message
if err := consumer.Ack(msg); err != nil {
log.Fatal(err)
}
}
assert.Equal(t, 10, len(payloadList))

payloadList = make([]string, 0, 5)
// consumer1 receive messages again
go func() {
for i := 0; i < 10; i++ {
msg, err := consumer1.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

expectMsg := fmt.Sprintf("hello-%d", i)
fmt.Printf("redeliver messages, payload is:%s\n", expectMsg)
payloadList = append(payloadList, string(msg.Payload()))

// ack message
if err := consumer1.Ack(msg); err != nil {
log.Fatal(err)
}
}
assert.Equal(t, 5, len(payloadList))
}()

// sleep 2 seconds, wait gorutine receive messages.
time.Sleep(time.Second * 2)
}

func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topicName := "persistent://public/default/receive-async-with-callback"
subName := "subscription-receive-async"
ctx := context.Background()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
defer consumer.Close()

//send 10 messages
for i := 0; i < 10; i++ {
err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.Nil(t, err)
}

for i := 0; i < 10; i++ {
consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err error) {
if err != nil {
log.Fatal(err)
}
fmt.Printf("receive message payload is:%s\n", string(msg.Payload()))
assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
})
}
}
2 changes: 1 addition & 1 deletion pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Result int

const (
// ResultOk means no errors
ResultOk = iota
ResultOk Result = iota
// ResultUnknownError means unknown error happened on broker
ResultUnknownError
// ResultInvalidConfiguration means invalid configuration
Expand Down
Loading

0 comments on commit 448387d

Please sign in to comment.