From 199f6a795c6dbdbdca9d70474f2b8a425a0c3e53 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Tue, 30 Jul 2019 18:37:02 +0200 Subject: [PATCH 1/4] Replace mesh_addPeer with mesh_getStats endpoint --- cmd/demo/add_peer/main.go | 52 ---------------------------- cmd/mesh/rpc_handler.go | 16 ++++----- core/core.go | 36 +++++++++++++++---- ethereum/blockwatch/block_watcher.go | 5 +++ p2p/node.go | 5 +++ rpc/client.go | 38 +++++++++++++------- rpc/client_server_test.go | 50 +++++++++++++------------- rpc/service.go | 34 +++--------------- 8 files changed, 103 insertions(+), 133 deletions(-) delete mode 100644 cmd/demo/add_peer/main.go diff --git a/cmd/demo/add_peer/main.go b/cmd/demo/add_peer/main.go deleted file mode 100644 index 9c17ebaa5..000000000 --- a/cmd/demo/add_peer/main.go +++ /dev/null @@ -1,52 +0,0 @@ -// demo/add_peer is a short program that adds a new peer to 0x Mesh via RPC. -package main - -import ( - "github.com/0xProject/0x-mesh/rpc" - peer "github.com/libp2p/go-libp2p-peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" - ma "github.com/multiformats/go-multiaddr" - "github.com/plaid/go-envvar/envvar" - log "github.com/sirupsen/logrus" -) - -type clientEnvVars struct { - // RPCAddress is the address of the 0x Mesh node to communicate with. - RPCAddress string `envvar:"RPC_ADDRESS"` - // PeerID is the base58-encoded peer ID of the peer to connect to. - PeerID string `envvar:"PEER_ID"` - // PeerAddr is the Multiaddress of the peer to connect to. - PeerAddr string `envvar:"PEER_ADDR"` -} - -func main() { - env := clientEnvVars{} - if err := envvar.Parse(&env); err != nil { - panic(err) - } - - // Parse peer ID and peer address. - parsedPeerID, err := peer.IDB58Decode(env.PeerID) - if err != nil { - log.Fatal(err) - } - parsedMultiAddr, err := ma.NewMultiaddr(env.PeerAddr) - if err != nil { - log.Fatal(err) - } - peerInfo := peerstore.PeerInfo{ - ID: parsedPeerID, - Addrs: []ma.Multiaddr{parsedMultiAddr}, - } - - client, err := rpc.NewClient(env.RPCAddress) - if err != nil { - log.WithError(err).Fatal("could not create client") - } - - if err := client.AddPeer(peerInfo); err != nil { - log.WithError(err).Fatal("error from AddPeer") - } else { - log.Printf("successfully added peer: %s", env.PeerID) - } -} diff --git a/cmd/mesh/rpc_handler.go b/cmd/mesh/rpc_handler.go index 80e008865..73ee61191 100644 --- a/cmd/mesh/rpc_handler.go +++ b/cmd/mesh/rpc_handler.go @@ -15,7 +15,6 @@ import ( "github.com/0xProject/0x-mesh/rpc" "github.com/0xProject/0x-mesh/zeroex" ethRpc "github.com/ethereum/go-ethereum/rpc" - peerstore "github.com/libp2p/go-libp2p-peerstore" log "github.com/sirupsen/logrus" ) @@ -81,14 +80,15 @@ func (handler *rpcHandler) AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroe return validationResults, nil } -// AddPeer is called when an RPC client calls AddPeer, -func (handler *rpcHandler) AddPeer(peerInfo peerstore.PeerInfo) error { - log.Debug("received AddPeer request via RPC") - if err := handler.app.AddPeer(peerInfo); err != nil { - log.WithField("error", err.Error()).Error("internal error in AddPeer RPC call") - return constants.ErrInternal +// GetStats is called when an RPC client calls GetStats, +func (handler *rpcHandler) GetStats() (*rpc.GetStatsResponse, error) { + log.Debug("received GetStats request via RPC") + getStatsResponse, err := handler.app.GetStats() + if err != nil { + log.WithField("error", err.Error()).Error("internal error in GetStats RPC call") + return nil, constants.ErrInternal } - return nil + return getStatsResponse, nil } // SubscribeToOrders is called when an RPC client sends a `mesh_subscribe` request with the `orders` topic parameter diff --git a/core/core.go b/core/core.go index 7e1a63399..87f769473 100644 --- a/core/core.go +++ b/core/core.go @@ -1,6 +1,6 @@ // +build !js -// package core contains everything needed to configure and run a 0x Mesh node. +// Package core contains everything needed to configure and run a 0x Mesh node. package core import ( @@ -31,7 +31,6 @@ import ( "github.com/google/uuid" p2pcrypto "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" ma "github.com/multiformats/go-multiaddr" log "github.com/sirupsen/logrus" "github.com/xeipuuv/gojsonschema" @@ -44,6 +43,7 @@ const ( peerConnectTimeout = 60 * time.Second checkNewAddrInterval = 20 * time.Second expirationPollingInterval = 50 * time.Millisecond + version = "development" ) // Config is a set of configuration options for 0x Mesh. @@ -210,7 +210,7 @@ func New(config Config) (*App, error) { log.WithFields(map[string]interface{}{ "config": config, - "version": "development", + "version": version, }).Info("finished initializing core.App") return app, nil @@ -604,9 +604,33 @@ func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroex.Validatio return allValidationResults, nil } -// AddPeer can be used to manually connect to a new peer. -func (app *App) AddPeer(peerInfo peerstore.PeerInfo) error { - return app.node.Connect(peerInfo, peerConnectTimeout) +// GetStats retrieves stats about the Mesh node +func (app *App) GetStats() (*rpc.GetStatsResponse, error) { + latestBlockHeader, err := app.blockWatcher.GetLatestBlock() + if err != nil { + return nil, err + } + latestBlock := rpc.LatestBlock{ + Number: int(latestBlockHeader.Number.Int64()), + Hash: latestBlockHeader.Hash, + } + notRemovedFilter := app.db.Orders.IsRemovedIndex.ValueFilter([]byte{0}) + numOrders, err := app.db.Orders.NewQuery(notRemovedFilter).Count() + if err != nil { + return nil, err + } + + response := &rpc.GetStatsResponse{ + Version: version, + PubSubTopic: getPubSubTopic(app.config.EthereumNetworkID), + Rendezvous: getRendezvous(app.config.EthereumNetworkID), + PeerID: app.peerID.String(), + EthereumNetworkID: app.config.EthereumNetworkID, + LatestBlock: latestBlock, + NumOrders: numOrders, + NumPeers: app.node.GetNumPeers(), + } + return response, nil } // SubscribeToOrderEvents let's one subscribe to order events emitted by the OrderWatcher diff --git a/ethereum/blockwatch/block_watcher.go b/ethereum/blockwatch/block_watcher.go index b8fdcc255..781cb4521 100644 --- a/ethereum/blockwatch/block_watcher.go +++ b/ethereum/blockwatch/block_watcher.go @@ -133,6 +133,11 @@ func (w *Watcher) Subscribe(sink chan<- []*Event) event.Subscription { return w.blockScope.Track(w.blockFeed.Subscribe(sink)) } +// GetLatestBlock returns the latest block processed +func (w *Watcher) GetLatestBlock() (*meshdb.MiniHeader, error) { + return w.stack.Peek() +} + // InspectRetainedBlocks returns the blocks retained in-memory by the Watcher instance. It is not // particularly performant and therefore should only be used for debugging and testing purposes. func (w *Watcher) InspectRetainedBlocks() ([]*meshdb.MiniHeader, error) { diff --git a/p2p/node.go b/p2p/node.go index 15e788f45..01bd7eb68 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -273,6 +273,11 @@ func (n *Node) UnsetPeerScore(id peer.ID, tag string) { n.connManager.UntagPeer(id, tag) } +// GetNumPeers returns the number of peers the node is connected to +func (n *Node) GetNumPeers() int { + return n.connManager.GetInfo().ConnCount +} + // Connect ensures there is a connection between this host and the peer with // given peerInfo. If there is not an active connection, Connect will dial the // peer, and block until a connection is open, timeout is exceeded, or an error diff --git a/rpc/client.go b/rpc/client.go index 7a5b5fb86..102dc8f7b 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -4,9 +4,8 @@ import ( "context" "github.com/0xProject/0x-mesh/zeroex" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" - peer "github.com/libp2p/go-libp2p-peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" ) // Client is a JSON RPC 2.0 client implementation over WebSockets. It can be @@ -52,18 +51,31 @@ func (c *Client) GetOrders(page, perPage int, snapshotID string) (*GetOrdersResp return &getOrdersResponse, nil } -// AddPeer adds the peer to the node's list of peers. The node will attempt to -// connect to this new peer and return an error if it cannot. -func (c *Client) AddPeer(peerInfo peerstore.PeerInfo) error { - peerIDString := peer.IDB58Encode(peerInfo.ID) - multiAddrStrings := make([]string, len(peerInfo.Addrs)) - for i, addr := range peerInfo.Addrs { - multiAddrStrings[i] = addr.String() - } - if err := c.rpcClient.Call(nil, "mesh_addPeer", peerIDString, multiAddrStrings); err != nil { - return err +// LatestBlock is the latest block processed by the Mesh node +type LatestBlock struct { + Number int `json:"number"` + Hash common.Hash `json:"hash"` +} + +// GetStatsResponse is the response returned for an RPC request to mesh_getStats +type GetStatsResponse struct { + Version string `json:"version"` + PubSubTopic string `json:"pubSubTopic"` + Rendezvous string `json:"rendervous"` + PeerID string `json:"peerID"` + EthereumNetworkID int `json:"ethereumNetworkID"` + LatestBlock LatestBlock `json:"latestBlock"` + NumPeers int `json:"numPeers"` + NumOrders int `json:"numOrders"` +} + +// GetStats retrieves stats about the Mesh node +func (c *Client) GetStats() (*GetStatsResponse, error) { + var getStatsResponse *GetStatsResponse + if err := c.rpcClient.Call(&getStatsResponse, "mesh_getStats"); err != nil { + return nil, err } - return nil + return getStatsResponse, nil } // SubscribeToOrders subscribes a stream of order events diff --git a/rpc/client_server_test.go b/rpc/client_server_test.go index d2c96aed0..31f758358 100644 --- a/rpc/client_server_test.go +++ b/rpc/client_server_test.go @@ -16,9 +16,6 @@ import ( "github.com/0xProject/0x-mesh/zeroex" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" - peer "github.com/libp2p/go-libp2p-peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" - ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -28,7 +25,7 @@ import ( type dummyRPCHandler struct { addOrdersHandler func(signedOrdersRaw []*json.RawMessage) (*zeroex.ValidationResults, error) getOrdersHandler func(page, perPage int, snapshotID string) (*GetOrdersResponse, error) - addPeerHandler func(peerInfo peerstore.PeerInfo) error + getStatsHandler func() (*GetStatsResponse, error) subscribeToOrdersHandler func(ctx context.Context) (*rpc.Subscription, error) } @@ -46,11 +43,11 @@ func (d *dummyRPCHandler) GetOrders(page, perPage int, snapshotID string) (*GetO return d.getOrdersHandler(page, perPage, snapshotID) } -func (d *dummyRPCHandler) AddPeer(peerInfo peerstore.PeerInfo) error { - if d.addPeerHandler == nil { - return errors.New("dummyRPCHandler: no handler set for AddPeer") +func (d *dummyRPCHandler) GetStats() (*GetStatsResponse, error) { + if d.getStatsHandler == nil { + return nil, errors.New("dummyRPCHandler: no handler set for GetStats") } - return d.addPeerHandler(peerInfo) + return d.getStatsHandler() } func (d *dummyRPCHandler) SubscribeToOrders(ctx context.Context) (*rpc.Subscription, error) { @@ -215,27 +212,28 @@ func TestGetOrdersSuccess(t *testing.T) { wg.Wait() } -func TestAddPeer(t *testing.T) { - // Create the expected PeerInfo - addr0, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234") - require.NoError(t, err) - addr1, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678") - require.NoError(t, err) - peerID, err := peer.IDB58Decode("QmagLpXZHNrTraqWpY49xtFmZMTLBWctx2PF96s4aFrj9f") - require.NoError(t, err) - expectedPeerInfo := peerstore.PeerInfo{ - ID: peerID, - Addrs: []ma.Multiaddr{addr0, addr1}, +func TestGetStats(t *testing.T) { + expectedGetStatsResponse := &GetStatsResponse{ + Version: "development", + PubSubTopic: "/0x-orders/network/development/version/1", + Rendezvous: "/0x-mesh/network/development/version/1", + PeerID: "16Uiu2HAmJ827EAibLvJxGMj6BvT1tr2e2ssW4cMtpP15qoQqZGSA", + EthereumNetworkID: 42, + LatestBlock: LatestBlock{ + Number: 1, + Hash: common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"), + }, + NumOrders: 0, + NumPeers: 0, } - // Set up the dummy handler with an addPeerHandler + // Set up the dummy handler with a getStatsHandler wg := &sync.WaitGroup{} wg.Add(1) rpcHandler := &dummyRPCHandler{ - addPeerHandler: func(peerInfo peerstore.PeerInfo) error { - assert.Equal(t, expectedPeerInfo, peerInfo, "AddPeer was called with an unexpected peerInfo argument") + getStatsHandler: func() (*GetStatsResponse, error) { wg.Done() - return nil + return expectedGetStatsResponse, nil }, } @@ -243,9 +241,11 @@ func TestAddPeer(t *testing.T) { defer cancel() _, client := newTestServerAndClient(t, rpcHandler, ctx) - require.NoError(t, client.AddPeer(expectedPeerInfo)) + getStatsResponse, err := client.GetStats() + require.NoError(t, err) + require.Equal(t, expectedGetStatsResponse, getStatsResponse) - // The WaitGroup signals that AddPeer was called on the server-side. + // The WaitGroup signals that GetStats was called on the server-side. wg.Wait() } diff --git a/rpc/service.go b/rpc/service.go index 63f1312a4..fa5c15435 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -11,9 +11,6 @@ import ( "github.com/0xProject/0x-mesh/zeroex" "github.com/ethereum/go-ethereum/rpc" ethRpc "github.com/ethereum/go-ethereum/rpc" - peer "github.com/libp2p/go-libp2p-peer" - peerstore "github.com/libp2p/go-libp2p-peerstore" - ma "github.com/multiformats/go-multiaddr" log "github.com/sirupsen/logrus" ) @@ -31,8 +28,8 @@ type RPCHandler interface { AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroex.ValidationResults, error) // GetOrders is called when the clients sends a GetOrders request GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error) - // AddPeer is called when the client sends an AddPeer request. - AddPeer(peerInfo peerstore.PeerInfo) error + // GetStats is called when the client sends an GetStats request. + GetStats() (*GetStatsResponse, error) // SubscribeToOrders is called when a client sends a Subscribe to `orders` request SubscribeToOrders(ctx context.Context) (*rpc.Subscription, error) } @@ -125,28 +122,7 @@ func (s *rpcService) GetOrders(page, perPage int, snapshotID string) (*GetOrders return s.rpcHandler.GetOrders(page, perPage, snapshotID) } -// AddPeer builds PeerInfo out of the given peer ID and multiaddresses and -// calls rpcHandler.AddPeer. If there is an error, it returns it. -func (s *rpcService) AddPeer(peerID string, multiaddrs []string) error { - // Parse peer ID. - parsedPeerID, err := peer.IDB58Decode(peerID) - if err != nil { - return err - } - peerInfo := peerstore.PeerInfo{ - ID: parsedPeerID, - } - - // Parse each given multiaddress. - parsedMultiaddrs := make([]ma.Multiaddr, len(multiaddrs)) - for i, addr := range multiaddrs { - parsed, err := ma.NewMultiaddr(addr) - if err != nil { - return err - } - parsedMultiaddrs[i] = parsed - } - peerInfo.Addrs = parsedMultiaddrs - - return s.rpcHandler.AddPeer(peerInfo) +// GetStats calls rpcHandler.GetStats. If there is an error, it returns it. +func (s *rpcService) GetStats() (*GetStatsResponse, error) { + return s.rpcHandler.GetStats() } From c5509bf5e3fb1efd74bd72086c371ec6063b4655 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Tue, 30 Jul 2019 18:41:24 +0200 Subject: [PATCH 2/4] Add GetStats method to TS client --- rpc/clients/typescript/src/index.ts | 1 + rpc/clients/typescript/src/types.ts | 16 ++++++++++++++++ rpc/clients/typescript/src/ws_client.ts | 5 +++++ 3 files changed, 22 insertions(+) diff --git a/rpc/clients/typescript/src/index.ts b/rpc/clients/typescript/src/index.ts index 548243696..f64224fb4 100644 --- a/rpc/clients/typescript/src/index.ts +++ b/rpc/clients/typescript/src/index.ts @@ -13,6 +13,7 @@ export { RejectedOrderInfo, ValidationResults, GetOrdersResponse, + GetStatsResponse, } from './types'; export { SignedOrder } from '@0x/types'; export { BigNumber } from '@0x/utils'; diff --git a/rpc/clients/typescript/src/types.ts b/rpc/clients/typescript/src/types.ts index d488c3ad5..8097aa6ee 100644 --- a/rpc/clients/typescript/src/types.ts +++ b/rpc/clients/typescript/src/types.ts @@ -174,3 +174,19 @@ export interface WSMessage { type: string; utf8Data: string; } + +export interface LatestBlock { + number: number; + hash: string; +} + +export interface GetStatsResponse { + Version: string; + PubSubTopic: string; + Rendezvous: string; + PeerID: string; + EthereumNetworkID: number; + LatestBlock: LatestBlock; + NumPeers: number; + NumOrders: number; +} diff --git a/rpc/clients/typescript/src/ws_client.ts b/rpc/clients/typescript/src/ws_client.ts index 8cbf96324..259d2dd91 100644 --- a/rpc/clients/typescript/src/ws_client.ts +++ b/rpc/clients/typescript/src/ws_client.ts @@ -9,6 +9,7 @@ import * as WebSocket from 'websocket'; import { AcceptedOrderInfo, GetOrdersResponse, + GetStatsResponse, HeartbeatEventPayload, OrderEvent, OrderEventPayload, @@ -112,6 +113,10 @@ export class WSClient { }); return validationResults; } + public async GetStatsAsync(): Promise { + const stats = await this._wsProvider.send('mesh_getStats', []); + return stats; + } /** * Get all 0x signed orders currently stored in the Mesh node * @param perPage number of signedOrders to fetch per paginated request From 89940aff54b50554e17a1c5b54ae25c006148796 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Tue, 30 Jul 2019 20:47:39 +0200 Subject: [PATCH 3/4] Update core version replacement logic and consolidate all regex replacers to use same function --- cmd/cut-release/main.go | 36 +++++++++++++----------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/cmd/cut-release/main.go b/cmd/cut-release/main.go index 37d195af7..299643bd4 100644 --- a/cmd/cut-release/main.go +++ b/cmd/cut-release/main.go @@ -97,44 +97,34 @@ func updateHardCodedVersions(version string) { // Update `rpc/clients/typescript/package.json` tsClientPackageJSONPath := "rpc/clients/typescript/package.json" - updateVersionKey(tsClientPackageJSONPath, version) + newVersionString := fmt.Sprintf(`"version": "%s"`, version) + regex := `"version": "(.*)"` + updateFileWithRegex(tsClientPackageJSONPath, regex, newVersionString) // Update `core.go` corePath := "core/core.go" - updateVersionKey(corePath, version) + newVersionString = fmt.Sprintf(`version$1= "%s"`, version) + regex = `version(.*)= "(.*)"` + updateFileWithRegex(corePath, regex, newVersionString) // Update badge in README.md pathToMDFilesWithBadges := []string{"README.md", "docs/USAGE.md", "docs/DEVELOPMENT.md", "docs/DEPLOYMENT.md"} + doubleDashVersion := strings.Replace(version, "-", "--", -1) + newSvgName := fmt.Sprintf("version-%s-orange.svg", doubleDashVersion) + regex = `version-(.*)-orange.svg` for _, path := range pathToMDFilesWithBadges { - updateBadge(path, version) + updateFileWithRegex(path, regex, newSvgName) } } -func updateVersionKey(filePath string, version string) { +func updateFileWithRegex(filePath string, regex string, replacement string) { dat, err := ioutil.ReadFile(filePath) if err != nil { log.Fatal(err) } - newVersionString := fmt.Sprintf(`"version": "%s"`, version) - var re = regexp.MustCompile(`"version": "(.*)"`) - modifiedDat := []byte(re.ReplaceAllString(string(dat), newVersionString)) - err = ioutil.WriteFile(filePath, modifiedDat, 0644) - if err != nil { - log.Fatal(err) - } -} - -func updateBadge(filePath string, version string) { - dat, err := ioutil.ReadFile(filePath) - if err != nil { - log.Fatal(err) - } - doubleDashVersion := strings.Replace(version, "-", "--", -1) - newSvgName := fmt.Sprintf("version-%s-orange.svg", doubleDashVersion) - var re = regexp.MustCompile(`version-(.*)-orange.svg`) - modifiedDat := []byte(re.ReplaceAllString(string(dat), newSvgName)) - + var re = regexp.MustCompile(regex) + modifiedDat := []byte(re.ReplaceAllString(string(dat), replacement)) err = ioutil.WriteFile(filePath, modifiedDat, 0644) if err != nil { log.Fatal(err) From 1c60e1ad0102560232c81e5bd71f0d0c4b2bd3d9 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Tue, 30 Jul 2019 20:57:23 +0200 Subject: [PATCH 4/4] Add back mesh_addPeer RPC method --- cmd/add_peer/main.go | 52 +++++++++++++++++++++++++++++++++++++++ cmd/mesh/rpc_handler.go | 11 +++++++++ core/core.go | 6 +++++ rpc/client.go | 16 ++++++++++++ rpc/client_server_test.go | 45 +++++++++++++++++++++++++++++++++ rpc/service.go | 31 +++++++++++++++++++++++ 6 files changed, 161 insertions(+) create mode 100644 cmd/add_peer/main.go diff --git a/cmd/add_peer/main.go b/cmd/add_peer/main.go new file mode 100644 index 000000000..9c17ebaa5 --- /dev/null +++ b/cmd/add_peer/main.go @@ -0,0 +1,52 @@ +// demo/add_peer is a short program that adds a new peer to 0x Mesh via RPC. +package main + +import ( + "github.com/0xProject/0x-mesh/rpc" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" + "github.com/plaid/go-envvar/envvar" + log "github.com/sirupsen/logrus" +) + +type clientEnvVars struct { + // RPCAddress is the address of the 0x Mesh node to communicate with. + RPCAddress string `envvar:"RPC_ADDRESS"` + // PeerID is the base58-encoded peer ID of the peer to connect to. + PeerID string `envvar:"PEER_ID"` + // PeerAddr is the Multiaddress of the peer to connect to. + PeerAddr string `envvar:"PEER_ADDR"` +} + +func main() { + env := clientEnvVars{} + if err := envvar.Parse(&env); err != nil { + panic(err) + } + + // Parse peer ID and peer address. + parsedPeerID, err := peer.IDB58Decode(env.PeerID) + if err != nil { + log.Fatal(err) + } + parsedMultiAddr, err := ma.NewMultiaddr(env.PeerAddr) + if err != nil { + log.Fatal(err) + } + peerInfo := peerstore.PeerInfo{ + ID: parsedPeerID, + Addrs: []ma.Multiaddr{parsedMultiAddr}, + } + + client, err := rpc.NewClient(env.RPCAddress) + if err != nil { + log.WithError(err).Fatal("could not create client") + } + + if err := client.AddPeer(peerInfo); err != nil { + log.WithError(err).Fatal("error from AddPeer") + } else { + log.Printf("successfully added peer: %s", env.PeerID) + } +} diff --git a/cmd/mesh/rpc_handler.go b/cmd/mesh/rpc_handler.go index 73ee61191..ee629538a 100644 --- a/cmd/mesh/rpc_handler.go +++ b/cmd/mesh/rpc_handler.go @@ -15,6 +15,7 @@ import ( "github.com/0xProject/0x-mesh/rpc" "github.com/0xProject/0x-mesh/zeroex" ethRpc "github.com/ethereum/go-ethereum/rpc" + peerstore "github.com/libp2p/go-libp2p-peerstore" log "github.com/sirupsen/logrus" ) @@ -80,6 +81,16 @@ func (handler *rpcHandler) AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroe return validationResults, nil } +// AddPeer is called when an RPC client calls AddPeer, +func (handler *rpcHandler) AddPeer(peerInfo peerstore.PeerInfo) error { + log.Debug("received AddPeer request via RPC") + if err := handler.app.AddPeer(peerInfo); err != nil { + log.WithField("error", err.Error()).Error("internal error in AddPeer RPC call") + return constants.ErrInternal + } + return nil +} + // GetStats is called when an RPC client calls GetStats, func (handler *rpcHandler) GetStats() (*rpc.GetStatsResponse, error) { log.Debug("received GetStats request via RPC") diff --git a/core/core.go b/core/core.go index 87f769473..9c185d73f 100644 --- a/core/core.go +++ b/core/core.go @@ -31,6 +31,7 @@ import ( "github.com/google/uuid" p2pcrypto "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" ma "github.com/multiformats/go-multiaddr" log "github.com/sirupsen/logrus" "github.com/xeipuuv/gojsonschema" @@ -604,6 +605,11 @@ func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroex.Validatio return allValidationResults, nil } +// AddPeer can be used to manually connect to a new peer. +func (app *App) AddPeer(peerInfo peerstore.PeerInfo) error { + return app.node.Connect(peerInfo, peerConnectTimeout) +} + // GetStats retrieves stats about the Mesh node func (app *App) GetStats() (*rpc.GetStatsResponse, error) { latestBlockHeader, err := app.blockWatcher.GetLatestBlock() diff --git a/rpc/client.go b/rpc/client.go index 102dc8f7b..883a03831 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -6,6 +6,8 @@ import ( "github.com/0xProject/0x-mesh/zeroex" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" ) // Client is a JSON RPC 2.0 client implementation over WebSockets. It can be @@ -51,6 +53,20 @@ func (c *Client) GetOrders(page, perPage int, snapshotID string) (*GetOrdersResp return &getOrdersResponse, nil } +// AddPeer adds the peer to the node's list of peers. The node will attempt to +// connect to this new peer and return an error if it cannot. +func (c *Client) AddPeer(peerInfo peerstore.PeerInfo) error { + peerIDString := peer.IDB58Encode(peerInfo.ID) + multiAddrStrings := make([]string, len(peerInfo.Addrs)) + for i, addr := range peerInfo.Addrs { + multiAddrStrings[i] = addr.String() + } + if err := c.rpcClient.Call(nil, "mesh_addPeer", peerIDString, multiAddrStrings); err != nil { + return err + } + return nil +} + // LatestBlock is the latest block processed by the Mesh node type LatestBlock struct { Number int `json:"number"` diff --git a/rpc/client_server_test.go b/rpc/client_server_test.go index 31f758358..3762d6911 100644 --- a/rpc/client_server_test.go +++ b/rpc/client_server_test.go @@ -16,6 +16,9 @@ import ( "github.com/0xProject/0x-mesh/zeroex" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -25,6 +28,7 @@ import ( type dummyRPCHandler struct { addOrdersHandler func(signedOrdersRaw []*json.RawMessage) (*zeroex.ValidationResults, error) getOrdersHandler func(page, perPage int, snapshotID string) (*GetOrdersResponse, error) + addPeerHandler func(peerInfo peerstore.PeerInfo) error getStatsHandler func() (*GetStatsResponse, error) subscribeToOrdersHandler func(ctx context.Context) (*rpc.Subscription, error) } @@ -43,6 +47,13 @@ func (d *dummyRPCHandler) GetOrders(page, perPage int, snapshotID string) (*GetO return d.getOrdersHandler(page, perPage, snapshotID) } +func (d *dummyRPCHandler) AddPeer(peerInfo peerstore.PeerInfo) error { + if d.addPeerHandler == nil { + return errors.New("dummyRPCHandler: no handler set for AddPeer") + } + return d.addPeerHandler(peerInfo) +} + func (d *dummyRPCHandler) GetStats() (*GetStatsResponse, error) { if d.getStatsHandler == nil { return nil, errors.New("dummyRPCHandler: no handler set for GetStats") @@ -212,6 +223,40 @@ func TestGetOrdersSuccess(t *testing.T) { wg.Wait() } +func TestAddPeer(t *testing.T) { + // Create the expected PeerInfo + addr0, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234") + require.NoError(t, err) + addr1, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678") + require.NoError(t, err) + peerID, err := peer.IDB58Decode("QmagLpXZHNrTraqWpY49xtFmZMTLBWctx2PF96s4aFrj9f") + require.NoError(t, err) + expectedPeerInfo := peerstore.PeerInfo{ + ID: peerID, + Addrs: []ma.Multiaddr{addr0, addr1}, + } + + // Set up the dummy handler with an addPeerHandler + wg := &sync.WaitGroup{} + wg.Add(1) + rpcHandler := &dummyRPCHandler{ + addPeerHandler: func(peerInfo peerstore.PeerInfo) error { + assert.Equal(t, expectedPeerInfo, peerInfo, "AddPeer was called with an unexpected peerInfo argument") + wg.Done() + return nil + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, client := newTestServerAndClient(t, rpcHandler, ctx) + + require.NoError(t, client.AddPeer(expectedPeerInfo)) + + // The WaitGroup signals that AddPeer was called on the server-side. + wg.Wait() +} + func TestGetStats(t *testing.T) { expectedGetStatsResponse := &GetStatsResponse{ Version: "development", diff --git a/rpc/service.go b/rpc/service.go index fa5c15435..984d95436 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -11,6 +11,9 @@ import ( "github.com/0xProject/0x-mesh/zeroex" "github.com/ethereum/go-ethereum/rpc" ethRpc "github.com/ethereum/go-ethereum/rpc" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" log "github.com/sirupsen/logrus" ) @@ -28,6 +31,8 @@ type RPCHandler interface { AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroex.ValidationResults, error) // GetOrders is called when the clients sends a GetOrders request GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error) + // AddPeer is called when the client sends an AddPeer request. + AddPeer(peerInfo peerstore.PeerInfo) error // GetStats is called when the client sends an GetStats request. GetStats() (*GetStatsResponse, error) // SubscribeToOrders is called when a client sends a Subscribe to `orders` request @@ -122,6 +127,32 @@ func (s *rpcService) GetOrders(page, perPage int, snapshotID string) (*GetOrders return s.rpcHandler.GetOrders(page, perPage, snapshotID) } +// AddPeer builds PeerInfo out of the given peer ID and multiaddresses and +// calls rpcHandler.AddPeer. If there is an error, it returns it. +func (s *rpcService) AddPeer(peerID string, multiaddrs []string) error { + // Parse peer ID. + parsedPeerID, err := peer.IDB58Decode(peerID) + if err != nil { + return err + } + peerInfo := peerstore.PeerInfo{ + ID: parsedPeerID, + } + + // Parse each given multiaddress. + parsedMultiaddrs := make([]ma.Multiaddr, len(multiaddrs)) + for i, addr := range multiaddrs { + parsed, err := ma.NewMultiaddr(addr) + if err != nil { + return err + } + parsedMultiaddrs[i] = parsed + } + peerInfo.Addrs = parsedMultiaddrs + + return s.rpcHandler.AddPeer(peerInfo) +} + // GetStats calls rpcHandler.GetStats. If there is an error, it returns it. func (s *rpcService) GetStats() (*GetStatsResponse, error) { return s.rpcHandler.GetStats()