Skip to content

Commit

Permalink
Merge pull request #322 from 0xProject/feature/statsEndpoint
Browse files Browse the repository at this point in the history
rpc: Add `mesh_getStats` JSON-RPC method
  • Loading branch information
fabioberger committed Jul 30, 2019
2 parents 3d92b26 + 1c60e1a commit 036ffa6
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 25 deletions.
File renamed without changes.
36 changes: 13 additions & 23 deletions cmd/cut-release/main.go
Expand Up @@ -97,44 +97,34 @@ func updateHardCodedVersions(version string) {

// Update `rpc/clients/typescript/package.json`
tsClientPackageJSONPath := "rpc/clients/typescript/package.json"
updateVersionKey(tsClientPackageJSONPath, version)
newVersionString := fmt.Sprintf(`"version": "%s"`, version)
regex := `"version": "(.*)"`
updateFileWithRegex(tsClientPackageJSONPath, regex, newVersionString)

// Update `core.go`
corePath := "core/core.go"
updateVersionKey(corePath, version)
newVersionString = fmt.Sprintf(`version$1= "%s"`, version)
regex = `version(.*)= "(.*)"`
updateFileWithRegex(corePath, regex, newVersionString)

// Update badge in README.md
pathToMDFilesWithBadges := []string{"README.md", "docs/USAGE.md", "docs/DEVELOPMENT.md", "docs/DEPLOYMENT.md"}
doubleDashVersion := strings.Replace(version, "-", "--", -1)
newSvgName := fmt.Sprintf("version-%s-orange.svg", doubleDashVersion)
regex = `version-(.*)-orange.svg`
for _, path := range pathToMDFilesWithBadges {
updateBadge(path, version)
updateFileWithRegex(path, regex, newSvgName)
}
}

func updateVersionKey(filePath string, version string) {
func updateFileWithRegex(filePath string, regex string, replacement string) {
dat, err := ioutil.ReadFile(filePath)
if err != nil {
log.Fatal(err)
}

newVersionString := fmt.Sprintf(`"version": "%s"`, version)
var re = regexp.MustCompile(`"version": "(.*)"`)
modifiedDat := []byte(re.ReplaceAllString(string(dat), newVersionString))
err = ioutil.WriteFile(filePath, modifiedDat, 0644)
if err != nil {
log.Fatal(err)
}
}

func updateBadge(filePath string, version string) {
dat, err := ioutil.ReadFile(filePath)
if err != nil {
log.Fatal(err)
}
doubleDashVersion := strings.Replace(version, "-", "--", -1)
newSvgName := fmt.Sprintf("version-%s-orange.svg", doubleDashVersion)
var re = regexp.MustCompile(`version-(.*)-orange.svg`)
modifiedDat := []byte(re.ReplaceAllString(string(dat), newSvgName))

var re = regexp.MustCompile(regex)
modifiedDat := []byte(re.ReplaceAllString(string(dat), replacement))
err = ioutil.WriteFile(filePath, modifiedDat, 0644)
if err != nil {
log.Fatal(err)
Expand Down
11 changes: 11 additions & 0 deletions cmd/mesh/rpc_handler.go
Expand Up @@ -91,6 +91,17 @@ func (handler *rpcHandler) AddPeer(peerInfo peerstore.PeerInfo) error {
return nil
}

// GetStats is called when an RPC client calls GetStats,
func (handler *rpcHandler) GetStats() (*rpc.GetStatsResponse, error) {
log.Debug("received GetStats request via RPC")
getStatsResponse, err := handler.app.GetStats()
if err != nil {
log.WithField("error", err.Error()).Error("internal error in GetStats RPC call")
return nil, constants.ErrInternal
}
return getStatsResponse, nil
}

// SubscribeToOrders is called when an RPC client sends a `mesh_subscribe` request with the `orders` topic parameter
func (handler *rpcHandler) SubscribeToOrders(ctx context.Context) (*ethRpc.Subscription, error) {
log.Debug("received order event subscription request via RPC")
Expand Down
34 changes: 32 additions & 2 deletions core/core.go
@@ -1,6 +1,6 @@
// +build !js

// package core contains everything needed to configure and run a 0x Mesh node.
// Package core contains everything needed to configure and run a 0x Mesh node.
package core

import (
Expand Down Expand Up @@ -44,6 +44,7 @@ const (
peerConnectTimeout = 60 * time.Second
checkNewAddrInterval = 20 * time.Second
expirationPollingInterval = 50 * time.Millisecond
version = "development"
)

// Config is a set of configuration options for 0x Mesh.
Expand Down Expand Up @@ -210,7 +211,7 @@ func New(config Config) (*App, error) {

log.WithFields(map[string]interface{}{
"config": config,
"version": "development",
"version": version,
}).Info("finished initializing core.App")

return app, nil
Expand Down Expand Up @@ -609,6 +610,35 @@ func (app *App) AddPeer(peerInfo peerstore.PeerInfo) error {
return app.node.Connect(peerInfo, peerConnectTimeout)
}

// GetStats retrieves stats about the Mesh node
func (app *App) GetStats() (*rpc.GetStatsResponse, error) {
latestBlockHeader, err := app.blockWatcher.GetLatestBlock()
if err != nil {
return nil, err
}
latestBlock := rpc.LatestBlock{
Number: int(latestBlockHeader.Number.Int64()),
Hash: latestBlockHeader.Hash,
}
notRemovedFilter := app.db.Orders.IsRemovedIndex.ValueFilter([]byte{0})
numOrders, err := app.db.Orders.NewQuery(notRemovedFilter).Count()
if err != nil {
return nil, err
}

response := &rpc.GetStatsResponse{
Version: version,
PubSubTopic: getPubSubTopic(app.config.EthereumNetworkID),
Rendezvous: getRendezvous(app.config.EthereumNetworkID),
PeerID: app.peerID.String(),
EthereumNetworkID: app.config.EthereumNetworkID,
LatestBlock: latestBlock,
NumOrders: numOrders,
NumPeers: app.node.GetNumPeers(),
}
return response, nil
}

// SubscribeToOrderEvents let's one subscribe to order events emitted by the OrderWatcher
func (app *App) SubscribeToOrderEvents(sink chan<- []*zeroex.OrderEvent) event.Subscription {
subscription := app.orderWatcher.Subscribe(sink)
Expand Down
5 changes: 5 additions & 0 deletions ethereum/blockwatch/block_watcher.go
Expand Up @@ -133,6 +133,11 @@ func (w *Watcher) Subscribe(sink chan<- []*Event) event.Subscription {
return w.blockScope.Track(w.blockFeed.Subscribe(sink))
}

// GetLatestBlock returns the latest block processed
func (w *Watcher) GetLatestBlock() (*meshdb.MiniHeader, error) {
return w.stack.Peek()
}

// InspectRetainedBlocks returns the blocks retained in-memory by the Watcher instance. It is not
// particularly performant and therefore should only be used for debugging and testing purposes.
func (w *Watcher) InspectRetainedBlocks() ([]*meshdb.MiniHeader, error) {
Expand Down
5 changes: 5 additions & 0 deletions p2p/node.go
Expand Up @@ -273,6 +273,11 @@ func (n *Node) UnsetPeerScore(id peer.ID, tag string) {
n.connManager.UntagPeer(id, tag)
}

// GetNumPeers returns the number of peers the node is connected to
func (n *Node) GetNumPeers() int {
return n.connManager.GetInfo().ConnCount
}

// Connect ensures there is a connection between this host and the peer with
// given peerInfo. If there is not an active connection, Connect will dial the
// peer, and block until a connection is open, timeout is exceeded, or an error
Expand Down
28 changes: 28 additions & 0 deletions rpc/client.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/0xProject/0x-mesh/zeroex"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
Expand Down Expand Up @@ -66,6 +67,33 @@ func (c *Client) AddPeer(peerInfo peerstore.PeerInfo) error {
return nil
}

// LatestBlock is the latest block processed by the Mesh node
type LatestBlock struct {
Number int `json:"number"`
Hash common.Hash `json:"hash"`
}

// GetStatsResponse is the response returned for an RPC request to mesh_getStats
type GetStatsResponse struct {
Version string `json:"version"`
PubSubTopic string `json:"pubSubTopic"`
Rendezvous string `json:"rendervous"`
PeerID string `json:"peerID"`
EthereumNetworkID int `json:"ethereumNetworkID"`
LatestBlock LatestBlock `json:"latestBlock"`
NumPeers int `json:"numPeers"`
NumOrders int `json:"numOrders"`
}

// GetStats retrieves stats about the Mesh node
func (c *Client) GetStats() (*GetStatsResponse, error) {
var getStatsResponse *GetStatsResponse
if err := c.rpcClient.Call(&getStatsResponse, "mesh_getStats"); err != nil {
return nil, err
}
return getStatsResponse, nil
}

// SubscribeToOrders subscribes a stream of order events
// Note copied from `go-ethereum` codebase: Slow subscribers will be dropped eventually. Client
// buffers up to 8000 notifications before considering the subscriber dead. The subscription Err
Expand Down
45 changes: 45 additions & 0 deletions rpc/client_server_test.go
Expand Up @@ -29,6 +29,7 @@ type dummyRPCHandler struct {
addOrdersHandler func(signedOrdersRaw []*json.RawMessage) (*zeroex.ValidationResults, error)
getOrdersHandler func(page, perPage int, snapshotID string) (*GetOrdersResponse, error)
addPeerHandler func(peerInfo peerstore.PeerInfo) error
getStatsHandler func() (*GetStatsResponse, error)
subscribeToOrdersHandler func(ctx context.Context) (*rpc.Subscription, error)
}

Expand All @@ -53,6 +54,13 @@ func (d *dummyRPCHandler) AddPeer(peerInfo peerstore.PeerInfo) error {
return d.addPeerHandler(peerInfo)
}

func (d *dummyRPCHandler) GetStats() (*GetStatsResponse, error) {
if d.getStatsHandler == nil {
return nil, errors.New("dummyRPCHandler: no handler set for GetStats")
}
return d.getStatsHandler()
}

func (d *dummyRPCHandler) SubscribeToOrders(ctx context.Context) (*rpc.Subscription, error) {
if d.subscribeToOrdersHandler == nil {
return nil, errors.New("dummyRPCHandler: no handler set for Orders")
Expand Down Expand Up @@ -249,6 +257,43 @@ func TestAddPeer(t *testing.T) {
wg.Wait()
}

func TestGetStats(t *testing.T) {
expectedGetStatsResponse := &GetStatsResponse{
Version: "development",
PubSubTopic: "/0x-orders/network/development/version/1",
Rendezvous: "/0x-mesh/network/development/version/1",
PeerID: "16Uiu2HAmJ827EAibLvJxGMj6BvT1tr2e2ssW4cMtpP15qoQqZGSA",
EthereumNetworkID: 42,
LatestBlock: LatestBlock{
Number: 1,
Hash: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"),
},
NumOrders: 0,
NumPeers: 0,
}

// Set up the dummy handler with a getStatsHandler
wg := &sync.WaitGroup{}
wg.Add(1)
rpcHandler := &dummyRPCHandler{
getStatsHandler: func() (*GetStatsResponse, error) {
wg.Done()
return expectedGetStatsResponse, nil
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, client := newTestServerAndClient(t, rpcHandler, ctx)

getStatsResponse, err := client.GetStats()
require.NoError(t, err)
require.Equal(t, expectedGetStatsResponse, getStatsResponse)

// The WaitGroup signals that GetStats was called on the server-side.
wg.Wait()
}

func TestOrdersSubscription(t *testing.T) {
ctx := context.Background()

Expand Down
1 change: 1 addition & 0 deletions rpc/clients/typescript/src/index.ts
Expand Up @@ -13,6 +13,7 @@ export {
RejectedOrderInfo,
ValidationResults,
GetOrdersResponse,
GetStatsResponse,
} from './types';
export { SignedOrder } from '@0x/types';
export { BigNumber } from '@0x/utils';
16 changes: 16 additions & 0 deletions rpc/clients/typescript/src/types.ts
Expand Up @@ -174,3 +174,19 @@ export interface WSMessage {
type: string;
utf8Data: string;
}

export interface LatestBlock {
number: number;
hash: string;
}

export interface GetStatsResponse {
Version: string;
PubSubTopic: string;
Rendezvous: string;
PeerID: string;
EthereumNetworkID: number;
LatestBlock: LatestBlock;
NumPeers: number;
NumOrders: number;
}
5 changes: 5 additions & 0 deletions rpc/clients/typescript/src/ws_client.ts
Expand Up @@ -9,6 +9,7 @@ import * as WebSocket from 'websocket';
import {
AcceptedOrderInfo,
GetOrdersResponse,
GetStatsResponse,
HeartbeatEventPayload,
OrderEvent,
OrderEventPayload,
Expand Down Expand Up @@ -112,6 +113,10 @@ export class WSClient {
});
return validationResults;
}
public async GetStatsAsync(): Promise<GetStatsResponse> {
const stats = await this._wsProvider.send('mesh_getStats', []);
return stats;
}
/**
* Get all 0x signed orders currently stored in the Mesh node
* @param perPage number of signedOrders to fetch per paginated request
Expand Down
7 changes: 7 additions & 0 deletions rpc/service.go
Expand Up @@ -33,6 +33,8 @@ type RPCHandler interface {
GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error)
// AddPeer is called when the client sends an AddPeer request.
AddPeer(peerInfo peerstore.PeerInfo) error
// GetStats is called when the client sends an GetStats request.
GetStats() (*GetStatsResponse, error)
// SubscribeToOrders is called when a client sends a Subscribe to `orders` request
SubscribeToOrders(ctx context.Context) (*rpc.Subscription, error)
}
Expand Down Expand Up @@ -150,3 +152,8 @@ func (s *rpcService) AddPeer(peerID string, multiaddrs []string) error {

return s.rpcHandler.AddPeer(peerInfo)
}

// GetStats calls rpcHandler.GetStats. If there is an error, it returns it.
func (s *rpcService) GetStats() (*GetStatsResponse, error) {
return s.rpcHandler.GetStats()
}

0 comments on commit 036ffa6

Please sign in to comment.