-
Notifications
You must be signed in to change notification settings - Fork 2
/
publish.go
154 lines (129 loc) · 3.59 KB
/
publish.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package packets
import (
"fmt"
"io"
"sync"
)
var _publishPacketPool = sync.Pool{
New: func() interface{} {
return &PublishPacket{FixedHeader: &FixedHeader{MessageType: Publish}}
},
}
// PublishPacket is an internal representation of the fields of the
// Publish MQTT packet
type PublishPacket struct {
*FixedHeader
TopicName string
MessageID uint16
Payload []byte
TraceID string
}
// NewPublishPacket return the ping request packet
func NewPublishPacket() *PublishPacket {
return _publishPacketPool.Get().(*PublishPacket)
}
// SetFixedHeader will set fh for our header
func (p *PublishPacket) SetFixedHeader(fh *FixedHeader) {
p.FixedHeader = fh
}
// SetTraceID will set traceid for tracing
func (p *PublishPacket) SetTraceID(id string) { p.TraceID = id }
// Verify packet availability
func (p *PublishPacket) Verify() bool { return true }
// Type return the packet type
func (p *PublishPacket) Type() byte {
return p.FixedHeader.MessageType
}
// Reset will initialize the fields in control packet
func (p *PublishPacket) Reset() {
p.FixedHeader.Dup = false
p.FixedHeader.QoS = byte(0)
p.FixedHeader.RemainingLength = 0
p.FixedHeader.Retain = false
p.TopicName = ""
p.MessageID = 0
p.Payload = []byte{}
}
// Close reset the packet field put the control packet back to pool
func (p *PublishPacket) Close() {
p.Reset()
_publishPacketPool.Put(p)
}
// String export the packet of publish information
func (p *PublishPacket) String() string {
return fmt.Sprintf("%s topicName: %s MessageID: %d payload: %s traceID: %s", p.FixedHeader,
p.TopicName, p.MessageID, string(p.Payload), p.TraceID)
}
// Write will write the packets mostly into a net.Conn
func (p *PublishPacket) Write(w io.Writer) (n int, err error) {
b := Getbuf()
defer Putbuf(b)
n = len(p.TopicName) + 2
pb := make([]byte, n+2)
if err = encodeString(p.TopicName, pb[:n]); err != nil {
return 0, err
}
if p.QoS > 0 {
if err = encodeUint16(p.MessageID, pb[n:]); err != nil {
return 0, err
}
n += 2
}
p.FixedHeader.RemainingLength = n + len(p.Payload)
m := p.FixedHeader.pack(b.b[:5])
if m, err = w.Write(b.b[5-m : 5]); err != nil {
return m, err
}
if n, err = w.Write(pb[:n]); err != nil {
return n + m, err
}
// TODO FIXME XXX should be bufferd?
// should this be in one Write?
z, err := w.Write(p.Payload)
return n + m + z, err
}
// Unpack decodes the details of a ControlPacket after the fixed
// header has been read
func (p *PublishPacket) Unpack(b []byte) error {
var (
n int
err error
)
var payloadLength = p.FixedHeader.RemainingLength
p.TopicName, n, err = decodeString(b)
if err != nil {
return err
}
if p.QoS > 0 {
p.MessageID, err = decodeUint16(b[n:])
n += 2
payloadLength -= len(p.TopicName) + 4
} else {
payloadLength -= len(p.TopicName) + 2
}
if err != nil {
return err
}
if payloadLength < 0 {
return fmt.Errorf("Error upacking publish, payload length < 0")
}
p.Payload = make([]byte, payloadLength)
copy(p.Payload, b[n:])
return nil
}
// Copy creates a new PublishPacket with the same topic and payload
// but an empty fixed header, useful for when you want to deliver
// a message with different properties such as QoS but the same
// content
// XXX need to check if put back a packet will cause noting wrong
func (p *PublishPacket) Copy() *PublishPacket {
newP := NewControlPacket(Publish).(*PublishPacket)
newP.TopicName = p.TopicName
newP.Payload = p.Payload
return newP
}
// Details returns a Details struct containing the QoS and
// MessageID of this ControlPacket
func (p *PublishPacket) Details() Details {
return Details{QoS: p.QoS, MessageID: p.MessageID}
}