/
envoy_metrics_server.go
57 lines (47 loc) · 1.39 KB
/
envoy_metrics_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package agent
import (
"context"
"io"
"net"
"google.golang.org/grpc"
envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v3"
"github.com/datawire/dlib/dhttp"
"github.com/datawire/dlib/dlog"
)
// StreamHandler is the callback type stored on metricsServer and invoked by
// StreamMetrics once for every message read from the stream. It receives the
// stream's context and the received message, and returns nothing, so a handler
// has no way to report an error through this signature.
type StreamHandler func(ctx context.Context, in *envoyMetrics.StreamMetricsMessage)
// metricsServer is the type registered with the gRPC server in Serve. Its
// StreamMetrics method forwards every received message to handler.
type metricsServer struct {
// Embedded service type from the envoyMetrics package. NewMetricsServer leaves
// it at its zero value.
// NOTE(review): presumably this is the generated service interface, embedded so
// that methods other than StreamMetrics are satisfied — confirm against the
// envoyMetrics package.
envoyMetrics.MetricsServiceServer
// handler is called by StreamMetrics for each message received.
handler StreamHandler
}
// NewMetricsServer builds a metricsServer whose StreamMetrics method passes
// each received message to handler. The embedded MetricsServiceServer is left
// at its zero value.
func NewMetricsServer(handler StreamHandler) *metricsServer {
	srv := new(metricsServer)
	srv.handler = handler
	return srv
}
// Serve creates a new gRPC server, registers s on it as the metrics service,
// and serves it on the supplied listener through a dhttp.ServerConfig that
// uses the gRPC server as its handler.
//
// The call blocks until the dhttp server's Serve call returns, and the error
// from that call is returned unchanged.
func (s *metricsServer) Serve(ctx context.Context, listener net.Listener) error {
	srv := grpc.NewServer()
	envoyMetrics.RegisterMetricsServiceServer(srv, s)

	return (&dhttp.ServerConfig{Handler: srv}).Serve(ctx, listener)
}
// StreamMetrics implements the StreamMetrics RPC. It reads messages from the
// stream in a loop and passes each one, together with the stream's context, to
// the configured handler.
//
// It returns nil once Recv reports io.EOF, and returns any other Recv error
// as-is.
func (s *metricsServer) StreamMetrics(stream envoyMetrics.MetricsService_StreamMetricsServer) error {
	ctx := stream.Context()
	dlog.Debug(ctx, "started stream")

	for {
		msg, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// End of stream is the normal way out, not a failure.
			return nil
		case recvErr != nil:
			return recvErr
		}

		s.handler(ctx, msg)
	}
}