From de175bddd7e3e2d50b3f55b486559af2068ba377 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 23 Dec 2015 14:39:08 -0500 Subject: [PATCH] [tbs] address feedback by @petermattis @bdarnell --- server/node.go | 2 +- server/server.go | 2 +- server/status.go | 10 ++-- server/status/feed_test.go | 4 +- server/status/monitor.go | 58 +++++++++--------- server/status/recorder.go | 2 +- server/status/recorder_test.go | 6 +- sql/executor.go | 4 +- sql/server.go | 2 +- util/metric/metric.go | 105 +++++++++++++++------------------ 10 files changed, 91 insertions(+), 104 deletions(-) diff --git a/server/node.go b/server/node.go index 7159eb120a0e..f7011e744339 100644 --- a/server/node.go +++ b/server/node.go @@ -159,7 +159,7 @@ func BootstrapCluster(clusterID string, engines []engine.Engine, stopper *stop.S } // NewNode returns a new instance of Node. -func NewNode(ctx storage.StoreContext, metaRegistry metric.Registry, stopper *stop.Stopper) *Node { +func NewNode(ctx storage.StoreContext, metaRegistry *metric.Registry, stopper *stop.Stopper) *Node { return &Node{ ctx: ctx, stopper: stopper, diff --git a/server/server.go b/server/server.go index c7894f157ee1..fcc1d7731733 100644 --- a/server/server.go +++ b/server/server.go @@ -82,7 +82,7 @@ type Server struct { tsDB *ts.DB tsServer *ts.Server raftTransport multiraft.Transport - metaRegistry metric.Registry + metaRegistry *metric.Registry stopper *stop.Stopper } diff --git a/server/status.go b/server/status.go index f925bddc1e3a..1f09bf559e61 100644 --- a/server/status.go +++ b/server/status.go @@ -91,7 +91,7 @@ const ( // statusStorePattern exposes status for a single store. statusStorePattern = statusPrefix + "stores/:store_id" - statusTransientPattern = statusPrefix + "transient/:store_id" + statusMetricsPattern = statusPrefix + "metrics/:store_id" // healthEndpoint is a shortcut for local details, intended for use by // monitoring processes to verify that the server is up. 
@@ -105,14 +105,14 @@ var localRE = regexp.MustCompile(`(?i)local`) type statusServer struct { db *client.DB gossip *gossip.Gossip - metaRegistry metric.Registry + metaRegistry *metric.Registry router *httprouter.Router ctx *Context proxyClient *http.Client } // newStatusServer allocates and returns a statusServer. -func newStatusServer(db *client.DB, gossip *gossip.Gossip, metaRegistry metric.Registry, ctx *Context) *statusServer { +func newStatusServer(db *client.DB, gossip *gossip.Gossip, metaRegistry *metric.Registry, ctx *Context) *statusServer { // Create an http client with a timeout tlsConfig, err := ctx.GetClientTLSConfig() if err != nil { @@ -143,7 +143,7 @@ func newStatusServer(db *client.DB, gossip *gossip.Gossip, metaRegistry metric.R server.router.GET(statusNodePattern, server.handleNodeStatus) server.router.GET(statusStoresPrefix, server.handleStoresStatus) server.router.GET(statusStorePattern, server.handleStoreStatus) - server.router.GET(statusTransientPattern, server.handleTransient) + server.router.GET(statusMetricsPattern, server.handleMetrics) server.router.GET(healthEndpoint, server.handleDetailsLocal) return server @@ -598,7 +598,7 @@ func (s *statusServer) handleStoreStatus(w http.ResponseWriter, r *http.Request, respondAsJSON(w, r, storeStatus) } -func (s *statusServer) handleTransient(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { +func (s *statusServer) handleMetrics(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { nodeID, local, err := s.extractNodeID(ps) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/server/status/feed_test.go b/server/status/feed_test.go index 2c77827351ab..ff740c1c7731 100644 --- a/server/status/feed_test.go +++ b/server/status/feed_test.go @@ -32,8 +32,6 @@ import ( "github.com/cockroachdb/cockroach/util/stop" ) -const d = time.Second - func wrap(args roachpb.Request) roachpb.BatchRequest { var ba roachpb.BatchRequest ba.Add(args) @@ -301,6 +299,8 @@ 
func TestNodeEventFeedTransactionRestart(t *testing.T) { ner := nodeEventReader{} ner.readEvents(feed) + d := 5 * time.Second + get := wrap(&roachpb.GetRequest{}) nodefeed.CallComplete(get, d, &roachpb.Error{ TransactionRestart: roachpb.TransactionRestart_BACKOFF}) diff --git a/server/status/monitor.go b/server/status/monitor.go index 8b6525c580a8..557596805310 100644 --- a/server/status/monitor.go +++ b/server/status/monitor.go @@ -36,30 +36,30 @@ import ( // for passing event feed data to these subset structures for accumulation. type NodeStatusMonitor struct { latency metric.Histograms - rateSuccess metric.EWMAS - rateError metric.EWMAS + rateSuccess metric.Rates + rateError metric.Rates numSuccess *metric.Counter numError *metric.Counter closer <-chan struct{} sync.RWMutex // Mutex to guard the following fields - registry metric.Registry - metaRegistry metric.Registry + registry *metric.Registry + metaRegistry *metric.Registry stores map[roachpb.StoreID]*StoreStatusMonitor desc roachpb.NodeDescriptor startedAt int64 } // NewNodeStatusMonitor initializes a new NodeStatusMonitor instance. 
-func NewNodeStatusMonitor(metaRegistry metric.Registry, closer <-chan struct{}) *NodeStatusMonitor { +func NewNodeStatusMonitor(metaRegistry *metric.Registry, closer <-chan struct{}) *NodeStatusMonitor { registry := metric.NewRegistry(closer) return &NodeStatusMonitor{ - latency: metric.RegisterLatency("latency%s", registry), - rateSuccess: metric.RegisterEWMAS("exec.rate.success%s", registry), - rateError: metric.RegisterEWMAS("exec.rate.error%s", registry), - numSuccess: metric.RegisterCounter("exec.num.success", registry), - numError: metric.RegisterCounter("exec.num.error", registry), + latency: registry.Latency("latency%s"), + rateSuccess: registry.Rates("exec.rate.success%s"), + rateError: registry.Rates("exec.rate.error%s"), + numSuccess: registry.Counter("exec.num.success"), + numError: registry.Counter("exec.num.error"), registry: registry, metaRegistry: metaRegistry, @@ -260,7 +260,7 @@ type StoreStatusMonitor struct { available *metric.Gauge sync.Mutex // Mutex to guard the following fields - registry metric.Registry + registry *metric.Registry stats engine.MVCCStats ID roachpb.StoreID desc *roachpb.StoreDescriptor @@ -268,7 +268,7 @@ type StoreStatusMonitor struct { } // NewStoreStatusMonitor constructs a StoreStatusMonitor with the given ID. -func NewStoreStatusMonitor(id roachpb.StoreID, metaRegistry metric.Registry, closer <-chan struct{}) *StoreStatusMonitor { +func NewStoreStatusMonitor(id roachpb.StoreID, metaRegistry *metric.Registry, closer <-chan struct{}) *StoreStatusMonitor { registry := metric.NewRegistry(closer) // Format as `cr.store.<metric>.<id>` in output, in analogy to the time // series data written. 
@@ -276,23 +276,23 @@ func NewStoreStatusMonitor(id roachpb.StoreID, metaRegistry metric.Registry, clo return &StoreStatusMonitor{ ID: id, registry: registry, - rangeCount: metric.RegisterCounter("ranges", registry), - leaderRangeCount: metric.RegisterGauge("ranges.leader", registry), - replicatedRangeCount: metric.RegisterGauge("ranges.replicated", registry), - availableRangeCount: metric.RegisterGauge("ranges.available", registry), - liveBytes: metric.RegisterGauge("livebytes", registry), - keyBytes: metric.RegisterGauge("keybytes", registry), - valBytes: metric.RegisterGauge("valbytes", registry), - intentBytes: metric.RegisterGauge("intentbytes", registry), - liveCount: metric.RegisterGauge("livecount", registry), - keyCount: metric.RegisterGauge("keycount", registry), - valCount: metric.RegisterGauge("valcount", registry), - intentCount: metric.RegisterGauge("intentcount", registry), - intentAge: metric.RegisterGauge("intentage", registry), - gcBytesAge: metric.RegisterGauge("gcbytesage", registry), - lastUpdateNanos: metric.RegisterGauge("lastupdatenanos", registry), - capacity: metric.RegisterGauge("capacity", registry), - available: metric.RegisterGauge("capacity.available", registry), + rangeCount: registry.Counter("ranges"), + leaderRangeCount: registry.Gauge("ranges.leader"), + replicatedRangeCount: registry.Gauge("ranges.replicated"), + availableRangeCount: registry.Gauge("ranges.available"), + liveBytes: registry.Gauge("livebytes"), + keyBytes: registry.Gauge("keybytes"), + valBytes: registry.Gauge("valbytes"), + intentBytes: registry.Gauge("intentbytes"), + liveCount: registry.Gauge("livecount"), + keyCount: registry.Gauge("keycount"), + valCount: registry.Gauge("valcount"), + intentCount: registry.Gauge("intentcount"), + intentAge: registry.Gauge("intentage"), + gcBytesAge: registry.Gauge("gcbytesage"), + lastUpdateNanos: registry.Gauge("lastupdatenanos"), + capacity: registry.Gauge("capacity"), + available: registry.Gauge("capacity.available"), } } 
diff --git a/server/status/recorder.go b/server/status/recorder.go index 57134c0a8c4d..c76d928aa1a6 100644 --- a/server/status/recorder.go +++ b/server/status/recorder.go @@ -164,7 +164,7 @@ func (nsr *NodeStatusRecorder) GetStatusSummaries() (*NodeStatus, []storage.Stor // registryRecorder is a helper class for recording time series datapoints // from a metrics Registry. type registryRecorder struct { - registry metric.Registry + registry *metric.Registry prefix string source string timestampNanos int64 diff --git a/server/status/recorder_test.go b/server/status/recorder_test.go index 377a9e7e55b3..cd91c0a5d79e 100644 --- a/server/status/recorder_test.go +++ b/server/status/recorder_test.go @@ -283,12 +283,10 @@ func TestNodeStatusRecorder(t *testing.T) { generateNodeData(1, "exec.num.error", 100, 1), generateNodeData(1, "exec.rate.success1h", 100, 0), generateNodeData(1, "exec.rate.error1h", 100, 0), - generateNodeData(1, "exec.rate.success30m", 100, 0), - generateNodeData(1, "exec.rate.error30m", 100, 0), + generateNodeData(1, "exec.rate.success10m", 100, 0), + generateNodeData(1, "exec.rate.error10m", 100, 0), generateNodeData(1, "exec.rate.success1m", 100, 0), generateNodeData(1, "exec.rate.error1m", 100, 0), - generateNodeData(1, "exec.rate.success5s", 100, 0), - generateNodeData(1, "exec.rate.error5s", 100, 0), } actual := recorder.GetTimeSeriesData() diff --git a/sql/executor.go b/sql/executor.go index 3e45a1281ee9..a16b53d09805 100644 --- a/sql/executor.go +++ b/sql/executor.go @@ -80,13 +80,13 @@ type Executor struct { // newExecutor creates an Executor and registers a callback on the // system config. 
-func newExecutor(db client.DB, gossip *gossip.Gossip, leaseMgr *LeaseManager, metaRegistry metric.Registry, stopper *stop.Stopper) *Executor { +func newExecutor(db client.DB, gossip *gossip.Gossip, leaseMgr *LeaseManager, metaRegistry *metric.Registry, stopper *stop.Stopper) *Executor { exec := &Executor{ db: db, reCache: parser.NewRegexpCache(512), leaseMgr: leaseMgr, - latency: metric.RegisterLatency("sql.latency%s", metaRegistry), + latency: metaRegistry.Latency("sql.latency%s"), } exec.systemConfigCond = sync.NewCond(&exec.systemConfigMu) diff --git a/sql/server.go b/sql/server.go index e1a3d01e691e..18dfc5720cce 100644 --- a/sql/server.go +++ b/sql/server.go @@ -44,7 +44,7 @@ type Server struct { } // MakeServer creates a Server. -func MakeServer(ctx *base.Context, db client.DB, gossip *gossip.Gossip, leaseMgr *LeaseManager, metaRegistry metric.Registry, stopper *stop.Stopper) Server { +func MakeServer(ctx *base.Context, db client.DB, gossip *gossip.Gossip, leaseMgr *LeaseManager, metaRegistry *metric.Registry, stopper *stop.Stopper) Server { return Server{ context: ctx, Executor: newExecutor(db, gossip, leaseMgr, metaRegistry, stopper), diff --git a/util/metric/metric.go b/util/metric/metric.go index a195cb175f6b..790debe97f3b 100644 --- a/util/metric/metric.go +++ b/util/metric/metric.go @@ -16,12 +16,9 @@ type timeScale struct { d time.Duration } -var scale5S = timeScale{"5s", 5 * time.Second} var scale1M = timeScale{"1m", 1 * time.Minute} -var scale5M = timeScale{"5m", 5 * time.Minute} -var scale30M = timeScale{"30m", 30 * time.Minute} +var scale10M = timeScale{"10m", 10 * time.Minute} var scale1H = timeScale{"1h", time.Hour} -var scale1D = timeScale{"1d", 24 * time.Hour} // Iterable provides a method for synchronized access to interior objects. 
type Iterable interface { @@ -33,13 +30,7 @@ type Iterable interface { var _ Iterable = &Gauge{} var _ Iterable = &Counter{} var _ Iterable = &Histogram{} -var _ Iterable = &EWMA{} - -// Registry contains methods to collect metrics. -type Registry interface { - Iterable - Add(string, Iterable) -} +var _ Iterable = &Rate{} type periodic interface { tickInterval() time.Duration @@ -47,13 +38,11 @@ type periodic interface { } var _ periodic = &Histogram{} -var _ periodic = &EWMA{} - -var _ Registry = &registry{} +var _ periodic = &Rate{} -// A registry bundles up various registries to provide a single point of -// access to them. -type registry struct { +// A Registry bundles up various iterables (i.e. typically metrics or other +// registries) to provide a single point of access to them. +type Registry struct { sync.Mutex tracked map[string]Iterable closer <-chan struct{} @@ -62,8 +51,8 @@ type registry struct { // NewRegistry creates a new Registry. Some types of metrics require an associated // goroutine to ping them at fixed intervals; these goroutines will select on the // supplied closer. -func NewRegistry(closer <-chan struct{}) Registry { - return &registry{ +func NewRegistry(closer <-chan struct{}) *Registry { + return &Registry{ tracked: map[string]Iterable{}, closer: closer, } @@ -86,11 +75,11 @@ func periodicAction(pi periodic, closer <-chan struct{}) { } } -// Add links the given registry into this registry using the given format +// Add links the given Iterable into this registry using the given format // string. The individual items in the registry will be formatted via // fmt.Sprintf(format, <name>). As a special case, metrics itself also implement // Iterable and can thus be added to a registry. 
-func (r *registry) Add(format string, item Iterable) { +func (r *Registry) Add(format string, item Iterable) { r.Lock() r.tracked[format] = item if pi, ok := item.(periodic); ok { @@ -102,7 +91,7 @@ func (r *registry) Add(format string, item Iterable) { } // Each calls the given closure for all metrics. -func (r *registry) Each(f func(name string, val interface{})) { +func (r *Registry) Each(f func(name string, val interface{})) { r.Lock() defer r.Unlock() for format, registry := range r.tracked { @@ -117,7 +106,7 @@ func (r *registry) Each(f func(name string, val interface{})) { } // MarshalJSON marshals to JSON. -func (r *registry) MarshalJSON() ([]byte, error) { +func (r *Registry) MarshalJSON() ([]byte, error) { m := make(map[string]interface{}) r.Each(func(name string, v interface{}) { m[name] = v @@ -157,10 +146,10 @@ func (h *Histogram) tickFn() { h.mu.Unlock() } -// RegisterHistogram registers a new windowed HDRHistogram with the given +// Histogram registers a new windowed HDRHistogram with the given // parameters. Data is kept in the active window for approximately the given // duration. -func RegisterHistogram(name string, duration time.Duration, unit Unit, maxVal MaxVal, sigFigs int, r Registry) *Histogram { +func (r *Registry) Histogram(name string, duration time.Duration, unit Unit, maxVal MaxVal, sigFigs int) *Histogram { const n = 4 h := &Histogram{} h.maxVal = int64(maxVal) @@ -172,15 +161,15 @@ func RegisterHistogram(name string, duration time.Duration, unit Unit, maxVal Ma return h } -// RegisterLatency is a convenience function which registers histograms with +// Latency is a convenience function which registers histograms with // suitable defaults for latency tracking on millisecond to minute time scales. // The generated names of the metric can be controlled via the given format // string. 
-func RegisterLatency(format string, r Registry) Histograms { - windows := []timeScale{scale5M, scale1H, scale1D} +func (r *Registry) Latency(format string) Histograms { + windows := []timeScale{scale1M, scale10M, scale1H} hs := make([]*Histogram, 0, 3) for _, w := range windows { - h := RegisterHistogram(fmt.Sprintf(format, w.name), w.d, UnitMs, MaxMinute, 2, r) + h := r.Histogram(fmt.Sprintf(format, w.name), w.d, UnitMs, MaxMinute, 2) hs = append(hs, h) } return hs @@ -233,8 +222,8 @@ func (c *Counter) MarshalJSON() ([]byte, error) { return json.Marshal(c.Counter.Count()) } -// RegisterCounter registers a new counter under the given name. -func RegisterCounter(name string, r Registry) *Counter { +// Counter registers a new counter under the given name. +func (r *Registry) Counter(name string) *Counter { c := &Counter{metrics.NewCounter()} r.Add(name, c) return c @@ -253,32 +242,32 @@ func (g *Gauge) MarshalJSON() ([]byte, error) { return json.Marshal(g.Gauge.Value()) } -// RegisterGauge registers a new Gauge with the given name. -func RegisterGauge(name string, r Registry) *Gauge { +// Gauge registers a new Gauge with the given name. +func (r *Registry) Gauge(name string) *Gauge { g := &Gauge{metrics.NewGauge()} r.Add(name, g) return g } -// An EWMA is a exponential weighted moving average. -type EWMA struct { - curSum float64 +// A Rate is an exponentially weighted moving average. +type Rate struct { interval time.Duration - mu sync.Mutex + mu sync.Mutex // protects fields below + curSum float64 wrapped ewma.MovingAverage } -// RegisterEWMA registers an EWMA over the given timescale. Timescales at or -// below 2s are illegal and will cause a panic. -func RegisterEWMA(name string, timescale time.Duration, r Registry) *EWMA { +// Rate registers an EWMA rate over the given timescale. Timescales at +// or below 2s are illegal and will cause a panic. 
+func (r *Registry) Rate(name string, timescale time.Duration) *Rate { const tickInterval = time.Second if timescale <= 2*time.Second { panic(fmt.Sprintf("EWMA with per-second ticks makes no sense on timescale %s", timescale)) } avgAge := float64(timescale) / float64(2*tickInterval) - e := &EWMA{ + e := &Rate{ interval: tickInterval, wrapped: ewma.NewMovingAverage(avgAge), } @@ -286,55 +275,55 @@ func RegisterEWMA(name string, timescale time.Duration, r Registry) *EWMA { return e } -func (e *EWMA) tickInterval() time.Duration { +func (e *Rate) tickInterval() time.Duration { return e.interval } -func (e *EWMA) tickFn() { +func (e *Rate) tickFn() { e.mu.Lock() e.wrapped.Add(e.curSum) e.curSum = 0 e.mu.Unlock() } -// Add adds the given measurement to the EWMA. -func (e *EWMA) Add(v float64) { +// Add adds the given measurement to the Rate. +func (e *Rate) Add(v float64) { e.mu.Lock() e.curSum += v e.mu.Unlock() } -// Each calls the given closure with the empty string and the EWMA. -func (e *EWMA) Each(f func(string, interface{})) { +// Each calls the given closure with the empty string and the Rate. +func (e *Rate) Each(f func(string, interface{})) { e.mu.Lock() defer e.mu.Unlock() f("", e.wrapped.Value()) } // MarshalJSON marshals to JSON. -func (e *EWMA) MarshalJSON() ([]byte, error) { +func (e *Rate) MarshalJSON() ([]byte, error) { e.mu.Lock() defer e.mu.Unlock() return json.Marshal(e.wrapped.Value()) } -// EWMAS is a slice of EWMA metrics. -type EWMAS []*EWMA +// Rates is a slice of EWMA backed rates. +type Rates []*Rate -// RegisterEWMAS returns a slice of EWMAs with the given format string and +// Rates returns a slice of EWMAs with the given format string and // various "standard" timescales. 
-func RegisterEWMAS(format string, r Registry) EWMAS { - scales := []timeScale{scale5S, scale1M, scale30M, scale1H} - es := make([]*EWMA, 0, len(scales)) +func (r *Registry) Rates(format string) Rates { + scales := []timeScale{scale1M, scale10M, scale1H} + es := make([]*Rate, 0, len(scales)) for _, scale := range scales { - es = append(es, RegisterEWMA(fmt.Sprintf(format, scale.name), - scale.d, r)) + es = append(es, r.Rate(fmt.Sprintf(format, scale.name), + scale.d)) } return es } -// Add adds the given value to all underlying EWMAs. -func (es EWMAS) Add(v float64) { +// Add adds the given value to all underlying Rates. +func (es Rates) Add(v float64) { for _, e := range es { e.Add(v) }