forked from vcabbage/amqp
/
parsing.go
159 lines (141 loc) · 4.05 KB
/
parsing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package frames
import (
"encoding/binary"
"errors"
"fmt"
"math"
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
)
// HeaderSize is the size, in bytes, of the mandatory frame header.
const HeaderSize = 8
// Frame structure:
//
// header (8 bytes)
// 0-3: SIZE (total size, at least 8 bytes for header, uint32)
// 4: DOFF (data offset, at least 2, count of 4-byte words, uint8)
// 5: TYPE (frame type)
// 0x0: AMQP
// 0x1: SASL
// 6-7: type dependent (channel for AMQP)
// extended header (opt)
// body (opt)
// Header is the fixed 8-byte frame header, laid out as a structure
// appropriate for use with binary.Read(). The field order mirrors the
// on-the-wire layout and must not be changed.
type Header struct {
// size: an unsigned 32-bit integer that MUST contain the total frame size of the frame header,
// extended header, and frame body. The frame is malformed if the size is less than the size of
// the frame header (8 bytes).
Size uint32
// doff: gives the position of the body within the frame. The value of the data offset is an
// unsigned, 8-bit integer specifying a count of 4-byte words. Due to the mandatory 8-byte
// frame header, the frame is malformed if the value is less than 2.
DataOffset uint8
// type: the frame type; 0x0 indicates an AMQP frame and 0x1 a SASL frame.
FrameType uint8
// Type-dependent bytes 6-7 of the header; for AMQP frames this is the channel.
Channel uint16
}
// ParseHeader reads the fixed-size frame header from r and returns the result.
//
// Basic validation is performed. An error is returned when:
//   - r.Next cannot supply HeaderSize bytes,
//   - the decoded size is smaller than HeaderSize, or
//   - the decoded data offset is less than 2.
//
// When the size or data offset fails validation, the decoded header is
// returned together with the error so callers can inspect it.
func ParseHeader(r *buffer.Buffer) (Header, error) {
	buf, ok := r.Next(HeaderSize)
	if !ok {
		return Header{}, errors.New("invalid frameHeader")
	}

	// Touch the last header byte up front so the accesses below need no
	// further bounds checks.
	_ = buf[HeaderSize-1]

	fh := Header{
		Size:       binary.BigEndian.Uint32(buf[0:4]),
		DataOffset: buf[4],
		FrameType:  buf[5],
		Channel:    binary.BigEndian.Uint16(buf[6:8]),
	}

	// The size covers the header itself, so anything smaller is malformed.
	if fh.Size < HeaderSize {
		return fh, fmt.Errorf("received frame header with invalid size %d", fh.Size)
	}

	// The data offset counts 4-byte words and must at least skip the 8-byte header.
	if fh.DataOffset < 2 {
		return fh, fmt.Errorf("received frame header with invalid data offset %d", fh.DataOffset)
	}

	return fh, nil
}
// ParseBody decodes the frame body held in r into the matching performative
// or SASL frame type.
//
// The body must start with a zero byte followed by a small-ulong type code;
// the third byte then selects the concrete type to unmarshal. An error is
// returned if that three-byte prefix is missing or malformed, if the
// selector is not recognised, or if unmarshalling fails. In the last case
// the partially decoded value is returned alongside the error.
func ParseBody(r *buffer.Buffer) (FrameBody, error) {
	raw := r.Bytes()

	// At least three bytes are needed to identify the body.
	if r.Len() < 3 {
		return nil, errors.New("invalid frame body header")
	}
	if raw[0] != 0 || encoding.AMQPType(raw[1]) != encoding.TypeCodeSmallUlong {
		return nil, errors.New("invalid frame body header")
	}

	code := encoding.AMQPType(raw[2])
	switch code {
	case encoding.TypeCodeOpen:
		body := new(PerformOpen)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeBegin:
		body := new(PerformBegin)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeAttach:
		body := new(PerformAttach)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeFlow:
		body := new(PerformFlow)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeTransfer:
		body := new(PerformTransfer)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeDisposition:
		body := new(PerformDisposition)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeDetach:
		body := new(PerformDetach)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeEnd:
		body := new(PerformEnd)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeClose:
		body := new(PerformClose)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeSASLMechanism:
		body := new(SASLMechanisms)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeSASLChallenge:
		body := new(SASLChallenge)
		return body, body.Unmarshal(r)
	case encoding.TypeCodeSASLOutcome:
		body := new(SASLOutcome)
		return body, body.Unmarshal(r)
	default:
		return nil, fmt.Errorf("unknown performative type %02x", code)
	}
}
// Write encodes fr into buf as a complete frame: an 8-byte header followed
// by the marshalled body.
// It is split out from conn.WriteFrame for testing purposes.
//
// The frame is appended after whatever buf already holds, and the size field
// is patched at the start of the newly written frame rather than at the
// start of buf, so existing content is left intact.
// NOTE(review): this relies on buf.Len() and buf.Bytes() describing the same
// region of the buffer — confirm against the buffer package.
//
// An error is returned if marshalling the body fails or if the encoded frame
// is too large for the 32-bit size field. In both cases the bytes already
// appended to buf are left in place.
func Write(buf *buffer.Buffer, fr Frame) error {
	// Remember where this frame begins so the size can be written at the
	// right position once the total length is known.
	start := buf.Len()

	// write header
	buf.Append([]byte{
		0, 0, 0, 0, // size, overwritten once the body has been encoded
		2,              // doff, see Header.DataOffset comment
		uint8(fr.Type), // frame type
	})
	buf.AppendUint16(fr.Channel) // channel

	// write AMQP frame body
	if err := encoding.Marshal(buf, fr.Body); err != nil {
		return err
	}

	// Only the bytes written by this call make up the frame.
	frame := buf.Bytes()[start:]

	// validate size
	if uint64(len(frame)) > math.MaxUint32 {
		return errors.New("frame too large")
	}

	// write correct size
	binary.BigEndian.PutUint32(frame, uint32(len(frame)))
	return nil
}