diff --git a/cmd/demo/add_peer/main.go b/cmd/add_peer/main.go similarity index 100% rename from cmd/demo/add_peer/main.go rename to cmd/add_peer/main.go diff --git a/cmd/cut-release/main.go b/cmd/cut-release/main.go index 37d195af7..299643bd4 100644 --- a/cmd/cut-release/main.go +++ b/cmd/cut-release/main.go @@ -97,44 +97,34 @@ func updateHardCodedVersions(version string) { // Update `rpc/clients/typescript/package.json` tsClientPackageJSONPath := "rpc/clients/typescript/package.json" - updateVersionKey(tsClientPackageJSONPath, version) + newVersionString := fmt.Sprintf(`"version": "%s"`, version) + regex := `"version": "(.*)"` + updateFileWithRegex(tsClientPackageJSONPath, regex, newVersionString) // Update `core.go` corePath := "core/core.go" - updateVersionKey(corePath, version) + newVersionString = fmt.Sprintf(`version$1= "%s"`, version) + regex = `version(.*)= "(.*)"` + updateFileWithRegex(corePath, regex, newVersionString) // Update badge in README.md pathToMDFilesWithBadges := []string{"README.md", "docs/USAGE.md", "docs/DEVELOPMENT.md", "docs/DEPLOYMENT.md"} + doubleDashVersion := strings.Replace(version, "-", "--", -1) + newSvgName := fmt.Sprintf("version-%s-orange.svg", doubleDashVersion) + regex = `version-(.*)-orange.svg` for _, path := range pathToMDFilesWithBadges { - updateBadge(path, version) + updateFileWithRegex(path, regex, newSvgName) } } -func updateVersionKey(filePath string, version string) { +func updateFileWithRegex(filePath string, regex string, replacement string) { dat, err := ioutil.ReadFile(filePath) if err != nil { log.Fatal(err) } - newVersionString := fmt.Sprintf(`"version": "%s"`, version) - var re = regexp.MustCompile(`"version": "(.*)"`) - modifiedDat := []byte(re.ReplaceAllString(string(dat), newVersionString)) - err = ioutil.WriteFile(filePath, modifiedDat, 0644) - if err != nil { - log.Fatal(err) - } -} - -func updateBadge(filePath string, version string) { - dat, err := ioutil.ReadFile(filePath) - if err != nil { - log.Fatal(err) - } - doubleDashVersion := strings.Replace(version, "-", "--", -1) - newSvgName := fmt.Sprintf("version-%s-orange.svg", doubleDashVersion) - var re = regexp.MustCompile(`version-(.*)-orange.svg`) - modifiedDat := []byte(re.ReplaceAllString(string(dat), newSvgName)) - + var re = regexp.MustCompile(regex) + modifiedDat := []byte(re.ReplaceAllString(string(dat), replacement)) err = ioutil.WriteFile(filePath, modifiedDat, 0644) if err != nil { log.Fatal(err) diff --git a/cmd/mesh/rpc_handler.go b/cmd/mesh/rpc_handler.go index 80e008865..ee629538a 100644 --- a/cmd/mesh/rpc_handler.go +++ b/cmd/mesh/rpc_handler.go @@ -91,6 +91,17 @@ func (handler *rpcHandler) AddPeer(peerInfo peerstore.PeerInfo) error { return nil } +// GetStats is called when an RPC client calls GetStats, +func (handler *rpcHandler) GetStats() (*rpc.GetStatsResponse, error) { + log.Debug("received GetStats request via RPC") + getStatsResponse, err := handler.app.GetStats() + if err != nil { + log.WithField("error", err.Error()).Error("internal error in GetStats RPC call") + return nil, constants.ErrInternal + } + return getStatsResponse, nil +} + // SubscribeToOrders is called when an RPC client sends a `mesh_subscribe` request with the `orders` topic parameter func (handler *rpcHandler) SubscribeToOrders(ctx context.Context) (*ethRpc.Subscription, error) { log.Debug("received order event subscription request via RPC") diff --git a/core/core.go b/core/core.go index 7e1a63399..9c185d73f 100644 --- a/core/core.go +++ b/core/core.go @@ -1,6 +1,6 @@ // +build !js -// package core contains everything needed to configure and run a 0x Mesh node. +// Package core contains everything needed to configure and run a 0x Mesh node. package core import ( @@ -44,6 +44,7 @@ const ( peerConnectTimeout = 60 * time.Second checkNewAddrInterval = 20 * time.Second expirationPollingInterval = 50 * time.Millisecond + version = "development" ) // Config is a set of configuration options for 0x Mesh. @@ -210,7 +211,7 @@ func New(config Config) (*App, error) { log.WithFields(map[string]interface{}{ "config": config, - "version": "development", + "version": version, }).Info("finished initializing core.App") return app, nil @@ -609,6 +610,35 @@ func (app *App) AddPeer(peerInfo peerstore.PeerInfo) error { return app.node.Connect(peerInfo, peerConnectTimeout) } +// GetStats retrieves stats about the Mesh node +func (app *App) GetStats() (*rpc.GetStatsResponse, error) { + latestBlockHeader, err := app.blockWatcher.GetLatestBlock() + if err != nil { + return nil, err + } + latestBlock := rpc.LatestBlock{ + Number: int(latestBlockHeader.Number.Int64()), + Hash: latestBlockHeader.Hash, + } + notRemovedFilter := app.db.Orders.IsRemovedIndex.ValueFilter([]byte{0}) + numOrders, err := app.db.Orders.NewQuery(notRemovedFilter).Count() + if err != nil { + return nil, err + } + + response := &rpc.GetStatsResponse{ + Version: version, + PubSubTopic: getPubSubTopic(app.config.EthereumNetworkID), + Rendezvous: getRendezvous(app.config.EthereumNetworkID), + PeerID: app.peerID.String(), + EthereumNetworkID: app.config.EthereumNetworkID, + LatestBlock: latestBlock, + NumOrders: numOrders, + NumPeers: app.node.GetNumPeers(), + } + return response, nil +} + // SubscribeToOrderEvents let's one subscribe to order events emitted by the OrderWatcher func (app *App) SubscribeToOrderEvents(sink chan<- []*zeroex.OrderEvent) event.Subscription { subscription := app.orderWatcher.Subscribe(sink) diff --git a/ethereum/blockwatch/block_watcher.go b/ethereum/blockwatch/block_watcher.go index b8fdcc255..781cb4521 100644 --- a/ethereum/blockwatch/block_watcher.go +++ b/ethereum/blockwatch/block_watcher.go @@ -133,6 +133,11 @@ func (w *Watcher) Subscribe(sink chan<- []*Event) event.Subscription { return w.blockScope.Track(w.blockFeed.Subscribe(sink)) } +// GetLatestBlock returns the latest block processed +func (w *Watcher) GetLatestBlock() (*meshdb.MiniHeader, error) { + return w.stack.Peek() +} + // InspectRetainedBlocks returns the blocks retained in-memory by the Watcher instance. It is not // particularly performant and therefore should only be used for debugging and testing purposes. func (w *Watcher) InspectRetainedBlocks() ([]*meshdb.MiniHeader, error) { diff --git a/p2p/node.go b/p2p/node.go index 15e788f45..01bd7eb68 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -273,6 +273,11 @@ func (n *Node) UnsetPeerScore(id peer.ID, tag string) { n.connManager.UntagPeer(id, tag) } +// GetNumPeers returns the number of peers the node is connected to +func (n *Node) GetNumPeers() int { + return n.connManager.GetInfo().ConnCount +} + // Connect ensures there is a connection between this host and the peer with // given peerInfo. If there is not an active connection, Connect will dial the // peer, and block until a connection is open, timeout is exceeded, or an error diff --git a/rpc/client.go b/rpc/client.go index 7a5b5fb86..883a03831 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -4,6 +4,7 @@ import ( "context" "github.com/0xProject/0x-mesh/zeroex" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" peer "github.com/libp2p/go-libp2p-peer" peerstore "github.com/libp2p/go-libp2p-peerstore" @@ -66,6 +67,33 @@ func (c *Client) AddPeer(peerInfo peerstore.PeerInfo) error { return nil } +// LatestBlock is the latest block processed by the Mesh node +type LatestBlock struct { + Number int `json:"number"` + Hash common.Hash `json:"hash"` +} + +// GetStatsResponse is the response returned for an RPC request to mesh_getStats +type GetStatsResponse struct { + Version string `json:"version"` + PubSubTopic string `json:"pubSubTopic"` + Rendezvous string `json:"rendervous"` + PeerID string `json:"peerID"` + EthereumNetworkID int `json:"ethereumNetworkID"` + LatestBlock LatestBlock `json:"latestBlock"` + NumPeers int `json:"numPeers"` + NumOrders int `json:"numOrders"` +} + +// GetStats retrieves stats about the Mesh node +func (c *Client) GetStats() (*GetStatsResponse, error) { + var getStatsResponse *GetStatsResponse + if err := c.rpcClient.Call(&getStatsResponse, "mesh_getStats"); err != nil { + return nil, err + } + return getStatsResponse, nil +} + // SubscribeToOrders subscribes a stream of order events // Note copied from `go-ethereum` codebase: Slow subscribers will be dropped eventually. Client // buffers up to 8000 notifications before considering the subscriber dead. The subscription Err diff --git a/rpc/client_server_test.go b/rpc/client_server_test.go index d2c96aed0..3762d6911 100644 --- a/rpc/client_server_test.go +++ b/rpc/client_server_test.go @@ -29,6 +29,7 @@ type dummyRPCHandler struct { addOrdersHandler func(signedOrdersRaw []*json.RawMessage) (*zeroex.ValidationResults, error) getOrdersHandler func(page, perPage int, snapshotID string) (*GetOrdersResponse, error) addPeerHandler func(peerInfo peerstore.PeerInfo) error + getStatsHandler func() (*GetStatsResponse, error) subscribeToOrdersHandler func(ctx context.Context) (*rpc.Subscription, error) } @@ -53,6 +54,13 @@ func (d *dummyRPCHandler) AddPeer(peerInfo peerstore.PeerInfo) error { return d.addPeerHandler(peerInfo) } +func (d *dummyRPCHandler) GetStats() (*GetStatsResponse, error) { + if d.getStatsHandler == nil { + return nil, errors.New("dummyRPCHandler: no handler set for GetStats") + } + return d.getStatsHandler() +} + func (d *dummyRPCHandler) SubscribeToOrders(ctx context.Context) (*rpc.Subscription, error) { if d.subscribeToOrdersHandler == nil { return nil, errors.New("dummyRPCHandler: no handler set for Orders") @@ -249,6 +257,43 @@ func TestAddPeer(t *testing.T) { wg.Wait() } +func TestGetStats(t *testing.T) { + expectedGetStatsResponse := &GetStatsResponse{ + Version: "development", + PubSubTopic: "/0x-orders/network/development/version/1", + Rendezvous: "/0x-mesh/network/development/version/1", + PeerID: "16Uiu2HAmJ827EAibLvJxGMj6BvT1tr2e2ssW4cMtpP15qoQqZGSA", + EthereumNetworkID: 42, + LatestBlock: LatestBlock{ + Number: 1, + Hash: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"), + }, + NumOrders: 0, + NumPeers: 0, + } + + // Set up the dummy handler with a getStatsHandler + wg := &sync.WaitGroup{} + wg.Add(1) + rpcHandler := &dummyRPCHandler{ + getStatsHandler: func() (*GetStatsResponse, error) { + wg.Done() + return expectedGetStatsResponse, nil + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, client := newTestServerAndClient(t, rpcHandler, ctx) + + getStatsResponse, err := client.GetStats() + require.NoError(t, err) + require.Equal(t, expectedGetStatsResponse, getStatsResponse) + + // The WaitGroup signals that GetStats was called on the server-side. + wg.Wait() +} + func TestOrdersSubscription(t *testing.T) { ctx := context.Background() diff --git a/rpc/clients/typescript/src/index.ts b/rpc/clients/typescript/src/index.ts index 548243696..f64224fb4 100644 --- a/rpc/clients/typescript/src/index.ts +++ b/rpc/clients/typescript/src/index.ts @@ -13,6 +13,7 @@ export { RejectedOrderInfo, ValidationResults, GetOrdersResponse, + GetStatsResponse, } from './types'; export { SignedOrder } from '@0x/types'; export { BigNumber } from '@0x/utils'; diff --git a/rpc/clients/typescript/src/types.ts b/rpc/clients/typescript/src/types.ts index d488c3ad5..8097aa6ee 100644 --- a/rpc/clients/typescript/src/types.ts +++ b/rpc/clients/typescript/src/types.ts @@ -174,3 +174,19 @@ export interface WSMessage { type: string; utf8Data: string; } + +export interface LatestBlock { + number: number; + hash: string; +} + +export interface GetStatsResponse { + Version: string; + PubSubTopic: string; + Rendezvous: string; + PeerID: string; + EthereumNetworkID: number; + LatestBlock: LatestBlock; + NumPeers: number; + NumOrders: number; +} diff --git a/rpc/clients/typescript/src/ws_client.ts b/rpc/clients/typescript/src/ws_client.ts index 8cbf96324..259d2dd91 100644 --- a/rpc/clients/typescript/src/ws_client.ts +++ b/rpc/clients/typescript/src/ws_client.ts @@ -9,6 +9,7 @@ import * as WebSocket from 'websocket'; import { AcceptedOrderInfo, GetOrdersResponse, + GetStatsResponse, HeartbeatEventPayload, OrderEvent, OrderEventPayload, @@ -112,6 +113,10 @@ export class WSClient { }); return validationResults; } + public async GetStatsAsync(): Promise { + const stats = await this._wsProvider.send('mesh_getStats', []); + return stats; + } /** * Get all 0x signed orders currently stored in the Mesh node * @param perPage number of signedOrders to fetch per paginated request diff --git a/rpc/service.go b/rpc/service.go index 63f1312a4..984d95436 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -33,6 +33,8 @@ type RPCHandler interface { GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error) // AddPeer is called when the client sends an AddPeer request. AddPeer(peerInfo peerstore.PeerInfo) error + // GetStats is called when the client sends an GetStats request. + GetStats() (*GetStatsResponse, error) // SubscribeToOrders is called when a client sends a Subscribe to `orders` request SubscribeToOrders(ctx context.Context) (*rpc.Subscription, error) } @@ -150,3 +152,8 @@ func (s *rpcService) AddPeer(peerID string, multiaddrs []string) error { return s.rpcHandler.AddPeer(peerInfo) } + +// GetStats calls rpcHandler.GetStats. If there is an error, it returns it. +func (s *rpcService) GetStats() (*GetStatsResponse, error) { + return s.rpcHandler.GetStats() +}