-
Notifications
You must be signed in to change notification settings - Fork 1
/
nxos-grpc.go
95 lines (85 loc) · 2.65 KB
/
nxos-grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package sink
import (
"fmt"
"io"
"net"
"github.com/agalue/gominion/api"
"github.com/agalue/gominion/log"
"github.com/agalue/gominion/protobuf/mdt_dialout"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)
// NxosGrpcModule represents the Cisco Nexus NX-OS Telemetry module via gRPC.
// Start registers the module itself as the GRPCMdtDialout service on a gRPC
// server that it owns; MdtDialout handles each incoming client stream.
type NxosGrpcModule struct {
// Embedded unimplemented-server type from the generated mdt_dialout package;
// this module provides its own MdtDialout method.
mdt_dialout.UnimplementedGRPCMdtDialoutServer
// sink is the api.Sink given to Start; MdtDialout passes it to sendBytes.
sink api.Sink
// config is the Minion configuration given to Start; MdtDialout passes it to
// wrapMessageToTelemetry and sendBytes.
config *api.MinionConfig
// server is the gRPC server created by Start and stopped by Stop. It stays
// nil when the module is disabled (no listener or port 0).
server *grpc.Server
// port is the listener port taken from the "NxosGrpcParser" listener in Start.
port int
}
// GetID returns the fixed identifier of this sink module.
func (module *NxosGrpcModule) GetID() string {
	const moduleID = "NXOS"
	return moduleID
}
// Start initiates a gRPC Server for NX-OS telemetry.
//
// The TCP port is taken from the listener configured for the "NxosGrpcParser"
// parser. When no such listener exists, or its port is 0, the module is
// considered disabled: a warning is logged and nil is returned without
// starting anything.
//
// On success the server is served from a background goroutine and Start
// returns nil immediately; use Stop to shut the server down. An error is
// returned only when the TCP listener cannot be created; the underlying
// error is wrapped so callers can inspect it with errors.Is / errors.As.
func (module *NxosGrpcModule) Start(config *api.MinionConfig, sink api.Sink) error {
	listener := config.GetListenerByParser("NxosGrpcParser")
	if listener == nil || listener.Port == 0 {
		log.Warnf("NX-OS Telemetry Module disabled")
		return nil
	}

	module.config = config
	module.sink = sink
	module.port = listener.Port

	module.server = grpc.NewServer()
	mdt_dialout.RegisterGRPCMdtDialoutServer(module.server, module)

	log.Infof("Starting NX-OS telemetry gRPC server on port %d", listener.Port)
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", listener.Port))
	if err != nil {
		// %w keeps the original error in the chain; the rendered text is unchanged.
		return fmt.Errorf("Error cannot start TCP listener: %w", err)
	}

	// Serve blocks until the server is stopped, so it runs in its own goroutine.
	go func() {
		if err := module.server.Serve(lis); err != nil {
			log.Errorf("Cannot serve NX-OS gRPC: %v", err)
		}
	}()
	return nil
}
// Stop shuts down the sink module. It is a no-op (apart from the log line)
// when no gRPC server has been created.
func (module *NxosGrpcModule) Stop() {
	log.Warnf("Stopping NX-OS telemetry gRPC server")
	srv := module.server
	if srv == nil {
		return
	}
	srv.Stop()
}
// MdtDialout implements Cisco NX-OS streaming telemetry service.
//
// It reads dial-out messages from the client stream until one of:
//   - the client closes the stream (io.EOF): returns nil;
//   - stream.Recv fails with any other error: the error is logged and returned;
//   - a message arrives with no data but a non-empty error string: the error
//     is logged and the handler returns nil.
//
// The Data of every other message is wrapped with wrapMessageToTelemetry and,
// when that produces a non-nil result, handed to sendBytes.
func (module *NxosGrpcModule) MdtDialout(stream mdt_dialout.GRPCMdtDialout_MdtDialoutServer) error {
	// Fallback source address used when the peer cannot be determined.
	ipaddr := "127.0.0.1"
	if p, ok := peer.FromContext(stream.Context()); ok && p.Addr != nil {
		log.Debugf("Accepted Cisco MDT GRPC dialout connection from %s", p.Addr)
		ipaddr = p.Addr.String()
		// For TCP peers Addr.String() is "host:port"; keep only the host so the
		// value has the same bare-IP form as the default above. If the address
		// is not in host:port form, it is used unchanged.
		// NOTE(review): assumes wrapMessageToTelemetry expects a bare IP — confirm.
		if host, _, splitErr := net.SplitHostPort(ipaddr); splitErr == nil {
			ipaddr = host
		}
	}

	for {
		dialoutArgs, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			// dialoutArgs is nil when Recv fails, so only err may be used here.
			log.Errorf("Dialout receive error from client %s: %v", ipaddr, err)
			return err
		}
		if len(dialoutArgs.Data) == 0 && len(dialoutArgs.Errors) != 0 {
			log.Errorf("Dialout error from client %s: %s", ipaddr, dialoutArgs.Errors)
			break
		}
		log.Debugf("Received request with ID %d of %d bytes from %s", dialoutArgs.ReqId, len(dialoutArgs.Data), ipaddr)

		messages := [][]byte{dialoutArgs.Data}
		if payload := wrapMessageToTelemetry(module.config, ipaddr, uint32(module.port), messages); payload != nil {
			sendBytes(module.GetID(), module.config, module.sink, payload)
		}
	}

	log.Warnf("Terminating NX-OS handler")
	return nil
}