Skip to content

Commit

Permalink
debug: use the new metrics stream in debug command
Browse files Browse the repository at this point in the history
  • Loading branch information
dnephin committed Jun 14, 2021
1 parent a442e83 commit 460408a
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 20 deletions.
49 changes: 49 additions & 0 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -158,6 +159,54 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request)
return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req)
}

func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
if err != nil {
return nil, err
}
if rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow {
return nil, acl.ErrPermissionDenied
}

flusher, ok := resp.(http.Flusher)
if !ok {
return nil, fmt.Errorf("Streaming not supported")
}

resp.WriteHeader(http.StatusOK)

// 0 byte write is needed before the Flush call so that if we are using
// a gzip stream it will go ahead and write out the HTTP response header
resp.Write([]byte(""))
flusher.Flush()

enc := metricsEncoder{
logger: s.agent.logger,
encoder: json.NewEncoder(resp),
flusher: flusher,
}
s.agent.baseDeps.MetricsHandler.Stream(req.Context(), enc)
return nil, nil
}

type metricsEncoder struct {
logger hclog.Logger
encoder *json.Encoder
flusher http.Flusher
}

func (m metricsEncoder) Encode(summary interface{}) error {
if err := m.encoder.Encode(summary); err != nil {
m.logger.Error("failed to encode metrics summary", "error", err)
return err
}
m.flusher.Flush()
return nil
}

func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
Expand Down
2 changes: 1 addition & 1 deletion agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (s *HTTPHandlers) handler(enableDebug bool) http.Handler {

var gzipHandler http.Handler
minSize := gziphandler.DefaultMinSize
if pattern == "/v1/agent/monitor" {
if pattern == "/v1/agent/monitor" || pattern == "/v1/agent/metrics/stream" {
minSize = 0
}
gzipWrapper, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(minSize))
Expand Down
1 change: 1 addition & 0 deletions agent/http_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func init() {
registerEndpoint("/v1/agent/reload", []string{"PUT"}, (*HTTPHandlers).AgentReload)
registerEndpoint("/v1/agent/monitor", []string{"GET"}, (*HTTPHandlers).AgentMonitor)
registerEndpoint("/v1/agent/metrics", []string{"GET"}, (*HTTPHandlers).AgentMetrics)
registerEndpoint("/v1/agent/metrics/stream", []string{"GET"}, (*HTTPHandlers).AgentMetricsStream)
registerEndpoint("/v1/agent/services", []string{"GET"}, (*HTTPHandlers).AgentServices)
registerEndpoint("/v1/agent/service/", []string{"GET"}, (*HTTPHandlers).AgentService)
registerEndpoint("/v1/agent/checks", []string{"GET"}, (*HTTPHandlers).AgentChecks)
Expand Down
3 changes: 3 additions & 0 deletions agent/setup.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package agent

import (
"context"
"fmt"
"io"
"net"
"net/http"
"sync"
"time"

"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/grpclog"
Expand Down Expand Up @@ -48,6 +50,7 @@ type BaseDeps struct {
// MetricsHandler provides an http.Handler for displaying metrics.
type MetricsHandler interface {
DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error)
Stream(ctx context.Context, encoder metrics.Encoder)
}

type ConfigLoader func(source config.Source) (config.LoadResult, error)
Expand Down
13 changes: 13 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,19 @@ func (a *Agent) Metrics() (*MetricsInfo, error) {
return out, nil
}

// MetricsStream returns an io.ReadCloser which will emit a stream of metrics
// until the context is cancelled. The metrics are json encoded.
// The caller is responsible for closing the returned io.ReadCloser.
func (a *Agent) MetricsStream(ctx context.Context) (io.ReadCloser, error) {
r := a.c.newRequest("GET", "/v1/agent/metrics/stream")
r.ctx = ctx
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
return resp.Body, nil
}

// Reload triggers a configuration reload for the agent we are connected to.
func (a *Agent) Reload() error {
r := a.c.newRequest("PUT", "/v1/agent/reload")
Expand Down
51 changes: 32 additions & 19 deletions command/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package debug
import (
"archive/tar"
"compress/gzip"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"golang.org/x/sync/errgroup"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"

"golang.org/x/sync/errgroup"

"github.com/hashicorp/go-multierror"
"github.com/mitchellh/cli"

Expand Down Expand Up @@ -386,14 +388,6 @@ func captureShortLived(c *cmd) error {
return c.captureGoRoutines(timestampDir)
})
}

// Capture metrics
if c.configuredTarget("metrics") {
g.Go(func() error {
return c.captureMetrics(timestampDir)
})
}

return g.Wait()
}

Expand All @@ -412,7 +406,6 @@ func (c *cmd) captureLongRunning() error {
timestamp := time.Now().Local().Unix()

timestampDir, err := c.createTimestampDir(timestamp)

if err != nil {
return err
}
Expand All @@ -423,7 +416,6 @@ func (c *cmd) captureLongRunning() error {
if s < 1 {
s = 1
}
// Capture pprof
if c.configuredTarget("pprof") {
g.Go(func() error {
return c.captureProfile(s, timestampDir)
Expand All @@ -433,12 +425,20 @@ func (c *cmd) captureLongRunning() error {
return c.captureTrace(s, timestampDir)
})
}
// Capture logs
if c.configuredTarget("logs") {
g.Go(func() error {
return c.captureLogs(timestampDir)
})
}
if c.configuredTarget("metrics") {
// TODO: pass in context from caller
ctx, cancel := context.WithTimeout(context.Background(), c.duration)
defer cancel()

g.Go(func() error {
return c.captureMetrics(ctx, timestampDir)
})
}

return g.Wait()
}
Expand Down Expand Up @@ -515,20 +515,33 @@ func (c *cmd) captureLogs(timestampDir string) error {
}
}

func (c *cmd) captureMetrics(timestampDir string) error {

metrics, err := c.client.Agent().Metrics()
func (c *cmd) captureMetrics(ctx context.Context, timestampDir string) error {
stream, err := c.client.Agent().MetricsStream(ctx)
if err != nil {
return err
}
defer stream.Close()

marshaled, err := json.MarshalIndent(metrics, "", "\t")
filename := fmt.Sprintf("%s/%s.json", timestampDir, "metrics")
fh, err := os.Create(filename)
if err != nil {
return err
return fmt.Errorf("failed to create metrics file: %w", err)
}
defer fh.Close()

err = ioutil.WriteFile(fmt.Sprintf("%s/%s.json", timestampDir, "metrics"), marshaled, 0644)
return err
decoder := json.NewDecoder(stream)
encoder := json.NewEncoder(fh)
// TODO: is More() correct here?
for decoder.More() {
var raw interface{}
if err := decoder.Decode(&raw); err != nil {
return fmt.Errorf("failed to decode metrics: %w", err)
}
if err := encoder.Encode(raw); err != nil {
return fmt.Errorf("failed to write metrics to file: %w", err)
}
}
return nil
}

// allowedTarget returns a boolean if the target is able to be captured
Expand Down

0 comments on commit 460408a

Please sign in to comment.