/
decoder.go
249 lines (209 loc) · 6.01 KB
/
decoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
//: ----------------------------------------------------------------------------
//: Copyright (C) 2017 Verizon. All Rights Reserved.
//: All Rights Reserved
//:
//: file: decoder.go
//: details: TODO
//: author: Mehrdad Arshad Rad
//: date: 02/01/2017
//:
//: Licensed under the Apache License, Version 2.0 (the "License");
//: you may not use this file except in compliance with the License.
//: You may obtain a copy of the License at
//:
//: http://www.apache.org/licenses/LICENSE-2.0
//:
//: Unless required by applicable law or agreed to in writing, software
//: distributed under the License is distributed on an "AS IS" BASIS,
//: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//: See the License for the specific language governing permissions and
//: limitations under the License.
//: ----------------------------------------------------------------------------
package sflow
import (
"encoding/binary"
"errors"
"io"
"net"
"time"
)
// Sample format values. SFDecode switches on these after getSampleInfo has
// extracted the format from the low 12 bits of a sample's type word.
const (
// DataFlowSample is the format of a packet flow sample; SFDecode hands such
// samples to decodeFlowSample.
DataFlowSample = 1
// DataCounterSample is the format of a counter sample; SFDecode hands such
// samples to decodeFlowCounter.
DataCounterSample = 2
)
// SFDecoder represents sFlow decoder. It decodes one datagram from reader and
// skips every sample whose format is listed in filter.
type SFDecoder struct {
reader io.ReadSeeker // Source of the raw datagram; Seek is used to skip over samples
filter []uint32 // Sample format(s) to skip instead of decoding
}
// SFDatagram represents sFlow datagram: the header fields read by
// sfHeaderDecode plus the samples collected by SFDecode.
type SFDatagram struct {
Version uint32 // Datagram version (the decoder only accepts 5)
IPVersion uint32 // Agent address type: 2 selects a 16-byte address, any other value a 4-byte address
AgentSubID uint32 // Identifies a source of sFlow data
SequenceNo uint32 // Sequence of sFlow Datagrams
SysUpTime uint32 // Current time (in milliseconds since device last booted)
SamplesNo uint32 // Number of samples
Samples []Sample // Decoded flow samples
Counters []Counter // Decoded counter samples
IPAddress net.IP // Agent IP address
ColTime int64 // Collected time (Unix seconds, taken when the header is decoded)
}
// SFSampledHeader represents sFlow sample header: a description of the
// sampled packet followed by the captured header bytes.
type SFSampledHeader struct {
HeaderProtocol uint32 // (enum SFHeaderProtocol)
FrameLength uint32 // Original length of packet before sampling
Stripped uint32 // Header/trailer bytes stripped by sender
HeaderLength uint32 // Length of sampled header bytes to follow
HeaderBytes []byte // Header bytes
}
// Sample represents sFlow sample flow. SFDecode stores every decoded flow
// sample in SFDatagram.Samples as a Sample.
type Sample interface{}
// Counter represents sFlow counters. SFDecode stores every decoded counter
// sample in SFDatagram.Counters as a Counter.
type Counter interface{}
// Record represents sFlow sample record
type Record interface{}
// Sentinel errors produced while decoding a datagram.
var (
// errNoneEnterpriseStandard is returned by getSampleInfo when the enterprise
// part of a sample's type word is non-zero.
errNoneEnterpriseStandard = errors.New("the enterprise is not standard sflow data")
// errDataLengthUnknown is returned by getSampleInfo when the sample length
// cannot be read.
errDataLengthUnknown = errors.New("the sflow data length is unknown")
// errSFVersionNotSupport is returned by sfHeaderDecode when the datagram
// version is not 5.
errSFVersionNotSupport = errors.New("the sflow version doesn't support")
// errSampleLengthInvalid is returned by sfMoveToEndOfSample when more bytes
// were consumed than the sample's declared length.
errSampleLengthInvalid = errors.New("the sflow sample length is invalid")
)
// NewSFDecoder builds an sFlow decoder that reads its input from r and skips
// every sample whose format is listed in f. The filter slice is stored as
// given, not copied.
func NewSFDecoder(r io.ReadSeeker, f []uint32) SFDecoder {
	var dec SFDecoder
	dec.reader = r
	dec.filter = f
	return dec
}
// SFDecode decodes sFlow data.
//
// It reads the datagram header and then walks SamplesNo samples. Samples whose
// format matches the decoder's filter are skipped; flow samples are appended
// to Samples, counter samples to Counters, and samples of any other format are
// skipped using their declared length.
//
// Return contract: when the header or a sample's type/length cannot be read
// the datagram is nil. When a failure happens later inside a sample (decoding
// or seeking), the datagram holding everything decoded so far is returned
// together with the error.
func (d *SFDecoder) SFDecode() (*SFDatagram, error) {
	datagram, err := d.sfHeaderDecode()
	if err != nil {
		return nil, err
	}

	// Start from empty, non-nil slices so callers never observe nil here.
	datagram.Samples = []Sample{}
	datagram.Counters = []Counter{}

	for i := uint32(0); i < datagram.SamplesNo; i++ {
		sfTypeFormat, sfDataLength, err := d.getSampleInfo()
		if err != nil {
			return nil, err
		}

		if d.isFilterMatch(sfTypeFormat) {
			// Filtered format: jump over the whole sample body.
			if _, err := d.reader.Seek(int64(sfDataLength), io.SeekCurrent); err != nil {
				return datagram, err
			}
			continue
		}

		// Remember where the sample body starts so that the number of bytes
		// the sample decoder consumed can be checked against sfDataLength.
		offsetBefore, err := d.reader.Seek(0, io.SeekCurrent)
		if err != nil {
			return datagram, err
		}

		switch sfTypeFormat {
		case DataFlowSample:
			sample, err := decodeFlowSample(d.reader)
			if err != nil {
				return datagram, err
			}
			datagram.Samples = append(datagram.Samples, sample)
		case DataCounterSample:
			counter, err := decodeFlowCounter(d.reader)
			if err != nil {
				return datagram, err
			}
			datagram.Counters = append(datagram.Counters, counter)
		}
		// Unknown formats read nothing above; sfMoveToEndOfSample below then
		// skips their entire body.

		if err := d.sfMoveToEndOfSample(sfDataLength, offsetBefore); err != nil {
			return datagram, err
		}
	}

	return datagram, nil
}
// sfMoveToEndOfSample positions the reader at the end of the current sample.
//
// sfDataLength is the sample's declared body length and offsetBefore is the
// reader offset at which the body started. The bytes already consumed are
// derived from the current reader offset; whatever remains of the body is
// skipped.
//
// It returns errSampleLengthInvalid when more bytes were consumed than the
// sample declared, and any error reported by the reader's Seek.
func (d *SFDecoder) sfMoveToEndOfSample(sfDataLength uint32, offsetBefore int64) error {
	offsetAfter, err := d.reader.Seek(0, io.SeekCurrent)
	if err != nil {
		// Without a trustworthy position the remaining length cannot be
		// computed, so report the failure instead of guessing.
		return err
	}

	remaining := int64(sfDataLength) - (offsetAfter - offsetBefore)
	switch {
	case remaining < 0:
		return errSampleLengthInvalid
	case remaining > 0:
		if _, err := d.reader.Seek(remaining, io.SeekCurrent); err != nil {
			return err
		}
	}
	return nil
}
// sfHeaderDecode reads the datagram header from the decoder's reader.
//
// Fields are consumed in wire order: version, agent address type, agent
// address, sub-agent id, sequence number, uptime and sample count. ColTime is
// stamped with the current Unix time once the header has been read.
//
// It returns errSFVersionNotSupport when the version is not 5, and the
// underlying read error when the input ends early. A partially received agent
// address is reported as io.ErrUnexpectedEOF.
func (d *SFDecoder) sfHeaderDecode() (*SFDatagram, error) {
	datagram := &SFDatagram{}

	if err := read(d.reader, &datagram.Version); err != nil {
		return nil, err
	}
	if datagram.Version != 5 {
		return nil, errSFVersionNotSupport
	}

	if err := read(d.reader, &datagram.IPVersion); err != nil {
		return nil, err
	}

	// Address type 2 carries a 16-byte address; every other value is read as
	// a 4-byte address.
	ipLen := net.IPv4len
	if datagram.IPVersion == 2 {
		ipLen = net.IPv6len
	}

	// io.ReadFull is required here: a bare Read may legally return fewer
	// bytes than requested, which would misalign every field that follows.
	addr := make([]byte, ipLen)
	if _, err := io.ReadFull(d.reader, addr); err != nil {
		return nil, err
	}
	datagram.IPAddress = addr

	// The remaining header fields are consecutive 32-bit values.
	fields := []*uint32{
		&datagram.AgentSubID,
		&datagram.SequenceNo,
		&datagram.SysUpTime,
		&datagram.SamplesNo,
	}
	for _, field := range fields {
		if err := read(d.reader, field); err != nil {
			return nil, err
		}
	}

	datagram.ColTime = time.Now().Unix()

	return datagram, nil
}
// getSampleInfo reads the two-word prefix of a sample and returns the sample
// format and the length of the sample body that follows.
//
// The first word packs the enterprise in its upper 20 bits and the format in
// its lower 12 bits; the second word is the body length.
//
// Errors:
//   - the reader's error when the type word cannot be read;
//   - errNoneEnterpriseStandard when the enterprise is non-zero. The sample's
//     length and body are skipped on a best-effort basis first, so the reader
//     is left after the sample whenever that is possible;
//   - errDataLengthUnknown when the length word cannot be read.
func (d *SFDecoder) getSampleInfo() (uint32, uint32, error) {
	var (
		sfType       uint32
		sfDataLength uint32
	)

	if err := read(d.reader, &sfType); err != nil {
		return 0, 0, err
	}

	sfTypeEnterprise := sfType >> 12 // upper 20 bits: enterprise
	sfTypeFormat := sfType & 0xfff   // lower 12 bits: format

	// Only standard (enterprise 0) sFlow data is supported.
	if sfTypeEnterprise != 0 {
		// The length must be read before the body can be skipped. Failures
		// while skipping are deliberately ignored: the caller is told about
		// the unsupported enterprise either way.
		if err := read(d.reader, &sfDataLength); err == nil {
			_, _ = d.reader.Seek(int64(sfDataLength), io.SeekCurrent)
		}
		return 0, 0, errNoneEnterpriseStandard
	}

	if err := read(d.reader, &sfDataLength); err != nil {
		return 0, 0, errDataLengthUnknown
	}

	return sfTypeFormat, sfDataLength, nil
}
// isFilterMatch reports whether the sample format f is listed in the
// decoder's filter. An empty filter matches nothing.
func (d *SFDecoder) isFilterMatch(f uint32) bool {
	matched := false
	for idx := 0; idx < len(d.filter) && !matched; idx++ {
		matched = d.filter[idx] == f
	}
	return matched
}
// read decodes big-endian binary data from r into v using binary.Read, so v
// must be a pointer to a fixed-size value (or a slice of such values). The
// error from binary.Read is returned unchanged.
func read(r io.Reader, v interface{}) error {
	order := binary.BigEndian
	err := binary.Read(r, order, v)
	return err
}