Skip to content

Commit

Permalink
Add incoming/outgoing connections metrics (#959)
Browse files Browse the repository at this point in the history
* Add incoming/outgoing connections metrics

* Move IncomingConnectionsStatsHandler to the net package

* Set IncomingConnectionTimestamp properly and rename GroupConnections to OutgoingConnections

* Add finer-grained outgoing connection tracking

* Replace OutgoingConnectionTimestamp with OutgoingConnectionState

* Fix logic when emitting the outgoing connections metric

* Bump CI
  • Loading branch information
mcamou committed Apr 8, 2022
1 parent 3c0760b commit 185a337
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 30 deletions.
65 changes: 38 additions & 27 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ var (
Name: "dial_failures",
Help: "Number of times there have been network connection issues",
}, []string{"peer_address"})
// GroupConnections (Group) how many GrpcClient connections are present
GroupConnections = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "group_connections",
Help: "Number of peers with current GrpcClient connections",
// OutgoingConnections (Group) how many GrpcClient connections are present
OutgoingConnections = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "outgoing_group_connections",
Help: "Number of peers with current outgoing GrpcClient connections",
})
GroupSize = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "group_size",
Expand Down Expand Up @@ -174,33 +174,51 @@ var (
[]string{"url"},
)

// dkgStateChangeTimestamp tracks DKG status changes
// dkgStateChangeTimestamp (Group) tracks DKG status changes
dkgStateChangeTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "dkg_state_change_timestamp",
Help: "DKG state change timestamp in seconds since the Epoch",
ConstLabels: map[string]string{},
Name: "dkg_state_change_timestamp",
Help: "DKG state change timestamp in seconds since the Epoch",
}, []string{"dkg_state", "beacon_id", "is_leader"})

// reshareStateChangeTimestamp tracks reshare status changes
// reshareStateChangeTimestamp (Group) tracks reshare status changes
reshareStateChangeTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "reshare_state_change_timestamp",
Help: "Reshare state change timestamp in seconds since the Epoch",
ConstLabels: map[string]string{},
Name: "reshare_state_change_timestamp",
Help: "Reshare state change timestamp in seconds since the Epoch",
}, []string{"reshare_state", "beacon_id", "is_leader"})

// drandBuildTime emits the timestamp when the binary was built in Unix time.
// drandBuildTime (Group) emits the timestamp when the binary was built in Unix time.
drandBuildTime = prometheus.NewUntypedFunc(prometheus.UntypedOpts{
Name: "drand_build_time",
Help: "Timestamp when the binary was built in seconds since the Epoch",
ConstLabels: map[string]string{"build": common.COMMIT, "version": common.GetAppVersion().String()},
}, func() float64 { return float64(getBuildTimestamp(common.BUILDDATE)) })

// IsDrandNode is 1 for drand nodes, 0 for relays
// IsDrandNode (Group) is 1 for drand nodes, 0 for relays
IsDrandNode = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "is_drand_node",
Help: "1 for drand nodes, not emitted for relays",
})

// IncomingConnectionTimestamp (Group) timestamp when each incoming connection was established
// We cannot track the actual connection state as with outgoing connections, since grpc-go
// doesn't allow for adding a listener for state tracking.
IncomingConnectionTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "incoming_connection_timestamp",
Help: "timestamp when an incoming connection was established",
}, []string{"remote_host"})

// OutgoingConnectionState (Group) tracks the state of an outgoing connection, according to
// https://github.com/grpc/grpc-go/blob/master/connectivity/connectivity.go#L51
// Due to the fact that grpc-go doesn't support adding a listener for state tracking, this is
// emitted only when getting a connection to the remote host. This means that:
// * If a non-PL host is unable to connect to a PL host, the metric will not be sent to InfluxDB
// * The state might not be up to date (e.g. the remote host is disconnected but we haven't
// tried to connect to it)
OutgoingConnectionState = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "outgoing_connection_state",
Help: "State of an outgoing connection. 0=Idle, 1=Connecting, 2=Ready, 3=Transient Failure, 4=Shutdown",
}, []string{"remote_host"})

metricsBound = false
)

Expand All @@ -218,27 +236,20 @@ func bindMetrics() error {
return err
}

// Private metrics
private := []prometheus.Collector{
drandBuildTime,
dkgStateChangeTimestamp,
reshareStateChangeTimestamp,
}
for _, c := range private {
if err := PrivateMetrics.Register(c); err != nil {
return err
}
}

// Group metrics
group := []prometheus.Collector{
APICallCounter,
GroupDialFailures,
GroupConnections,
OutgoingConnections,
GroupSize,
GroupThreshold,
BeaconDiscrepancyLatency,
LastBeaconRound,
drandBuildTime,
dkgStateChangeTimestamp,
reshareStateChangeTimestamp,
IncomingConnectionTimestamp,
OutgoingConnectionState,
}
for _, c := range group {
if err := GroupMetrics.Register(c); err != nil {
Expand Down
20 changes: 18 additions & 2 deletions net/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
httpgrpcserver "github.com/weaveworks/common/httpgrpc/server"
"golang.org/x/net/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
)

Expand Down Expand Up @@ -287,7 +288,14 @@ func (g *grpcClient) conn(p Peer) (*grpc.ClientConn, error) {
g.Lock()
defer g.Unlock()
var err error

c, ok := g.conns[p.Address()]
if ok && c.GetState() == connectivity.Shutdown {
ok = false
delete(g.conns, p.Address())
metrics.OutgoingConnectionState.WithLabelValues(p.Address()).Set(float64(c.GetState()))
}

if !ok {
log.DefaultLogger().Debugw("", "grpc client", "initiating", "to", p.Address(), "tls", p.IsTLS())
if !p.IsTLS() {
Expand All @@ -311,9 +319,17 @@ func (g *grpcClient) conn(p Peer) (*grpc.ClientConn, error) {
metrics.GroupDialFailures.WithLabelValues(p.Address()).Inc()
}
}
g.conns[p.Address()] = c
metrics.GroupConnections.Set(float64(len(g.conns)))
if err == nil {
g.conns[p.Address()] = c
}
}

// Emit the connection state regardless of whether it's a new or an existing connection
if err == nil {
metrics.OutgoingConnectionState.WithLabelValues(p.Address()).Set(float64(c.GetState()))
}

metrics.OutgoingConnections.Set(float64(len(g.conns)))
return c, err
}

Expand Down
3 changes: 2 additions & 1 deletion net/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func NewGRPCListenerForPrivate(

opts = append(opts,
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(grpc_prometheus.StreamServerInterceptor, s.NodeVersionStreamValidator)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(grpc_prometheus.UnaryServerInterceptor, s.NodeVersionValidator)))
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(grpc_prometheus.UnaryServerInterceptor, s.NodeVersionValidator)),
grpc.StatsHandler(IncomingConnectionsStatsHandler))

grpcServer := grpc.NewServer(opts...)

Expand Down
33 changes: 33 additions & 0 deletions net/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package net

import (
"context"

"google.golang.org/grpc/stats"

"github.com/drand/drand/metrics"
)

type incomingConnectionsStatsHandler struct {
stats.Handler
}

func (h incomingConnectionsStatsHandler) TagRPC(ctx context.Context, tagInfo *stats.RPCTagInfo) context.Context {
// no-op
return ctx
}

func (h incomingConnectionsStatsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
// no-op
}

func (h incomingConnectionsStatsHandler) TagConn(ctx context.Context, tagInfo *stats.ConnTagInfo) context.Context {
metrics.IncomingConnectionTimestamp.WithLabelValues(tagInfo.RemoteAddr.String()).SetToCurrentTime()
return ctx
}

func (h incomingConnectionsStatsHandler) HandleConn(ctx context.Context, connStats stats.ConnStats) {
// no-op
}

var IncomingConnectionsStatsHandler = incomingConnectionsStatsHandler{}

0 comments on commit 185a337

Please sign in to comment.