Skip to content

Commit

Permalink
feat: add rdvp metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Guilhem Fanton <guilhem.fanton@gmail.com>
  • Loading branch information
gfanton committed Oct 14, 2020
1 parent 3b12fcf commit a86144f
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 20 deletions.
1 change: 1 addition & 0 deletions go/cmd/berty/mini.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func miniCommand() *ffcli.Command {
fsBuilder := func() (*flag.FlagSet, error) {
fs := flag.NewFlagSet("berty mini", flag.ExitOnError)
fs.StringVar(&groupFlag, "mini.group", groupFlag, "group to join, leave empty to create a new group")
manager.SetupMetricsFlags(fs) // add flags to enable metrics
manager.SetupLocalMessengerServerFlags(fs) // add flags to allow creating a full node in the same process
manager.SetupEmptyGRPCListenersFlags(fs) // by default, we don't want to expose gRPC server for mini
manager.SetupRemoteNodeFlags(fs) // mini can be run against an already running server
Expand Down
73 changes: 62 additions & 11 deletions go/cmd/rdvp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"fmt"
"log"
mrand "math/rand"
"net"
"net/http"
"os"
"strings"

Expand All @@ -16,6 +18,7 @@ import (
libp2p_cicuit "github.com/libp2p/go-libp2p-circuit"
libp2p_ci "github.com/libp2p/go-libp2p-core/crypto"
libp2p_host "github.com/libp2p/go-libp2p-core/host"
metrics "github.com/libp2p/go-libp2p-core/metrics"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
libp2p_quic "github.com/libp2p/go-libp2p-quic-transport"
libp2p_rp "github.com/libp2p/go-libp2p-rendezvous"
Expand All @@ -25,6 +28,8 @@ import (
"github.com/oklog/run"
ff "github.com/peterbourgon/ff/v3"
"github.com/peterbourgon/ff/v3/ffcli"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"moul.io/srand"
Expand All @@ -39,15 +44,16 @@ func main() {

// opts
var (
logFormat = "color" // json, console, color, light-console, light-color
logToFile = "stderr" // can be stdout, stderr or a file path
logFilters = "info+:*" // info and more for everythign
serveURN = ":memory:"
serveListeners = "/ip4/0.0.0.0/tcp/4040,/ip4/0.0.0.0/udp/4141/quic"
servePK = ""
serveAnnounce = ""
genkeyType = "Ed25519"
genkeyLength = 2048
logFormat = "color" // json, console, color, light-console, light-color
logToFile = "stderr" // can be stdout, stderr or a file path
logFilters = "info+:*" // info and more for everythign
serveURN = ":memory:"
serveListeners = "/ip4/0.0.0.0/tcp/4040,/ip4/0.0.0.0/udp/4141/quic"
servePK = ""
serveAnnounce = ""
serveMetricsListeners = ""
genkeyType = "Ed25519"
genkeyLength = 2048
)

// parse opts
Expand All @@ -63,6 +69,7 @@ func main() {
serveFlags.StringVar(&serveListeners, "l", serveListeners, "lists of listeners of (m)addrs separate by a comma")
serveFlags.StringVar(&servePK, "pk", servePK, "private key (generated by `rdvp genkey`)")
serveFlags.StringVar(&serveAnnounce, "announce", serveAnnounce, "addrs that will be announce by this server")
serveFlags.StringVar(&serveMetricsListeners, "metrics", serveMetricsListeners, "metrics listener, if empty will disable metrics")
genkeyFlags.StringVar(&genkeyType, "type", genkeyType, "Type of the private key generated, one of : Ed25519, ECDSA, Secp256k1, RSA")
genkeyFlags.IntVar(&genkeyLength, "length", genkeyLength, "The length (in bits) of the key generated.")

Expand All @@ -87,6 +94,14 @@ func main() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var gServe run.Group
gServe.Add(func() error {
<-ctx.Done()
return ctx.Err()
}, func(error) {
cancel()
})

laddrs := strings.Split(serveListeners, ",")
listeners, err := ipfsutil.ParseAddrs(laddrs...)
if err != nil {
Expand Down Expand Up @@ -123,6 +138,8 @@ func main() {
addrsFactory = func([]ma.Multiaddr) []ma.Multiaddr { return announces }
}

reporter := metrics.NewBandwidthCounter()

// init p2p host
host, err := libp2p.New(ctx,
// default tpt + quic
Expand All @@ -142,10 +159,14 @@ func main() {

// announce
libp2p.AddrsFactory(addrsFactory),

// metrics
libp2p.BandwidthReporter(reporter),
)
if err != nil {
return errcode.TODO.Wrap(err)
}

defer host.Close()
logHostInfo(logger, host)

Expand All @@ -159,8 +180,38 @@ func main() {
// start service
_ = libp2p_rp.NewRendezvousService(host, db)

<-ctx.Done()
if err = ctx.Err(); err != nil {
if serveMetricsListeners != "" {
ml, err := net.Listen("tcp", serveMetricsListeners)
if err != nil {
return errcode.TODO.Wrap(err)
}

registery := prometheus.NewRegistry()
registery.MustRegister(prometheus.NewBuildInfoCollector())
registery.MustRegister(prometheus.NewGoCollector())
registery.MustRegister(ipfsutil.NewHostCollector(host))
registery.MustRegister(ipfsutil.NewBandwidthCollector(reporter))
// @TODO(gfanton): add rdvp specific collector...

handerfor := promhttp.HandlerFor(
registery,
promhttp.HandlerOpts{Registry: registery},
)

mux := http.NewServeMux()
gServe.Add(func() error {
mux.Handle("/metrics", handerfor)
logger.Info("metrics listener",
zap.String("handler", "/metrics"),
zap.String("listener", ml.Addr().String()))
return http.Serve(ml, mux)
}, func(error) {
ml.Close()
})

}

if err = gServe.Run(); err != nil {
return errcode.TODO.Wrap(err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion go/internal/initutil/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode,
return nil, nil, err
}

err = registry.Register(ipfsutil.NewBandwidthCollector(m.Node.Protocol.ipfsNode))
err = registry.Register(ipfsutil.NewBandwidthCollector(m.Node.Protocol.ipfsNode.Reporter))
if err != nil {
return nil, nil, err
}
Expand Down
9 changes: 6 additions & 3 deletions go/internal/initutil/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"go.uber.org/zap"
)

const metricsHandler = "/metrics"

func (m *Manager) SetupMetricsFlags(fs *flag.FlagSet) {
fs.StringVar(&m.Metrics.Listener, "metrics.listener", ":8888", "Metrics listeners")
fs.StringVar(&m.Metrics.Handler, "metrics.handler", "/metrics", "Metrics handler path")
fs.BoolVar(&m.Metrics.Pedantic, "metrics.pedantic", true, "Enable Metrics pedantic")
}

Expand Down Expand Up @@ -55,8 +56,10 @@ func (m *Manager) getMetricsRegistery() (*prometheus.Registry, error) {
promhttp.HandlerOpts{Registry: m.Metrics.Registery},
)

mux.Handle(m.Metrics.Handler, handerfor)
logger.Info("metrics listener", zap.String("handler", m.Metrics.Handler), zap.String("listener", l.Addr().String()))
mux.Handle(metricsHandler, handerfor)
logger.Info("metrics listener",
zap.String("handler", metricsHandler),
zap.String("listener", l.Addr().String()))
return http.Serve(l, mux)
}, func(error) {
l.Close()
Expand Down
10 changes: 5 additions & 5 deletions go/internal/ipfsutil/collector_bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ipfsutil
import (
"regexp"

ipfs_core "github.com/ipfs/go-ipfs/core"
metrics "github.com/libp2p/go-libp2p-core/metrics"
prometheus "github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -26,15 +26,15 @@ var (
var _ prometheus.Collector = (*BandwidthCollector)(nil)

type BandwidthCollector struct {
node *ipfs_core.IpfsNode
reporter *metrics.BandwidthCounter
}

func NewBandwidthCollector(node *ipfs_core.IpfsNode) *BandwidthCollector {
return &BandwidthCollector{node}
func NewBandwidthCollector(reporter *metrics.BandwidthCounter) *BandwidthCollector {
return &BandwidthCollector{reporter}
}

func (bc *BandwidthCollector) Collect(cmetric chan<- prometheus.Metric) {
for p, s := range bc.node.Reporter.GetBandwidthByProtocol() {
for p, s := range bc.reporter.GetBandwidthByProtocol() {
if p == "" {
continue
}
Expand Down

0 comments on commit a86144f

Please sign in to comment.