Skip to content

Commit

Permalink
fix: fix 879 (#902)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gleiphir2769 committed Dec 9, 2022
1 parent 48c39ee commit 055b00b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
35 changes: 30 additions & 5 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,12 +804,22 @@ func TestConsumerCompression(t *testing.T) {
topicName := newTopicName()
ctx := context.Background()

producer, err := client.CreateProducer(ProducerOptions{
// enable batching
p1, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
CompressionType: LZ4,
})
assert.Nil(t, err)
defer producer.Close()
defer p1.Close()

// disable batching
p2, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
CompressionType: LZ4,
DisableBatching: true,
})
assert.Nil(t, err)
defer p2.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
Expand All @@ -821,8 +831,16 @@ func TestConsumerCompression(t *testing.T) {
const N = 100

for i := 0; i < N; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
if _, err := p1.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d-batching-enabled", i)),
}); err != nil {
t.Fatal(err)
}
}

for i := 0; i < N; i++ {
if _, err := p2.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d-batching-disabled", i)),
}); err != nil {
t.Fatal(err)
}
Expand All @@ -831,7 +849,14 @@ func TestConsumerCompression(t *testing.T) {
for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-enabled", i), string(msg.Payload()))
consumer.Ack(msg)
}

for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-disabled", i), string(msg.Payload()))
consumer.Ack(msg)
}
}
Expand Down
6 changes: 6 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,12 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
compressedSize = len(compressedPayload)
checkSize = compressedSize

// set the compress type in msgMetaData
compressionType := pb.CompressionType(p.options.CompressionType)
if compressionType != pb.CompressionType_NONE {
mm.Compression = &compressionType
}
} else {
// final check for batching message is in serializeMessage
// this is a double check
Expand Down

0 comments on commit 055b00b

Please sign in to comment.