From 055b00b83ccfef4ea12a593e3678cb4bdd18311c Mon Sep 17 00:00:00 2001 From: Jiaqi Shen <18863662628@163.com> Date: Fri, 9 Dec 2022 15:14:25 +0800 Subject: [PATCH] fix: fix 879 (#902) --- pulsar/consumer_test.go | 35 ++++++++++++++++++++++++++++++----- pulsar/producer_partition.go | 6 ++++++ 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 989b906c4d..95462ba238 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -804,12 +804,22 @@ func TestConsumerCompression(t *testing.T) { topicName := newTopicName() ctx := context.Background() - producer, err := client.CreateProducer(ProducerOptions{ + // enable batching + p1, err := client.CreateProducer(ProducerOptions{ Topic: topicName, CompressionType: LZ4, }) assert.Nil(t, err) - defer producer.Close() + defer p1.Close() + + // disable batching + p2, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + CompressionType: LZ4, + DisableBatching: true, + }) + assert.Nil(t, err) + defer p2.Close() consumer, err := client.Subscribe(ConsumerOptions{ Topic: topicName, @@ -821,8 +831,16 @@ func TestConsumerCompression(t *testing.T) { const N = 100 for i := 0; i < N; i++ { - if _, err := producer.Send(ctx, &ProducerMessage{ - Payload: []byte(fmt.Sprintf("msg-content-%d", i)), + if _, err := p1.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("msg-content-%d-batching-enabled", i)), + }); err != nil { + t.Fatal(err) + } + } + + for i := 0; i < N; i++ { + if _, err := p2.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("msg-content-%d-batching-disabled", i)), }); err != nil { t.Fatal(err) } @@ -831,7 +849,14 @@ func TestConsumerCompression(t *testing.T) { for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) assert.Nil(t, err) - assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) + assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-enabled", i), string(msg.Payload())) + consumer.Ack(msg) + } + + for i := 0; i < N; i++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-disabled", i), string(msg.Payload())) consumer.Ack(msg) } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index fc564cbb89..a08ee0432b 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -574,6 +574,12 @@ func (p *partitionProducer) internalSend(request *sendRequest) { compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload) compressedSize = len(compressedPayload) checkSize = compressedSize + + // set the compress type in msgMetaData + compressionType := pb.CompressionType(p.options.CompressionType) + if compressionType != pb.CompressionType_NONE { + mm.Compression = &compressionType + } } else { // final check for batching message is in serializeMessage // this is a double check