-
Notifications
You must be signed in to change notification settings - Fork 25
/
sender_fetcher.go
83 lines (67 loc) · 2.13 KB
/
sender_fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package v2
import (
"context"
"fmt"
"io"
"log"
metrics "code.cloudfoundry.org/go-metric-registry"
"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
"google.golang.org/grpc"
)
// MetricClient is the slice of a metrics registry this file depends on:
// the ability to hand out gauges.
type MetricClient interface {
	// NewGauge returns a gauge identified by name and described by helpText,
	// customised by any supplied options.
	NewGauge(name string, helpText string, opts ...metrics.MetricOption) metrics.Gauge
}
// SenderFetcher opens gRPC batch-sender streams and keeps two gauges in step
// with the streams it has handed out.
type SenderFetcher struct {
	// opts are forwarded unchanged to grpc.NewClient on every Fetch.
	opts []grpc.DialOption

	// Gauge adjusters: each receives the delta to add to its gauge
	// (+1 when a stream is established, -1 when its closer is closed).
	dopplerConnections, dopplerV2Streams func(float64)
}
// NewSenderFetcher builds a SenderFetcher that dials with the supplied gRPC
// options and reports its activity through gauges obtained from mc.
//
// Two gauges are requested, in this order: "doppler_v2_streams" and
// "doppler_connections". Both carry the label metric_version=2.0.
func NewSenderFetcher(mc MetricClient, opts ...grpc.DialOption) *SenderFetcher {
	// gauge requests a single gauge tagged with the shared version label.
	gauge := func(name, help string) metrics.Gauge {
		labels := map[string]string{"metric_version": "2.0"}
		return mc.NewGauge(name, help, metrics.WithMetricLabels(labels))
	}

	streams := gauge(
		"doppler_v2_streams",
		"Current number of established gRPC streams from v2 agent.",
	)
	conns := gauge(
		"doppler_connections",
		"Current number of gRPC connections from v1 and v2 agents.",
	)

	return &SenderFetcher{
		opts:               opts,
		dopplerConnections: func(delta float64) { conns.Add(delta) },
		dopplerV2Streams:   func(delta float64) { streams.Add(delta) },
	}
}
// Fetch creates a gRPC client for addr using the configured dial options and
// opens a BatchSender stream on it.
//
// On success both gauges are incremented by one and Fetch returns the stream
// together with a closer; closing it decrements both gauges and closes the
// underlying client connection.
//
// On failure Fetch returns nil, nil and an error that wraps the underlying
// cause, so callers can inspect it with errors.Is / errors.As. If the stream
// cannot be opened, the freshly created connection is closed before
// returning and the gauges are left untouched.
func (p *SenderFetcher) Fetch(addr string) (io.Closer, loggregator_v2.Ingress_BatchSenderClient, error) {
	conn, err := grpc.NewClient(addr, p.opts...)
	if err != nil {
		return nil, nil, fmt.Errorf("error dialing ingestor stream to %s: %w", addr, err)
	}

	client := loggregator_v2.NewIngressClient(conn)
	sender, err := client.BatchSender(context.Background())
	if err != nil {
		// Best-effort cleanup: the stream error is the one worth reporting,
		// so a failure to close the connection is deliberately dropped.
		_ = conn.Close()
		return nil, nil, fmt.Errorf("failed to establish stream to doppler (%s): %w", addr, err)
	}

	// Only count the connection once the stream is actually usable.
	p.dopplerConnections(1)
	p.dopplerV2Streams(1)

	log.Printf("successfully established a stream to doppler %s", addr)

	closer := &decrementingCloser{
		closer:             conn,
		dopplerConnections: p.dopplerConnections,
		dopplerV2Streams:   p.dopplerV2Streams,
	}
	return closer, sender, nil
}
// decrementingCloser pairs an io.Closer with the two gauge adjusters that
// must be wound back when the wrapped resource is released.
type decrementingCloser struct {
	closer io.Closer

	// Gauge adjusters; each is invoked with -1 on every Close call.
	dopplerConnections, dopplerV2Streams func(float64)
}

// Close lowers both gauges by one and then closes the wrapped closer,
// returning whatever error that close produces.
//
// The gauges are adjusted on every call, so invoking Close more than once
// decrements them more than once.
func (c *decrementingCloser) Close() error {
	const released = -1

	c.dopplerConnections(released)
	c.dopplerV2Streams(released)

	return c.closer.Close()
}