/
main.go
127 lines (111 loc) · 3.74 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright (c) 2017 Arista Networks, Inc.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.
package main
import (
"context"
"crypto/tls"
"flag"
"fmt"
"net/http"
"os"
"strings"
"time"
"github.com/aristanetworks/goarista/gnmi"
"github.com/aristanetworks/glog"
hec "github.com/aristanetworks/splunk-hec-go"
pb "github.com/openconfig/gnmi/proto/gnmi"
"golang.org/x/sync/errgroup"
)
func exitWithError(s string) {
fmt.Fprintln(os.Stderr, s)
os.Exit(1)
}
func main() {
// gNMI options
cfg := &gnmi.Config{}
flag.StringVar(&cfg.Addr, "addr", "localhost", "gNMI gRPC server `address`")
flag.StringVar(&cfg.CAFile, "cafile", "", "Path to server TLS certificate file")
flag.StringVar(&cfg.CertFile, "certfile", "", "Path to client TLS certificate file")
flag.StringVar(&cfg.KeyFile, "keyfile", "", "Path to client TLS private key file")
flag.StringVar(&cfg.Username, "username", "", "Username to authenticate with")
flag.StringVar(&cfg.Password, "password", "", "Password to authenticate with")
flag.BoolVar(&cfg.TLS, "tls", false, "Enable TLS")
flag.StringVar(&cfg.TLSMinVersion, "tls-min-version", "",
fmt.Sprintf("Set minimum TLS version for connection (%s)", gnmi.TLSVersions))
flag.StringVar(&cfg.TLSMaxVersion, "tls-max-version", "",
fmt.Sprintf("Set maximum TLS version for connection (%s)", gnmi.TLSVersions))
subscribePaths := flag.String("paths", "/", "Comma-separated list of paths to subscribe to")
// Splunk options
splunkURLs := flag.String("splunkurls", "https://localhost:8088",
"Comma-separated list of URLs of the Splunk servers")
splunkToken := flag.String("splunktoken", "", "Token to connect to the Splunk servers")
splunkIndex := flag.String("splunkindex", "", "Index for the data in Splunk")
flag.Parse()
// gNMI connection
ctx := gnmi.NewContext(context.Background(), cfg)
// Store the address without the port so it can be used as the host in the Splunk event.
addr := cfg.Addr
client, err := gnmi.Dial(cfg)
if err != nil {
glog.Fatal(err)
}
// Splunk connection
urls := strings.Split(*splunkURLs, ",")
cluster := hec.NewCluster(urls, *splunkToken)
cluster.SetHTTPClient(&http.Client{
Transport: &http.Transport{
// TODO: add flags for TLS
TLSClientConfig: &tls.Config{
// TODO: add flag to enable TLS
InsecureSkipVerify: true,
},
},
})
// gNMI subscription
respChan := make(chan *pb.SubscribeResponse)
paths := strings.Split(*subscribePaths, ",")
subscribeOptions := &gnmi.SubscribeOptions{
Mode: "stream",
StreamMode: "target_defined",
Paths: gnmi.SplitPaths(paths),
}
var g errgroup.Group
g.Go(func() error { return gnmi.SubscribeErr(ctx, client, subscribeOptions, respChan) })
// Forward subscribe responses to Splunk
for resp := range respChan {
// We got a subscribe response
response := resp.GetResponse()
update, ok := response.(*pb.SubscribeResponse_Update)
if !ok {
continue
}
// Convert the response into a map[string]interface{}
notification, err := gnmi.NotificationToMap(update.Update)
if err != nil {
exitWithError(err.Error())
}
// Build the Splunk event
path := notification["path"].(string)
delete(notification, "path")
timestamp := notification["timestamp"].(int64)
delete(notification, "timestamp")
// Should this be configurable?
sourceType := "openconfig"
event := &hec.Event{
Host: &addr,
Index: splunkIndex,
Source: &path,
SourceType: &sourceType,
Event: notification,
}
event.SetTime(time.Unix(timestamp/1e9, timestamp%1e9))
// Write the event to Splunk
if err := cluster.WriteEvent(event); err != nil {
exitWithError("failed to write event: " + err.Error())
}
}
if err := g.Wait(); err != nil {
exitWithError(err.Error())
}
}