/
main.go
59 lines (46 loc) · 1.31 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main
import (
"crypto/tls"
"flag"
"fmt"
"os"
"github.com/cloudfoundry/noaa/consumer"
"github.com/cloudfoundry/sonde-go/events"
)
const firehoseSubscriptionId = "firehose-a"
var (
dopplerAddress = os.Getenv("DOPPLER_ADDR")
authToken = os.Getenv("CF_ACCESS_TOKEN")
)
func main() {
filterType := flag.String("filter", "all", "filter messages by 'logs' or 'metrics' (default: all)")
flag.Parse()
cnsmr := consumer.New(dopplerAddress, &tls.Config{InsecureSkipVerify: true}, nil)
cnsmr.SetDebugPrinter(ConsoleDebugPrinter{})
fmt.Println("===== Streaming Firehose (will only succeed if you have admin credentials)")
var (
msgChan <-chan *events.Envelope
errorChan <-chan error
)
switch *filterType {
case "logs":
msgChan, errorChan = cnsmr.FilteredFirehose(firehoseSubscriptionId, authToken, consumer.LogMessages)
case "metrics":
msgChan, errorChan = cnsmr.FilteredFirehose(firehoseSubscriptionId, authToken, consumer.Metrics)
default:
msgChan, errorChan = cnsmr.Firehose(firehoseSubscriptionId, authToken)
}
go func() {
for err := range errorChan {
fmt.Fprintf(os.Stderr, "%v\n", err.Error())
}
}()
for msg := range msgChan {
fmt.Printf("%v \n", msg)
}
}
type ConsoleDebugPrinter struct{}
func (c ConsoleDebugPrinter) Print(title, dump string) {
println(title)
println(dump)
}