Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transient stats prototype #3504

Merged
merged 1 commit into from
Dec 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions GLOCKFILE
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ cmd github.com/tebeka/go2xunit
cmd golang.org/x/tools/cmd/goimports
cmd golang.org/x/tools/cmd/stress
cmd golang.org/x/tools/cmd/stringer
github.com/VividCortex/ewma c34099b489e4ac33ca8d8c5f9d29d6eeaf69f2ed
github.com/agtorre/gocolorize f42b554bf7f006936130c9bb4f971afd2d87f671
github.com/biogo/store 3b4c041f52c224ee4a44f5c8b150d003a40643a0
github.com/cockroachdb/c-lz4 c40aaae2fc50293eb8750b34632bc3efe813e23f
github.com/cockroachdb/c-protobuf 6a18bfcdd5169966cd1235edb71d749c950216e8
github.com/cockroachdb/c-rocksdb bf15ead80bdc205a19b3d33415b23c156a3cf371
github.com/cockroachdb/c-snappy 5c6d0932e0adaffce4bfca7bdf2ac37f79952ccf
github.com/cockroachdb/yacc 443154b1852a8702b07d675da6cd97cd9177a316
github.com/codahale/hdrhistogram 954f16e8b9ef0e5d5189456aa4c1202758e04f17
github.com/coreos/etcd 0eb46eb1457bf277a5d6093287290b2b5cd6a964
github.com/cpuguy83/go-md2man 71acacd42f85e5e82f70a55327789582a5200a90
github.com/docker/docker 5e0283effa73223e5528c61beb4e05b5018c5d6b
Expand Down
22 changes: 13 additions & 9 deletions server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/metric"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/cockroachdb/cockroach/util/tracer"
"github.com/gogo/protobuf/proto"
Expand All @@ -61,6 +62,7 @@ const (
// IDs for bootstrapping the node itself or new stores as they're added
// on subsequent instantiations.
type Node struct {
stopper *stop.Stopper
ClusterID string // UUID for Cockroach cluster
Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology
ctx storage.StoreContext // Context to use and pass to stores
Expand Down Expand Up @@ -157,11 +159,12 @@ func BootstrapCluster(clusterID string, engines []engine.Engine, stopper *stop.S
}

// NewNode returns a new instance of Node.
func NewNode(ctx storage.StoreContext) *Node {
func NewNode(ctx storage.StoreContext, metaRegistry *metric.Registry, stopper *stop.Stopper) *Node {
return &Node{
ctx: ctx,
status: status.NewNodeStatusMonitor(),
stores: storage.NewStores(),
ctx: ctx,
stopper: stopper,
status: status.NewNodeStatusMonitor(metaRegistry),
stores: storage.NewStores(),
}
}

Expand Down Expand Up @@ -220,7 +223,7 @@ func (n *Node) initNodeID(id roachpb.NodeID) {
// RPC service "Node" and initializing stores for each specified
// engine. Launches periodic store gossiping in a goroutine.
func (n *Node) start(rpcServer *rpc.Server, addr net.Addr, engines []engine.Engine,
attrs roachpb.Attributes, stopper *stop.Stopper) error {
attrs roachpb.Attributes) error {
n.initDescriptor(addr, attrs)
const method = "Node.Batch"
if err := rpcServer.Register(method, n.executeCmd, &roachpb.BatchRequest{}); err != nil {
Expand All @@ -231,7 +234,7 @@ func (n *Node) start(rpcServer *rpc.Server, addr net.Addr, engines []engine.Engi
n.status.StartMonitorFeed(n.ctx.EventFeed)

// Initialize stores, including bootstrapping new ones.
if err := n.initStores(engines, stopper); err != nil {
if err := n.initStores(engines, n.stopper); err != nil {
return err
}

Expand All @@ -243,8 +246,8 @@ func (n *Node) start(rpcServer *rpc.Server, addr net.Addr, engines []engine.Engi
n.feed = status.NewNodeEventFeed(n.Descriptor.NodeID, n.ctx.EventFeed)
n.feed.StartNode(n.Descriptor, n.startedAt)

n.startPublishStatuses(stopper)
n.startGossip(stopper)
n.startPublishStatuses(n.stopper)
n.startGossip(n.stopper)
log.Infoc(n.context(), "Started node with %v engine(s) and attributes %v", engines, attrs.Attrs)
return nil
}
Expand Down Expand Up @@ -457,6 +460,7 @@ func (n *Node) executeCmd(argsI proto.Message) (proto.Message, error) {
defer trace.Epoch("node")()
ctx := tracer.ToCtx((*Node)(n).context(), trace)

tStart := time.Now()
br, pErr := n.stores.Send(ctx, *ba)
if pErr != nil {
br = &roachpb.BatchResponse{}
Expand All @@ -465,7 +469,7 @@ func (n *Node) executeCmd(argsI proto.Message) (proto.Message, error) {
if br.Error != nil {
panic(roachpb.ErrorUnexpectedlySet(n.stores, br))
}
n.feed.CallComplete(*ba, pErr)
n.feed.CallComplete(*ba, time.Now().Sub(tStart), pErr)
br.Error = pErr
return br, nil
}
7 changes: 4 additions & 3 deletions server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/cockroachdb/cockroach/util/metric"
"github.com/cockroachdb/cockroach/util/stop"
)

Expand Down Expand Up @@ -80,15 +81,15 @@ func createTestNode(addr net.Addr, engines []engine.Engine, gossipBS net.Addr, t
// (or attach LocalRPCTransport.Close to the stopper)
ctx.Transport = multiraft.NewLocalRPCTransport(stopper)
ctx.EventFeed = util.NewFeed(stopper)
node := NewNode(ctx)
node := NewNode(ctx, metric.NewRegistry(), stopper)
return rpcServer, ln.Addr(), ctx.Clock, node, stopper
}

// createAndStartTestNode creates a new test node and starts it. The server and node are returned.
func createAndStartTestNode(addr net.Addr, engines []engine.Engine, gossipBS net.Addr, t *testing.T) (
*rpc.Server, net.Addr, *Node, *stop.Stopper) {
rpcServer, addr, _, node, stopper := createTestNode(addr, engines, gossipBS, t)
if err := node.start(rpcServer, addr, engines, roachpb.Attributes{}, stopper); err != nil {
if err := node.start(rpcServer, addr, engines, roachpb.Attributes{}); err != nil {
t.Fatal(err)
}
return rpcServer, addr, node, stopper
Expand Down Expand Up @@ -273,7 +274,7 @@ func TestCorruptedClusterID(t *testing.T) {

engines := []engine.Engine{e}
server, serverAddr, _, node, stopper := createTestNode(util.CreateTestAddr("tcp"), engines, nil, t)
if err := node.start(server, serverAddr, engines, roachpb.Attributes{}, stopper); err == nil {
if err := node.start(server, serverAddr, engines, roachpb.Attributes{}); err == nil {
t.Errorf("unexpected success")
}
stopper.Stop()
Expand Down
20 changes: 12 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/metric"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/cockroachdb/cockroach/util/tracer"
assetfs "github.com/elazarl/go-bindata-assetfs"
Expand Down Expand Up @@ -81,6 +82,7 @@ type Server struct {
tsDB *ts.DB
tsServer *ts.Server
raftTransport multiraft.Transport
metaRegistry *metric.Registry
stopper *stop.Stopper
}

Expand All @@ -106,10 +108,11 @@ func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {
}

s := &Server{
ctx: ctx,
mux: http.NewServeMux(),
clock: hlc.NewClock(hlc.UnixNano),
stopper: stopper,
ctx: ctx,
mux: http.NewServeMux(),
clock: hlc.NewClock(hlc.UnixNano),
metaRegistry: metric.NewRegistry(),
stopper: stopper,
}
s.clock.SetMaxOffset(ctx.MaxOffset)

Expand Down Expand Up @@ -144,7 +147,7 @@ func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {

leaseMgr := sql.NewLeaseManager(0, *s.db, s.clock)
leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip)
s.sqlServer = sql.MakeServer(&s.ctx.Context, *s.db, s.gossip, leaseMgr, s.stopper)
s.sqlServer = sql.MakeServer(&s.ctx.Context, *s.db, s.gossip, leaseMgr, s.metaRegistry, s.stopper)
if err := s.sqlServer.RegisterRPC(s.rpc); err != nil {
return nil, err
}
Expand All @@ -171,9 +174,8 @@ func NewServer(ctx *Context, stopper *stop.Stopper) (*Server, error) {
Mode: s.ctx.BalanceMode,
},
}
s.node = NewNode(nCtx)
s.node = NewNode(nCtx, s.metaRegistry, s.stopper)
s.admin = newAdminServer(s.db, s.stopper)
s.status = newStatusServer(s.db, s.gossip, ctx)
s.tsDB = ts.NewDB(s.db)
s.tsServer = ts.NewServer(s.tsDB)

Expand Down Expand Up @@ -210,7 +212,7 @@ func (s *Server) Start(selfBootstrap bool) error {
}
s.gossip.Start(s.rpc, addr, s.stopper)

if err := s.node.start(s.rpc, addr, s.ctx.Engines, s.ctx.NodeAttributes, s.stopper); err != nil {
if err := s.node.start(s.rpc, addr, s.ctx.Engines, s.ctx.NodeAttributes); err != nil {
return err
}

Expand All @@ -227,6 +229,8 @@ func (s *Server) Start(selfBootstrap bool) error {

s.sqlServer.SetNodeID(s.node.Descriptor.NodeID)

s.status = newStatusServer(s.db, s.gossip, s.metaRegistry, s.ctx)

log.Infof("starting %s server at %s", s.ctx.HTTPRequestScheme(), addr)
s.initHTTP()

Expand Down
44 changes: 32 additions & 12 deletions server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/storage"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/metric"
"github.com/julienschmidt/httprouter"
)

Expand Down Expand Up @@ -90,6 +91,8 @@ const (
// statusStorePattern exposes status for a single store.
statusStorePattern = statusPrefix + "stores/:store_id"

statusMetricsPattern = statusPrefix + "metrics/:store_id"

// healthEndpoint is a shortcut for local details, intended for use by
// monitoring processes to verify that the server is up.
healthEndpoint = "/health"
Expand All @@ -100,15 +103,16 @@ var localRE = regexp.MustCompile(`(?i)local`)

// A statusServer provides a RESTful status API.
type statusServer struct {
db *client.DB
gossip *gossip.Gossip
router *httprouter.Router
ctx *Context
proxyClient *http.Client
db *client.DB
gossip *gossip.Gossip
metaRegistry *metric.Registry
router *httprouter.Router
ctx *Context
proxyClient *http.Client
}

// newStatusServer allocates and returns a statusServer.
func newStatusServer(db *client.DB, gossip *gossip.Gossip, ctx *Context) *statusServer {
func newStatusServer(db *client.DB, gossip *gossip.Gossip, metaRegistry *metric.Registry, ctx *Context) *statusServer {
// Create an http client with a timeout
tlsConfig, err := ctx.GetClientTLSConfig()
if err != nil {
Expand All @@ -121,11 +125,12 @@ func newStatusServer(db *client.DB, gossip *gossip.Gossip, ctx *Context) *status
}

server := &statusServer{
db: db,
gossip: gossip,
router: httprouter.New(),
ctx: ctx,
proxyClient: httpClient,
db: db,
gossip: gossip,
metaRegistry: metaRegistry,
router: httprouter.New(),
ctx: ctx,
proxyClient: httpClient,
}

server.router.GET(statusGossipPattern, server.handleGossip)
Expand All @@ -138,8 +143,9 @@ func newStatusServer(db *client.DB, gossip *gossip.Gossip, ctx *Context) *status
server.router.GET(statusNodePattern, server.handleNodeStatus)
server.router.GET(statusStoresPrefix, server.handleStoresStatus)
server.router.GET(statusStorePattern, server.handleStoreStatus)
server.router.GET(healthEndpoint, server.handleDetailsLocal)
server.router.GET(statusMetricsPattern, server.handleMetrics)

server.router.GET(healthEndpoint, server.handleDetailsLocal)
return server
}

Expand Down Expand Up @@ -592,6 +598,20 @@ func (s *statusServer) handleStoreStatus(w http.ResponseWriter, r *http.Request,
respondAsJSON(w, r, storeStatus)
}

func (s *statusServer) handleMetrics(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
nodeID, local, err := s.extractNodeID(ps)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if !local {
s.proxyRequest(nodeID, w, r)
return
}
respondAsJSON(w, r, s.metaRegistry)
}

func respondAsJSON(w http.ResponseWriter, r *http.Request, response interface{}) {
b, contentType, err := util.MarshalResponse(r, response, []util.EncodingType{util.JSONEncoding})
if err != nil {
Expand Down
23 changes: 14 additions & 9 deletions server/status/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package status

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util"
Expand All @@ -37,8 +38,9 @@ func (e StartNodeEvent) String() string {

// CallSuccessEvent is published when a call to a node completes without error.
type CallSuccessEvent struct {
NodeID roachpb.NodeID
Method roachpb.Method
NodeID roachpb.NodeID
Method roachpb.Method
Duration time.Duration
}

// String implements fmt.Stringer.
Expand All @@ -48,8 +50,9 @@ func (e CallSuccessEvent) String() string {

// CallErrorEvent is published when a call to a node returns an error.
type CallErrorEvent struct {
NodeID roachpb.NodeID
Method roachpb.Method
NodeID roachpb.NodeID
Method roachpb.Method
Duration time.Duration
}

// String implements fmt.Stringer.
Expand Down Expand Up @@ -86,7 +89,7 @@ func (nef NodeEventFeed) StartNode(desc roachpb.NodeDescriptor, startedAt int64)
// - For a successful request, a corresponding event for each request in the batch,
// - on error without index information, a failure of the Batch, and
// - on an indexed error a failure of the individual request.
func (nef NodeEventFeed) CallComplete(ba roachpb.BatchRequest, pErr *roachpb.Error) {
func (nef NodeEventFeed) CallComplete(ba roachpb.BatchRequest, d time.Duration, pErr *roachpb.Error) {
if pErr != nil && pErr.TransactionRestart == roachpb.TransactionRestart_ABORT {
method := roachpb.Batch
if iErr, ok := pErr.GoError().(roachpb.IndexedError); ok {
Expand All @@ -95,15 +98,17 @@ func (nef NodeEventFeed) CallComplete(ba roachpb.BatchRequest, pErr *roachpb.Err
}
}
nef.f.Publish(&CallErrorEvent{
NodeID: nef.id,
Method: method,
NodeID: nef.id,
Method: method,
Duration: d,
})
return
}
for _, union := range ba.Requests {
nef.f.Publish(&CallSuccessEvent{
NodeID: nef.id,
Method: union.GetInner().Method(),
NodeID: nef.id,
Method: union.GetInner().Method(),
Duration: d,
})
}
}
Expand Down
Loading