Skip to content

Commit

Permalink
Add properties filed for batch (#683)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolongran <xiaolongran@tencent.com>

### Motivation

Currently, when we disable batch in Producer, in `handleSend()` of `serverCnx.java`, the `msgMetadata.hasNumMessagesInBatch()` is **true** and `msgMetadata.getNumMessagesInBatch()` is **1**.

At this point, if we get the Properties object we set on the producer side on the broker side, the display is empty.

Go SDK set Properties:

```

// disable batch
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "topic-1",
	DisableBatching: true,
})

// set properties for every message
producer.Send(ctx, &pulsar.ProducerMessage{
	Payload: []byte(fmt.Sprintf("hello-%d", i)),
	Properties: map[string]string{
		"key-1": "value-1",
	},
});
```

Broker get message properties from entry metadata is null:

```
ByteBuf metadataAndPayload = entry.getDataBuffer();

MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
```

And `msgMetadata.getPropertiesCount() <= 0`.


### Modifications

Add properties filed in Add single message to batchContainer
  • Loading branch information
wolfstudy committed Dec 10, 2021
1 parent efb1025 commit d0d5d0a
Showing 1 changed file with 2 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (bc *batchContainer) Add(
bc.msgMetadata.ProducerName = &bc.producerName
bc.msgMetadata.ReplicateTo = replicateTo
bc.msgMetadata.PartitionKey = metadata.PartitionKey
bc.msgMetadata.Properties = metadata.Properties

if deliverAt.UnixNano() > 0 {
bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
Expand All @@ -211,6 +212,7 @@ func (bc *batchContainer) reset() {
bc.callbacks = []interface{}{}
bc.msgMetadata.ReplicateTo = nil
bc.msgMetadata.DeliverAtTime = nil
bc.msgMetadata.Properties = nil
}

// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
Expand Down

0 comments on commit d0d5d0a

Please sign in to comment.