From 460408a05e76d8f4a0b74ae4784f2ff1693604d3 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 14 Jun 2021 18:37:05 -0400 Subject: [PATCH] debug: use the new metrics stream in debug command --- agent/agent_endpoint.go | 49 +++++++++++++++++++++++++++++++++++++++ agent/http.go | 2 +- agent/http_register.go | 1 + agent/setup.go | 3 +++ api/agent.go | 13 +++++++++++ command/debug/debug.go | 51 ++++++++++++++++++++++++++--------------- 6 files changed, 99 insertions(+), 20 deletions(-) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index add31531bb65..90f09f9b0cf7 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -1,6 +1,7 @@ package agent import ( + "encoding/json" "fmt" "net/http" "strconv" @@ -158,6 +159,54 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request) return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req) } +func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Fetch the ACL token, if any, and enforce agent policy. + var token string + s.parseToken(req, &token) + rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) + if err != nil { + return nil, err + } + if rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow { + return nil, acl.ErrPermissionDenied + } + + flusher, ok := resp.(http.Flusher) + if !ok { + return nil, fmt.Errorf("Streaming not supported") + } + + resp.WriteHeader(http.StatusOK) + + // 0 byte write is needed before the Flush call so that if we are using + // a gzip stream it will go ahead and write out the HTTP response header + resp.Write([]byte("")) + flusher.Flush() + + enc := metricsEncoder{ + logger: s.agent.logger, + encoder: json.NewEncoder(resp), + flusher: flusher, + } + s.agent.baseDeps.MetricsHandler.Stream(req.Context(), enc) + return nil, nil +} + +type metricsEncoder struct { + logger hclog.Logger + encoder *json.Encoder + flusher http.Flusher +} + +func (m metricsEncoder) Encode(summary interface{}) error { + if err := m.encoder.Encode(summary); err != nil { + m.logger.Error("failed to encode metrics summary", "error", err) + return err + } + m.flusher.Flush() + return nil +} + func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string diff --git a/agent/http.go b/agent/http.go index f6a8b448e422..229e499134a6 100644 --- a/agent/http.go +++ b/agent/http.go @@ -219,7 +219,7 @@ func (s *HTTPHandlers) handler(enableDebug bool) http.Handler { var gzipHandler http.Handler minSize := gziphandler.DefaultMinSize - if pattern == "/v1/agent/monitor" { + if pattern == "/v1/agent/monitor" || pattern == "/v1/agent/metrics/stream" { minSize = 0 } gzipWrapper, err := gziphandler.GzipHandlerWithOpts(gziphandler.MinSize(minSize)) diff --git a/agent/http_register.go b/agent/http_register.go index 41020e858a98..391076277434 100644 --- a/agent/http_register.go +++ b/agent/http_register.go @@ -38,6 +38,7 @@ func init() { registerEndpoint("/v1/agent/reload", []string{"PUT"}, (*HTTPHandlers).AgentReload) registerEndpoint("/v1/agent/monitor", []string{"GET"}, (*HTTPHandlers).AgentMonitor) registerEndpoint("/v1/agent/metrics", []string{"GET"}, (*HTTPHandlers).AgentMetrics) + registerEndpoint("/v1/agent/metrics/stream", []string{"GET"}, (*HTTPHandlers).AgentMetricsStream) registerEndpoint("/v1/agent/services", []string{"GET"}, (*HTTPHandlers).AgentServices) registerEndpoint("/v1/agent/service/", []string{"GET"}, (*HTTPHandlers).AgentService) registerEndpoint("/v1/agent/checks", []string{"GET"}, (*HTTPHandlers).AgentChecks) diff --git a/agent/setup.go b/agent/setup.go index bfa4abfadece..99999e91fd3d 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -1,6 +1,7 @@ package agent import ( + "context" "fmt" "io" "net" @@ -8,6 +9,7 @@ import ( "sync" "time" + "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" @@ -48,6 +50,7 @@ type BaseDeps struct { // MetricsHandler provides an http.Handler for displaying metrics. type MetricsHandler interface { DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) + Stream(ctx context.Context, encoder metrics.Encoder) } type ConfigLoader func(source config.Source) (config.LoadResult, error) diff --git a/api/agent.go b/api/agent.go index be644e18c611..b83f67f543e7 100644 --- a/api/agent.go +++ b/api/agent.go @@ -482,6 +482,19 @@ func (a *Agent) Metrics() (*MetricsInfo, error) { return out, nil } +// MetricsStream returns an io.ReadCloser which will emit a stream of metrics +// until the context is cancelled. The metrics are json encoded. +// The caller is responsible for closing the returned io.ReadCloser. +func (a *Agent) MetricsStream(ctx context.Context) (io.ReadCloser, error) { + r := a.c.newRequest("GET", "/v1/agent/metrics/stream") + r.ctx = ctx + _, resp, err := requireOK(a.c.doRequest(r)) + if err != nil { + return nil, err + } + return resp.Body, nil +} + // Reload triggers a configuration reload for the agent we are connected to. func (a *Agent) Reload() error { r := a.c.newRequest("PUT", "/v1/agent/reload") diff --git a/command/debug/debug.go b/command/debug/debug.go index 37601bd0de5e..45d3642441ea 100644 --- a/command/debug/debug.go +++ b/command/debug/debug.go @@ -3,11 +3,11 @@ package debug import ( "archive/tar" "compress/gzip" + "context" "encoding/json" "errors" "flag" "fmt" - "golang.org/x/sync/errgroup" "io" "io/ioutil" "os" @@ -15,6 +15,8 @@ import ( "strings" "time" + "golang.org/x/sync/errgroup" + "github.com/hashicorp/go-multierror" "github.com/mitchellh/cli" @@ -386,14 +388,6 @@ func captureShortLived(c *cmd) error { return c.captureGoRoutines(timestampDir) }) } - - // Capture metrics - if c.configuredTarget("metrics") { - g.Go(func() error { - return c.captureMetrics(timestampDir) - }) - } - return g.Wait() } @@ -412,7 +406,6 @@ func (c *cmd) captureLongRunning() error { timestamp := time.Now().Local().Unix() timestampDir, err := c.createTimestampDir(timestamp) - if err != nil { return err } @@ -423,7 +416,6 @@ func (c *cmd) captureLongRunning() error { if s < 1 { s = 1 } - // Capture pprof if c.configuredTarget("pprof") { g.Go(func() error { return c.captureProfile(s, timestampDir) @@ -433,12 +425,20 @@ func (c *cmd) captureLongRunning() error { return c.captureTrace(s, timestampDir) }) } - // Capture logs if c.configuredTarget("logs") { g.Go(func() error { return c.captureLogs(timestampDir) }) } + if c.configuredTarget("metrics") { + // TODO: pass in context from caller + ctx, cancel := context.WithTimeout(context.Background(), c.duration) + defer cancel() + + g.Go(func() error { + return c.captureMetrics(ctx, timestampDir) + }) + } return g.Wait() } @@ -515,20 +515,33 @@ func (c *cmd) captureLogs(timestampDir string) error { } } -func (c *cmd) captureMetrics(timestampDir string) error { - - metrics, err := c.client.Agent().Metrics() +func (c *cmd) captureMetrics(ctx context.Context, timestampDir string) error { + stream, err := c.client.Agent().MetricsStream(ctx) if err != nil { return err } + defer stream.Close() - marshaled, err := json.MarshalIndent(metrics, "", "\t") + filename := fmt.Sprintf("%s/%s.json", timestampDir, "metrics") + fh, err := os.Create(filename) if err != nil { - return err + return fmt.Errorf("failed to create metrics file: %w", err) } + defer fh.Close() - err = ioutil.WriteFile(fmt.Sprintf("%s/%s.json", timestampDir, "metrics"), marshaled, 0644) - return err + decoder := json.NewDecoder(stream) + encoder := json.NewEncoder(fh) + // TODO: is More() correct here? + for decoder.More() { + var raw interface{} + if err := decoder.Decode(&raw); err != nil { + return fmt.Errorf("failed to decode metrics: %w", err) + } + if err := encoder.Encode(raw); err != nil { + return fmt.Errorf("failed to write metrics to file: %w", err) + } + } + return nil } // allowedTarget returns a boolean if the target is able to be captured