Skip to content

Commit

Permalink
transient stats prototype
Browse files Browse the repository at this point in the history
This is a prototype for the collection of transient performance statistics.
It does not yet expose "useful" runtime statistics, but is meant to lay the
groundwork for doing so across different parts of the system.

We were previously using the `go-metrics` package, which provides the concept
of a `Registry` bundling different metrics. Unfortunately, various limitations,
a certain amount of interface bloat (which made it annoying to provide custom
implementations) and various design choices made this unwieldy to work with
without upstream adaptations.
Instead, this commit introduces the `util/metric` package, which replaces the
registry functionality and provides light wrapper implementations around metrics.
Each registry object owns its metrics' goroutines and terminates them using
a closer channel; this allows for easy integration with `util.Stopper`.

Metric registries can be nested and in fact we do so, keeping one "global"
metrics registry and various others in it. In particular Node- and Store-level
metrics have individual registries. This is useful because it allows their
metrics to be stored to their corresponding time series by simply iterating
over a registry, with names that are correct both globally and locally.

For now, we wrap `Counter`, `Gauge` and `EWMA` types and provide a windowed
histogram (based on Gil Tene's `HDRHistogram`s). Others (such as
reservoir-backed histograms) can be added as required.
  • Loading branch information
tbg committed Dec 23, 2015
1 parent 6ef0f08 commit 4fc2444
Show file tree
Hide file tree
Showing 18 changed files with 588 additions and 138 deletions.
2 changes: 2 additions & 0 deletions GLOCKFILE
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ cmd github.com/tebeka/go2xunit
cmd golang.org/x/tools/cmd/goimports
cmd golang.org/x/tools/cmd/stress
cmd golang.org/x/tools/cmd/stringer
github.com/VividCortex/ewma c34099b489e4ac33ca8d8c5f9d29d6eeaf69f2ed
github.com/agtorre/gocolorize f42b554bf7f006936130c9bb4f971afd2d87f671
github.com/biogo/store 3b4c041f52c224ee4a44f5c8b150d003a40643a0
github.com/cockroachdb/c-lz4 c40aaae2fc50293eb8750b34632bc3efe813e23f
github.com/cockroachdb/c-protobuf 6a18bfcdd5169966cd1235edb71d749c950216e8
github.com/cockroachdb/c-rocksdb bf15ead80bdc205a19b3d33415b23c156a3cf371
github.com/cockroachdb/c-snappy 5c6d0932e0adaffce4bfca7bdf2ac37f79952ccf
github.com/cockroachdb/yacc 443154b1852a8702b07d675da6cd97cd9177a316
github.com/codahale/hdrhistogram 954f16e8b9ef0e5d5189456aa4c1202758e04f17
github.com/coreos/etcd 0eb46eb1457bf277a5d6093287290b2b5cd6a964
github.com/cpuguy83/go-md2man 71acacd42f85e5e82f70a55327789582a5200a90
github.com/docker/docker 5e0283effa73223e5528c61beb4e05b5018c5d6b
Expand Down
22 changes: 13 additions & 9 deletions server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/metric"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/cockroachdb/cockroach/util/tracer"
"github.com/gogo/protobuf/proto"
Expand All @@ -61,6 +62,7 @@ const (
// IDs for bootstrapping the node itself or new stores as they're added
// on subsequent instantiations.
type Node struct {
stopper *stop.Stopper
ClusterID string // UUID for Cockroach cluster
Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology
ctx storage.StoreContext // Context to use and pass to stores
Expand Down Expand Up @@ -157,11 +159,12 @@ func BootstrapCluster(clusterID string, engines []engine.Engine, stopper *stop.S
}

// NewNode returns a new instance of Node.
func NewNode(ctx storage.StoreContext) *Node {
func NewNode(ctx storage.StoreContext, metaRegistry metric.Registry, stopper *stop.Stopper) *Node {
return &Node{
ctx: ctx,
status: status.NewNodeStatusMonitor(),
stores: storage.NewStores(),
ctx: ctx,
stopper: stopper,
status: status.NewNodeStatusMonitor(metaRegistry, stopper.ShouldStop()),
stores: storage.NewStores(),
}
}

Expand Down Expand Up @@ -220,7 +223,7 @@ func (n *Node) initNodeID(id roachpb.NodeID) {
// RPC service "Node" and initializing stores for each specified
// engine. Launches periodic store gossiping in a goroutine.
func (n *Node) start(rpcServer *rpc.Server, addr net.Addr, engines []engine.Engine,
attrs roachpb.Attributes, stopper *stop.Stopper) error {
attrs roachpb.Attributes) error {
n.initDescriptor(addr, attrs)
const method = "Node.Batch"
if err := rpcServer.Register(method, n.executeCmd, &roachpb.BatchRequest{}); err != nil {
Expand All @@ -231,7 +234,7 @@ func (n *Node) start(rpcServer *rpc.Server, addr net.Addr, engines []engine.Engi
n.status.StartMonitorFeed(n.ctx.EventFeed)

// Initialize stores, including bootstrapping new ones.
if err := n.initStores(engines, stopper); err != nil {
if err := n.initStores(engines, n.stopper); err != nil {
return err
}

Expand All @@ -243,8 +246,8 @@ func (n *Node) start(rpcServer *rpc.Server, addr net.Addr, engines []engine.Engi
n.feed = status.NewNodeEventFeed(n.Descriptor.NodeID, n.ctx.EventFeed)
n.feed.StartNode(n.Descriptor, n.startedAt)

n.startPublishStatuses(stopper)
n.startGossip(stopper)
n.startPublishStatuses(n.stopper)
n.startGossip(n.stopper)
log.Infoc(n.context(), "Started node with %v engine(s) and attributes %v", engines, attrs.Attrs)
return nil
}
Expand Down Expand Up @@ -457,6 +460,7 @@ func (n *Node) executeCmd(argsI proto.Message) (proto.Message, error) {
defer trace.Epoch("node")()
ctx := tracer.ToCtx((*Node)(n).context(), trace)

tStart := time.Now()
br, pErr := n.stores.Send(ctx, *ba)
if pErr != nil {
br = &roachpb.BatchResponse{}
Expand All @@ -465,7 +469,7 @@ func (n *Node) executeCmd(argsI proto.Message) (proto.Message, error) {
if br.Error != nil {
panic(roachpb.ErrorUnexpectedlySet(n.stores, br))
}
n.feed.CallComplete(*ba, pErr)
n.feed.CallComplete(*ba, time.Now().Sub(tStart), pErr)
br.Error = pErr
return br, nil
}
7 changes: 4 additions & 3 deletions server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/cockroachdb/cockroach/util/metric"
"github.com/cockroachdb/cockroach/util/stop"
)

Expand Down Expand Up @@ -80,15 +81,15 @@ func createTestNode(addr net.Addr, engines []engine.Engine, gossipBS net.Addr, t
// (or attach LocalRPCTransport.Close to the stopper)
ctx.Transport = multiraft.NewLocalRPCTransport(stopper)
ctx.EventFeed = util.NewFeed(stopper)
node := NewNode(ctx)
node := NewNode(ctx, metric.NewRegistry(stopper.ShouldStop()), stopper)
return rpcServer, ln.Addr(), ctx.Clock, node, stopper
}

// createAndStartTestNode creates a new test node and starts it. The server and node are returned.
func createAndStartTestNode(addr net.Addr, engines []engine.Engine, gossipBS net.Addr, t *testing.T) (
*rpc.Server, net.Addr, *Node, *stop.Stopper) {
rpcServer, addr, _, node, stopper := createTestNode(addr, engines, gossipBS, t)
if err := node.start(rpcServer, addr, engines, roachpb.Attributes{}, stopper); err != nil {
if err := node.start(rpcServer, addr, engines, roachpb.Attributes{}); err != nil {
t.Fatal(err)
}
return rpcServer, addr, node, stopper
Expand Down Expand Up @@ -273,7 +274,7 @@ func TestCorruptedClusterID(t *testing.T) {

engines := []engine.Engine{e}
server, serverAddr, _, node, stopper := createTestNode(util.CreateTestAddr("tcp"), engines, nil, t)
if err := node.start(server, serverAddr, engines, roachpb.Attributes{}, stopper); err == nil {
if err := node.start(server, serverAddr, engines, roachpb.Attributes{}); err == nil {
t.Errorf("unexpected success")
}
stopper.Stop()
Expand Down
20 changes: 12 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/metric"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/cockroachdb/cockroach/util/tracer"
assetfs "github.com/elazarl/go-bindata-assetfs"
Expand Down Expand Up @@ -81,6 +82,7 @@ type Server struct {
tsDB *ts.DB
tsServer *ts.Server
raftTransport multiraft.Transport
metaRegistry metric.Registry
stopper *stop.Stopper
}

Expand All @@ -106,10 +108,11 @@ func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {
}

s := &Server{
ctx: ctx,
mux: http.NewServeMux(),
clock: hlc.NewClock(hlc.UnixNano),
stopper: stopper,
ctx: ctx,
mux: http.NewServeMux(),
clock: hlc.NewClock(hlc.UnixNano),
metaRegistry: metric.NewRegistry(stopper.ShouldStop()),
stopper: stopper,
}
s.clock.SetMaxOffset(ctx.MaxOffset)

Expand Down Expand Up @@ -144,7 +147,7 @@ func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {

leaseMgr := sql.NewLeaseManager(0, *s.db, s.clock)
leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip)
s.sqlServer = sql.MakeServer(&s.ctx.Context, *s.db, s.gossip, leaseMgr, s.stopper)
s.sqlServer = sql.MakeServer(&s.ctx.Context, *s.db, s.gossip, leaseMgr, s.metaRegistry, s.stopper)
if err := s.sqlServer.RegisterRPC(s.rpc); err != nil {
return nil, err
}
Expand All @@ -171,9 +174,8 @@ func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {
Mode: s.ctx.BalanceMode,
},
}
s.node = NewNode(nCtx)
s.node = NewNode(nCtx, s.metaRegistry, s.stopper)
s.admin = newAdminServer(s.db, s.stopper)
s.status = newStatusServer(s.db, s.gossip, ctx)
s.tsDB = ts.NewDB(s.db)
s.tsServer = ts.NewServer(s.tsDB)

Expand Down Expand Up @@ -210,7 +212,7 @@ func (s *Server) Start(selfBootstrap bool) error {
}
s.gossip.Start(s.rpc, addr, s.stopper)

if err := s.node.start(s.rpc, addr, s.ctx.Engines, s.ctx.NodeAttributes, s.stopper); err != nil {
if err := s.node.start(s.rpc, addr, s.ctx.Engines, s.ctx.NodeAttributes); err != nil {
return err
}

Expand All @@ -227,6 +229,8 @@ func (s *Server) Start(selfBootstrap bool) error {

s.sqlServer.SetNodeID(s.node.Descriptor.NodeID)

s.status = newStatusServer(s.db, s.gossip, s.metaRegistry, s.ctx)

log.Infof("starting %s server at %s", s.ctx.HTTPRequestScheme(), addr)
s.initHTTP()

Expand Down
44 changes: 32 additions & 12 deletions server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/storage"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/metric"
"github.com/julienschmidt/httprouter"
)

Expand Down Expand Up @@ -90,6 +91,8 @@ const (
// statusStorePattern exposes status for a single store.
statusStorePattern = statusPrefix + "stores/:store_id"

statusTransientPattern = statusPrefix + "transient/:store_id"

// healthEndpoint is a shortcut for local details, intended for use by
// monitoring processes to verify that the server is up.
healthEndpoint = "/health"
Expand All @@ -100,15 +103,16 @@ var localRE = regexp.MustCompile(`(?i)local`)

// A statusServer provides a RESTful status API.
type statusServer struct {
db *client.DB
gossip *gossip.Gossip
router *httprouter.Router
ctx *Context
proxyClient *http.Client
db *client.DB
gossip *gossip.Gossip
metaRegistry metric.Registry
router *httprouter.Router
ctx *Context
proxyClient *http.Client
}

// newStatusServer allocates and returns a statusServer.
func newStatusServer(db *client.DB, gossip *gossip.Gossip, ctx *Context) *statusServer {
func newStatusServer(db *client.DB, gossip *gossip.Gossip, metaRegistry metric.Registry, ctx *Context) *statusServer {
// Create an http client with a timeout
tlsConfig, err := ctx.GetClientTLSConfig()
if err != nil {
Expand All @@ -121,11 +125,12 @@ func newStatusServer(db *client.DB, gossip *gossip.Gossip, ctx *Context) *status
}

server := &statusServer{
db: db,
gossip: gossip,
router: httprouter.New(),
ctx: ctx,
proxyClient: httpClient,
db: db,
gossip: gossip,
metaRegistry: metaRegistry,
router: httprouter.New(),
ctx: ctx,
proxyClient: httpClient,
}

server.router.GET(statusGossipPattern, server.handleGossip)
Expand All @@ -138,8 +143,9 @@ func newStatusServer(db *client.DB, gossip *gossip.Gossip, ctx *Context) *status
server.router.GET(statusNodePattern, server.handleNodeStatus)
server.router.GET(statusStoresPrefix, server.handleStoresStatus)
server.router.GET(statusStorePattern, server.handleStoreStatus)
server.router.GET(healthEndpoint, server.handleDetailsLocal)
server.router.GET(statusTransientPattern, server.handleTransient)

server.router.GET(healthEndpoint, server.handleDetailsLocal)
return server
}

Expand Down Expand Up @@ -592,6 +598,20 @@ func (s *statusServer) handleStoreStatus(w http.ResponseWriter, r *http.Request,
respondAsJSON(w, r, storeStatus)
}

// handleTransient is the HTTP handler registered for statusTransientPattern.
// It resolves the target node from the route parameters via extractNodeID and
// then does one of three things:
//   - replies with 400 Bad Request if the node ID cannot be extracted,
//   - writes the server's metric registry as JSON if the request is local,
//   - otherwise forwards the request to the target node via proxyRequest.
//
// NOTE(review): statusTransientPattern names its route parameter ":store_id",
// while this handler extracts a node ID — confirm extractNodeID reads the
// parameter this route actually provides.
func (s *statusServer) handleTransient(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
	nodeID, local, err := s.extractNodeID(ps)
	switch {
	case err != nil:
		// Malformed or unknown node identifier in the URL.
		http.Error(w, err.Error(), http.StatusBadRequest)
	case local:
		// The request targets this node: serve our own registry.
		respondAsJSON(w, r, s.metaRegistry)
	default:
		// The request targets another node: let it answer.
		s.proxyRequest(nodeID, w, r)
	}
}

func respondAsJSON(w http.ResponseWriter, r *http.Request, response interface{}) {
b, contentType, err := util.MarshalResponse(r, response, []util.EncodingType{util.JSONEncoding})
if err != nil {
Expand Down
23 changes: 14 additions & 9 deletions server/status/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package status

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util"
Expand All @@ -37,8 +38,9 @@ func (e StartNodeEvent) String() string {

// CallSuccessEvent is published when a call to a node completes without error.
type CallSuccessEvent struct {
NodeID roachpb.NodeID
Method roachpb.Method
NodeID roachpb.NodeID
Method roachpb.Method
Duration time.Duration
}

// String implements fmt.Stringer.
Expand All @@ -48,8 +50,9 @@ func (e CallSuccessEvent) String() string {

// CallErrorEvent is published when a call to a node returns an error.
type CallErrorEvent struct {
NodeID roachpb.NodeID
Method roachpb.Method
NodeID roachpb.NodeID
Method roachpb.Method
Duration time.Duration
}

// String implements fmt.Stringer.
Expand Down Expand Up @@ -86,7 +89,7 @@ func (nef NodeEventFeed) StartNode(desc roachpb.NodeDescriptor, startedAt int64)
// - For a successful request, a corresponding event for each request in the batch,
// - on error without index information, a failure of the Batch, and
// - on an indexed error a failure of the individual request.
func (nef NodeEventFeed) CallComplete(ba roachpb.BatchRequest, pErr *roachpb.Error) {
func (nef NodeEventFeed) CallComplete(ba roachpb.BatchRequest, d time.Duration, pErr *roachpb.Error) {
if pErr != nil && pErr.TransactionRestart == roachpb.TransactionRestart_ABORT {
method := roachpb.Batch
if iErr, ok := pErr.GoError().(roachpb.IndexedError); ok {
Expand All @@ -95,15 +98,17 @@ func (nef NodeEventFeed) CallComplete(ba roachpb.BatchRequest, pErr *roachpb.Err
}
}
nef.f.Publish(&CallErrorEvent{
NodeID: nef.id,
Method: method,
NodeID: nef.id,
Method: method,
Duration: d,
})
return
}
for _, union := range ba.Requests {
nef.f.Publish(&CallSuccessEvent{
NodeID: nef.id,
Method: union.GetInner().Method(),
NodeID: nef.id,
Method: union.GetInner().Method(),
Duration: d,
})
}
}
Expand Down
Loading

0 comments on commit 4fc2444

Please sign in to comment.