Skip to content

Commit

Permalink
Merge pull request #679 from trxo/master
Browse files Browse the repository at this point in the history
[ISSUE #818] fix msg.Body compressed cause error in transaction message processing
  • Loading branch information
Git-Yang committed Apr 25, 2022
2 parents 40242bc + 5cef32e commit 853f304
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
13 changes: 7 additions & 6 deletions primitive/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ const (
)

type Message struct {
Topic string
Body []byte
Flag int32
TransactionId string
Batch bool
Compress bool
Topic string
Body []byte
CompressedBody []byte
Flag int32
TransactionId string
Batch bool
Compress bool
// Queue is the queue that messages will be sent to. the value must be set if want to custom the queue of message,
// just ignore if not.
Queue *MessageQueue
Expand Down
14 changes: 10 additions & 4 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool {
if e != nil {
return false
}
msg.Body = compressedBody
msg.CompressedBody = compressedBody
msg.Compress = true
return true
}
Expand All @@ -345,8 +345,14 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
if !msg.Batch && msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) == "" {
msg.WithProperty(primitive.PropertyUniqueClientMessageIdKeyIndex, primitive.CreateUniqID())
}
sysFlag := 0

var (
sysFlag = 0
transferBody = msg.Body
)

if p.tryCompressMsg(msg) {
transferBody = msg.CompressedBody
sysFlag = primitive.SetCompressedFlag(sysFlag)
}
v := msg.GetProperty(primitive.PropertyTransactionPrepared)
Expand All @@ -373,10 +379,10 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
if msg.Batch {
cmd = internal.ReqSendBatchMessage
reqv2 := &internal.SendMessageRequestV2Header{SendMessageRequestHeader: req}
return remote.NewRemotingCommand(cmd, reqv2, msg.Body)
return remote.NewRemotingCommand(cmd, reqv2, transferBody)
}

return remote.NewRemotingCommand(cmd, req, msg.Body)
return remote.NewRemotingCommand(cmd, req, transferBody)
}

func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.MessageQueue {
Expand Down

0 comments on commit 853f304

Please sign in to comment.