Skip to content

Commit

Permalink
status: allow exporting ConnectivityCheck results over HTTP (#2411)
Browse files Browse the repository at this point in the history
* status: allow exporting ConnectivityCheck results over HTTP

Adds a command that initiates ConnectivityCheck every `interval`
and serves the results in the form of prometheus metrics.
  • Loading branch information
gnarula committed Dec 2, 2020
1 parent 37a01a5 commit 0303996
Showing 1 changed file with 218 additions and 0 deletions.
218 changes: 218 additions & 0 deletions status/status.go
Expand Up @@ -6,12 +6,18 @@ package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"sort"
"strings"
"sync"
"syscall"
"text/template"
"time"

cli "github.com/urfave/cli"
Expand All @@ -22,6 +28,28 @@ import (
"go.dedis.ch/onet/v3/network"
)

const prometheusTemplate = `
# HELP voting_conodes_status voting global conodes status: X is the number of conodes; X >= 0.66 OK, 0 < X < 0.66 KO
# TYPE voting_conodes_status gauge
voting_conodes_status{} {{ .Connectivity }}
# HELP voting_conodes_status_timestamp timestamp in seconds since epoch denoting last probe time
# TYPE voting_conodes_status_timestamp counter
voting_conodes_status_timestamp{} {{ .LastCheckedAt }}
# HELP probe error: {{ .ProbeError }}
# HELP voting_conode_status voting conode status: 1=OK, 0=KO
# TYPE voting_conode_status gauge
{{- range $conode, $status := .Matrix -}}
{{ if eq $conode $.Self }}
voting_conode_status{conode="{{ $conode}}", critical="true"} {{ $status }}
{{ else }}
voting_conode_status{conode="{{ $conode}}", critical="false"} {{ $status }}
{{ end }}
{{- end -}}
`

func main() {
app := cli.NewApp()
app.Name = "Status"
Expand Down Expand Up @@ -68,6 +96,37 @@ func main() {
Value: "1s",
},
},
Subcommands: []cli.Command{
{
Name: "serve",
Usage: "exposes the connectivity check results on HTTP. Assumes -findFaulty\n " +
"Note: The HTTP server does not rate limit connections. Administrators are advised\n " +
"to rate-limit connections or firewall the HTTP port themselves.",
Flags: []cli.Flag{
cli.StringFlag{
Name: "format, fo",
Value: "prometheus",
Usage: "Output format",
},
cli.StringFlag{
Name: "endpoint, e",
Value: "/connectivity",
Usage: "HTTP endpoint to serve the response at",
},
cli.IntFlag{
Name: "port, p",
Value: 9000,
Usage: "Port to listen on",
},
cli.StringFlag{
Name: "interval, i",
Value: "1m",
Usage: "Time interval to run connectivity checks in",
},
},
Action: serve,
},
},
Action: connectivity,
},
}
Expand Down Expand Up @@ -244,3 +303,162 @@ func printJSON(all []se) {
out.Write(b2.Bytes())
out.Flush()
}

type serveResponse struct {
Connectivity float64
Matrix map[string]int
Self string
LastCheckedAt int64
ProbeError string
}

type server struct {
response serveResponse
mu sync.Mutex

si *network.ServerIdentity
list []*network.ServerIdentity
timeout time.Duration
interval time.Duration
quit chan struct{}
}

func serve(c *cli.Context) error {
if c.NArg() != 2 {
return errors.New("please give 2 arguments: group.toml private.toml")
}
ro, err := readGroup(c.Args().First())
if err != nil {
return errors.New("couldn't read file: " + err.Error())
}
log.Lvl3(ro)
list := ro.List
log.Info("List is", list)
to, err := time.ParseDuration(c.Parent().String("timeout"))
if err != nil {
return errors.New("duration parse error: " + err.Error())
}
coth, err := app.LoadCothority(c.Args().Get(1))
if err != nil {
return errors.New("error while loading private.toml: " + err.Error())
}
si, err := coth.GetServerIdentity()
if err != nil {
return errors.New("private.toml didn't have a serverIdentity: " + err.Error())
}

format := c.String("format")
if format != "prometheus" {
return errors.New("unsupported format: " + format)
}

endpoint := c.String("endpoint")
if len(endpoint) == 0 || endpoint[0] != '/' {
return errors.New("invalid endpoint: " + endpoint)
}

port := c.Int("port")
interval, err := time.ParseDuration(c.String("interval"))
if err != nil {
return errors.New("invalid interval: " + err.Error())
}

s := &server{
interval: interval,
timeout: to,
list: list,
si: si,
quit: make(chan struct{}),
}

http.HandleFunc(endpoint, s.serveHandler)
httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port)}

done := s.probe()
go func() {
log.Infof("Starting HTTP server on %d\n", port)
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Errorf("error starting server")
return
}
}()

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

<-signalChan
log.Infof("Shutting down server..")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := httpServer.Shutdown(ctx); err != nil {
log.Errorf("Error shutting down server: %s", err)
}

s.quit <- struct{}{}
<-done
return nil
}

func (s *server) probe() chan struct{} {
done := make(chan struct{})
go func() {
tick := time.Tick(s.interval)
for {
select {
case <-s.quit:
log.Infof("Shutting down prober...")
done <- struct{}{}
return
case <-tick:
log.Infof("Probing servers...")
s.mu.Lock()
s.response = serveResponse{
Matrix: make(map[string]int),
Connectivity: 0,
Self: s.si.Address.String(),
LastCheckedAt: time.Now().Unix(),
}
for _, si := range s.list {
s.response.Matrix[si.String()] = 0
}

resp, err := status.NewClient().CheckConnectivity(
s.si.GetPrivate(),
s.list,
s.timeout,
true,
)

if err != nil {
log.Errorf("error checking connectivity: %s", err)
s.response.ProbeError = err.Error()
}

for _, si := range resp {
s.response.Matrix[si.String()] = 1
}
s.response.Connectivity = float64(len(resp)) / float64(len(s.list))
s.mu.Unlock()
}
}
}()
return done
}

func (s *server) serveHandler(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
defer s.mu.Unlock()

if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

t, err := template.New("prometheus").Parse(prometheusTemplate)
if err != nil {
log.Errorf("error parsing template: %s", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
t.Execute(w, s.response)
}

0 comments on commit 0303996

Please sign in to comment.