/
device.go
240 lines (213 loc) · 6.95 KB
/
device.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
package adapters
import (
"errors"
"fmt"
"os"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/pcap"
"github.com/hyeoncheon/goul"
)
// constants...
const (
defaultDeviceAdapterID = "cap"
defaultSnapLen = 1600
defaultPromiscuous = false
defaultTimeout = 1
defaultFilter = "ip"
ErrDeviceAdapterNotInitialized = "device adapter not initialized"
ErrCouldNotActivate = "could not activate capture interface"
)
// DeviceAdapter is an adapter for the network device interfacing.
// This is the most important adapter of Goul. It is used as a reader
// adapter for the sender and a writer adapter for the receiver.
//
// Please note that the adapter MUST be initialized with NewDevice()
// function so that it can be initialized with initialization of device
// and automatically inherit the BaseAdapter that implements underlying
// CommonMixin. Otherwise, it does not work properly.
type DeviceAdapter struct {
goul.Adapter
ID string
err error
device string
snaplen int
promiscuous bool
timeout time.Duration
filter string
isTest bool
handle *pcap.Handle
inactiveHandle *pcap.InactiveHandle
}
// Read implements interface Adapter
func (a *DeviceAdapter) Read(ctrl chan goul.Item, message goul.Message) (out chan goul.Item, err error) {
defer func() {
if r := recover(); r != nil {
fmt.Fprintf(os.Stderr, "DeviceAdapter#Write recovered from panic!\n")
fmt.Fprintf(os.Stderr, "Probably an inheritance problem of pipeline instance.\n")
fmt.Fprintf(os.Stderr, "panic: %v\n", r)
err = errors.New("panic")
}
}()
a.err = a.activate()
if a.err != nil {
a.SetError(a.err)
goul.Error(a.GetLogger(), a.ID, "%v: %v", ErrCouldNotActivate, a.err)
return nil, errors.New(ErrCouldNotActivate)
}
goul.Log(a.GetLogger(), a.ID, "setting filter <%v>...", a.filter)
if a.err = a.handle.SetBPFFilter(a.filter); a.err != nil {
a.SetError(a.err)
goul.Error(a.GetLogger(), a.ID, "%v: %v", ErrCouldNotActivate, a.err)
return nil, errors.New(ErrCouldNotActivate)
}
return goul.Launch(a.reader, ctrl, message)
}
// Write implements interface Adapter
func (a *DeviceAdapter) Write(in chan goul.Item, message goul.Message) (done chan goul.Item, err error) {
defer func() {
if r := recover(); r != nil {
fmt.Fprintf(os.Stderr, "DeviceAdapter#Write recovered from panic!\n")
fmt.Fprintf(os.Stderr, "Probably an inheritance problem of pipeline instance.\n")
fmt.Fprintf(os.Stderr, "panic: %v\n", r)
err = errors.New("panic")
}
}()
var fn goul.PipeFunction
if a.isTest {
fn = a.dummy
//return goul.Launch(a.dummy, in, message)
} else {
fn = a.writer
a.err = a.activate()
if a.err != nil {
a.SetError(a.err)
goul.Error(a.GetLogger(), a.ID, "%v: %v", ErrCouldNotActivate, a.err)
return nil, errors.New(ErrCouldNotActivate)
}
}
return goul.Launch(fn, in, message)
}
// reader read packets from device and push it into output channel.
func (a *DeviceAdapter) reader(in, out chan goul.Item, message goul.Message) {
defer close(out)
defer goul.Log(a.GetLogger(), a.ID, "exit")
// just for dirty timing... it should be run after writer is ready.
//? changing the execution order as reversed?
time.Sleep(500 * time.Millisecond)
packets := gopacket.NewPacketSource(a.handle, a.handle.LinkType()).Packets()
goul.Log(a.GetLogger(), a.ID, "capturing in looping...")
for {
select {
case _, ok := <-in:
if !ok {
goul.Log(a.GetLogger(), a.ID, "channel closed")
return
}
case packet := <-packets:
out <- packet
default: // for non-blocking looping
time.Sleep(10 * time.Millisecond)
}
}
}
// writer write out the packets from input channel
func (a *DeviceAdapter) writer(in, out chan goul.Item, message goul.Message) {
defer close(out)
defer goul.Log(a.GetLogger(), a.ID, "exit")
goul.Log(a.GetLogger(), a.ID, "writer in looping...")
for item := range in {
if p, ok := item.(gopacket.Packet); ok {
a.handle.WritePacketData(p.Data())
}
}
goul.Log(a.GetLogger(), a.ID, "channel closed")
out <- &goul.ItemGeneric{Meta: "message", DATA: []byte("channel closed. done")}
}
// writer write out the packets from input channel
func (a *DeviceAdapter) dummy(in, out chan goul.Item, message goul.Message) {
defer close(out)
defer goul.Log(a.GetLogger(), a.ID, "exit")
goul.Log(a.GetLogger(), a.ID, "dummy writer in looping...")
var count uint64 = 0
for range in {
count++
}
goul.Log(a.GetLogger(), a.ID, "channel closed")
goul.Log(a.GetLogger(), a.ID, "dummy writer got %v packets", count)
out <- &goul.ItemGeneric{Meta: "message", DATA: []byte("channel closed. done")}
}
// NewDevice returns new device adapter.
func NewDevice(dev string, isTest bool) (*DeviceAdapter, error) {
a := &DeviceAdapter{
ID: defaultDeviceAdapterID,
device: dev,
snaplen: defaultSnapLen,
promiscuous: defaultPromiscuous,
timeout: defaultTimeout,
filter: defaultFilter,
Adapter: &goul.BaseAdapter{},
isTest: isTest,
}
if !isTest {
a.inactiveHandle, a.err = pcap.NewInactiveHandle(a.device)
}
return a, a.err
}
// Close clean up resources on device adapter.
func (a *DeviceAdapter) Close() (err error) {
defer func() {
if r := recover(); r != nil {
fmt.Fprintf(os.Stderr, "DeviceAdapter#Close recovered from panic!\n")
fmt.Fprintf(os.Stderr, "Probably an inheritance problem of pipeline instance.\n")
fmt.Fprintf(os.Stderr, "panic: %v\n", r)
err = errors.New("panic")
}
}()
goul.Log(a.GetLogger(), a.ID, "cleanup...")
if a.handle != nil {
a.handle.Close()
}
if a.inactiveHandle != nil {
a.inactiveHandle.CleanUp()
}
return nil
}
// SetOptions sets capture options to inactive handler.
func (a *DeviceAdapter) SetOptions(promisc bool, snaplength int, timeout time.Duration) (err error) {
if a.inactiveHandle == nil {
a.err = errors.New(ErrDeviceAdapterNotInitialized)
return a.err
}
goul.Log(a.GetLogger(), a.ID, "set timeout/snaplen/promisc: %v/%v/%v", timeout, snaplength, promisc)
if a.err = a.inactiveHandle.SetTimeout(timeout * time.Second); a.err != nil {
goul.Error(a.GetLogger(), a.ID, "set timeout error: %v", a.err)
} else if a.err = a.inactiveHandle.SetSnapLen(snaplength); a.err != nil {
goul.Error(a.GetLogger(), a.ID, "set snaplen error: %v", a.err)
} else if a.err = a.inactiveHandle.SetPromisc(promisc); a.err != nil {
goul.Error(a.GetLogger(), a.ID, "set promisc error: %v", a.err)
}
a.promiscuous = promisc
a.snaplen = snaplength
a.timeout = timeout
return a.err
}
// SetFilter sets filter string which is applied while capturing.
func (a *DeviceAdapter) SetFilter(filter string) error {
a.filter = filter
return nil
}
func (a *DeviceAdapter) activate() error {
if a.inactiveHandle == nil {
a.err = errors.New(ErrDeviceAdapterNotInitialized)
return a.err
}
if a.handle == nil && a.inactiveHandle != nil {
a.handle, a.err = a.inactiveHandle.Activate()
if a.err != nil {
return a.err
}
}
goul.Log(a.GetLogger(), a.ID, "handle initiated: %v", a.handle)
return nil
}