-
Notifications
You must be signed in to change notification settings - Fork 3
/
message.go
94 lines (85 loc) · 2.89 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright 2018 Comcast Cable Communications Management, LLC
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package msg
import (
"bytes"
"encoding/binary"
"errors"
"io"
"github.com/golang/protobuf/proto"
"github.com/TuyaInc/pulsar-client-go/pkg/api"
)
// Message represents a received MESSAGE from the Pulsar server.
// It groups the MESSAGE command, the message metadata and the raw payload
// bytes together with the topic and consumer they are associated with.
type Message struct {
// Topic is the topic associated with the message.
// NOTE(review): Topic is not compared by Equal — confirm that is intended.
Topic string
// ConsumerID presumably identifies the consumer the message was delivered
// to — TODO confirm against the code that populates it.
ConsumerID uint64
// Msg is the protobuf MESSAGE command (api.CommandMessage).
Msg *api.CommandMessage
// Meta is the protobuf message metadata. DecodeBatchMessage reads
// NumMessagesInBatch from it to decide how many entries Payload holds.
Meta *api.MessageMetadata
// Payload is the raw message body. For batch messages it is parsed by
// DecodeBatchMessage into a list of SingleMessage entries.
Payload []byte
}
// Equal reports whether the provided other Message is equal to the
// receiver Message.
//
// Two messages are considered equal when their ConsumerID, Msg, Meta and
// Payload all match. Msg and Meta are compared with proto.Equal and Payload
// with bytes.Equal, so a nil Payload and an empty Payload compare equal.
// Topic is not part of the comparison.
//
// Equal is safe to call with a nil receiver or a nil argument: two nil
// Messages are equal, and a nil Message never equals a non-nil one.
func (m *Message) Equal(other *Message) bool {
	// Without this guard a nil pointer on either side would be dereferenced
	// by the field accesses below and panic.
	if m == nil || other == nil {
		return m == other
	}
	return m.ConsumerID == other.ConsumerID &&
		proto.Equal(m.Msg, other.Msg) &&
		proto.Equal(m.Meta, other.Meta) &&
		bytes.Equal(m.Payload, other.Payload)
}
// SingleMessage represents one of the elements of the batch type payload,
// as produced by DecodeBatchPayload.
type SingleMessage struct {
// SingleMetaSize is the byte length of the serialized metadata, taken from
// the 4-byte big-endian prefix that starts each batch entry.
SingleMetaSize uint32
// SingleMeta is the entry's metadata, unmarshaled from the SingleMetaSize
// bytes that follow the length prefix.
SingleMeta *api.SingleMessageMetadata
// SinglePayload is the entry's payload bytes; DecodeBatchPayload reads
// exactly SingleMeta.GetPayloadSize() bytes into it.
SinglePayload []byte
}
// DecodeBatchMessage decodes msg.Payload as a batch when the metadata field
// num_messages_in_batch is set and greater than 0.
//
// It returns one SingleMessage per batch entry, in payload order. An error
// is returned when msg or msg.Meta is nil, when num_messages_in_batch is
// unset, zero or negative, or when DecodeBatchPayload fails to parse the
// payload.
func DecodeBatchMessage(msg *Message) ([]*SingleMessage, error) {
	// Reject missing inputs up front instead of dereferencing them. The field
	// itself is checked (not just the getter result) so that an unset field
	// is rejected regardless of what value the getter reports for it.
	if msg == nil || msg.Meta == nil || msg.Meta.NumMessagesInBatch == nil {
		return nil, errors.New("num_message_in_batch is nil or 0")
	}

	num := msg.Meta.GetNumMessagesInBatch()
	switch {
	case num == 0:
		return nil, errors.New("num_message_in_batch is nil or 0")
	case num < 0:
		// A negative count can never describe a valid batch and would
		// otherwise reach a slice allocation with a negative capacity.
		return nil, errors.New("num_message_in_batch is negative")
	}
	return DecodeBatchPayload(msg.Payload, num)
}
// DecodeBatchPayload parses the payload of the batch type.
// If the producer uses the batch function, msg.Payload will be a
// SingleMessage array structure.
//
// bp is read as batchNum consecutive entries, each laid out as:
//
//	4 bytes   big-endian length of the serialized metadata
//	N bytes   serialized api.SingleMessageMetadata
//	M bytes   payload, where M is the metadata's payload size
//
// The entries are returned in payload order; each SinglePayload is a copy
// and does not alias bp. Bytes left in bp after batchNum entries are
// ignored. A batchNum of 0 yields an empty, non-nil slice.
//
// An error is returned when batchNum or a declared payload size is
// negative, when bp ends before an entry is complete (io.EOF if nothing was
// left to read, io.ErrUnexpectedEOF if the entry was cut short), or when
// the metadata cannot be unmarshaled.
func DecodeBatchPayload(bp []byte, batchNum int32) ([]*SingleMessage, error) {
	if batchNum < 0 {
		return nil, errors.New("negative batch message count")
	}

	rdBuf := bytes.NewReader(bp)

	// readN returns the next n bytes of the payload as a fresh slice. The
	// requested size comes from the wire, so it is checked against what is
	// actually left before allocating; the errors mirror what io.ReadFull
	// reports for a short read.
	readN := func(n int64) ([]byte, error) {
		if remaining := int64(rdBuf.Len()); n > remaining {
			if remaining == 0 {
				return nil, io.EOF
			}
			return nil, io.ErrUnexpectedEOF
		}
		out := make([]byte, n)
		if _, err := io.ReadFull(rdBuf, out); err != nil {
			return nil, err
		}
		return out, nil
	}

	// Every entry occupies at least its 4-byte length prefix, so the payload
	// cannot hold more than len(bp)/4 entries; bound the preallocation by
	// that instead of trusting batchNum.
	capHint := int64(batchNum)
	if limit := int64(len(bp) / 4); capHint > limit {
		capHint = limit
	}
	list := make([]*SingleMessage, 0, capHint)

	buf32 := make([]byte, 4)
	for i := int32(0); i < batchNum; i++ {
		// singleMetaSize
		if _, err := io.ReadFull(rdBuf, buf32); err != nil {
			return nil, err
		}
		singleMetaSize := binary.BigEndian.Uint32(buf32)

		// singleMeta
		singleMetaBuf, err := readN(int64(singleMetaSize))
		if err != nil {
			return nil, err
		}
		singleMeta := new(api.SingleMessageMetadata)
		if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != nil {
			return nil, err
		}

		// payload
		payloadSize := int64(singleMeta.GetPayloadSize())
		if payloadSize < 0 {
			return nil, errors.New("negative single message payload size")
		}
		singlePayload, err := readN(payloadSize)
		if err != nil {
			return nil, err
		}

		list = append(list, &SingleMessage{
			SingleMetaSize: singleMetaSize,
			SingleMeta:     singleMeta,
			SinglePayload:  singlePayload,
		})
	}
	return list, nil
}