-
Notifications
You must be signed in to change notification settings - Fork 1
/
udp.go
76 lines (67 loc) · 1.99 KB
/
udp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package sink
import (
"net"
"github.com/agalue/gominion/api"
"github.com/agalue/gominion/log"
)
// UDPForwardParser represents org.opennms.netmgt.telemetry.protocols.common.parser.ForwardParser
const UDPForwardParser = "ForwardParser"
// UDPForwardModule represents a generic UDP forward module
// It starts a UDP Listener, and forwards the received data to OpenNMS without alteration
type UDPForwardModule struct {
name string
sink api.Sink
config *api.MinionConfig
conn *net.UDPConn
stopping bool
}
// GetID gets the ID of the sink module
func (module *UDPForwardModule) GetID() string {
return module.name
}
// Start initiates a generic UDP receiver
func (module *UDPForwardModule) Start(config *api.MinionConfig, sink api.Sink) error {
listener := config.GetListener(module.name)
if listener == nil || !listener.Is(UDPForwardParser) {
log.Warnf("UDP Module %s disabled", module.name)
return nil
}
var err error
module.stopping = false
module.sink = sink
module.config = config
module.conn, err = createUDPListener(listener.Port)
if err != nil {
return err
}
log.Infof("Starting %s receiver on port UDP %d", module.name, listener.Port)
go func() {
payload := make([]byte, 1024)
for {
size, pktAddr, err := module.conn.ReadFromUDP(payload)
if err != nil {
if !module.stopping {
log.Errorf("%s cannot read from UDP: %s", module.name, err)
}
continue
}
payloadCut := make([]byte, size)
copy(payloadCut, payload[0:size])
log.Debugf("Received %d bytes from %s", size, pktAddr)
messages := make([][]byte, 1)
messages[0] = payloadCut
if bytes := wrapMessageToTelemetry(module.config, pktAddr.IP.String(), uint32(pktAddr.Port), messages); bytes != nil {
sendBytes(module.GetID(), module.config, module.sink, bytes)
}
}
}()
return nil
}
// Stop shutdowns the sink module
func (module *UDPForwardModule) Stop() {
log.Warnf("Stopping %s receiver", module.name)
module.stopping = true
if module.conn != nil {
module.conn.Close()
}
}