From 1c60e1ad0102560232c81e5bd71f0d0c4b2bd3d9 Mon Sep 17 00:00:00 2001 From: fabioberger Date: Tue, 30 Jul 2019 20:57:23 +0200 Subject: [PATCH] Add back mesh_addPeer RPC method --- cmd/add_peer/main.go | 52 +++++++++++++++++++++++++++++++++++++++ cmd/mesh/rpc_handler.go | 11 +++++++++ core/core.go | 6 +++++ rpc/client.go | 16 ++++++++++++ rpc/client_server_test.go | 45 +++++++++++++++++++++++++++++++++ rpc/service.go | 31 +++++++++++++++++++++++ 6 files changed, 161 insertions(+) create mode 100644 cmd/add_peer/main.go diff --git a/cmd/add_peer/main.go b/cmd/add_peer/main.go new file mode 100644 index 000000000..9c17ebaa5 --- /dev/null +++ b/cmd/add_peer/main.go @@ -0,0 +1,52 @@ +// demo/add_peer is a short program that adds a new peer to 0x Mesh via RPC. +package main + +import ( + "github.com/0xProject/0x-mesh/rpc" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" + "github.com/plaid/go-envvar/envvar" + log "github.com/sirupsen/logrus" +) + +type clientEnvVars struct { + // RPCAddress is the address of the 0x Mesh node to communicate with. + RPCAddress string `envvar:"RPC_ADDRESS"` + // PeerID is the base58-encoded peer ID of the peer to connect to. + PeerID string `envvar:"PEER_ID"` + // PeerAddr is the Multiaddress of the peer to connect to. + PeerAddr string `envvar:"PEER_ADDR"` +} + +func main() { + env := clientEnvVars{} + if err := envvar.Parse(&env); err != nil { + panic(err) + } + + // Parse peer ID and peer address. + parsedPeerID, err := peer.IDB58Decode(env.PeerID) + if err != nil { + log.Fatal(err) + } + parsedMultiAddr, err := ma.NewMultiaddr(env.PeerAddr) + if err != nil { + log.Fatal(err) + } + peerInfo := peerstore.PeerInfo{ + ID: parsedPeerID, + Addrs: []ma.Multiaddr{parsedMultiAddr}, + } + + client, err := rpc.NewClient(env.RPCAddress) + if err != nil { + log.WithError(err).Fatal("could not create client") + } + + if err := client.AddPeer(peerInfo); err != nil { + log.WithError(err).Fatal("error from AddPeer") + } else { + log.Printf("successfully added peer: %s", env.PeerID) + } +} diff --git a/cmd/mesh/rpc_handler.go b/cmd/mesh/rpc_handler.go index 73ee61191..ee629538a 100644 --- a/cmd/mesh/rpc_handler.go +++ b/cmd/mesh/rpc_handler.go @@ -15,6 +15,7 @@ import ( "github.com/0xProject/0x-mesh/rpc" "github.com/0xProject/0x-mesh/zeroex" ethRpc "github.com/ethereum/go-ethereum/rpc" + peerstore "github.com/libp2p/go-libp2p-peerstore" log "github.com/sirupsen/logrus" ) @@ -80,6 +81,16 @@ func (handler *rpcHandler) AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroe return validationResults, nil } +// AddPeer is called when an RPC client calls AddPeer, +func (handler *rpcHandler) AddPeer(peerInfo peerstore.PeerInfo) error { + log.Debug("received AddPeer request via RPC") + if err := handler.app.AddPeer(peerInfo); err != nil { + log.WithField("error", err.Error()).Error("internal error in AddPeer RPC call") + return constants.ErrInternal + } + return nil +} + // GetStats is called when an RPC client calls GetStats, func (handler *rpcHandler) GetStats() (*rpc.GetStatsResponse, error) { log.Debug("received GetStats request via RPC") diff --git a/core/core.go b/core/core.go index 87f769473..9c185d73f 100644 --- a/core/core.go +++ b/core/core.go @@ -31,6 +31,7 @@ import ( "github.com/google/uuid" p2pcrypto "github.com/libp2p/go-libp2p-crypto" peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" ma "github.com/multiformats/go-multiaddr" log "github.com/sirupsen/logrus" "github.com/xeipuuv/gojsonschema" @@ -604,6 +605,11 @@ func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroex.Validatio return allValidationResults, nil } +// AddPeer can be used to manually connect to a new peer. +func (app *App) AddPeer(peerInfo peerstore.PeerInfo) error { + return app.node.Connect(peerInfo, peerConnectTimeout) +} + // GetStats retrieves stats about the Mesh node func (app *App) GetStats() (*rpc.GetStatsResponse, error) { latestBlockHeader, err := app.blockWatcher.GetLatestBlock() diff --git a/rpc/client.go b/rpc/client.go index 102dc8f7b..883a03831 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -6,6 +6,8 @@ import ( "github.com/0xProject/0x-mesh/zeroex" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" ) // Client is a JSON RPC 2.0 client implementation over WebSockets. It can be @@ -51,6 +53,20 @@ func (c *Client) GetOrders(page, perPage int, snapshotID string) (*GetOrdersResp return &getOrdersResponse, nil } +// AddPeer adds the peer to the node's list of peers. The node will attempt to +// connect to this new peer and return an error if it cannot. +func (c *Client) AddPeer(peerInfo peerstore.PeerInfo) error { + peerIDString := peer.IDB58Encode(peerInfo.ID) + multiAddrStrings := make([]string, len(peerInfo.Addrs)) + for i, addr := range peerInfo.Addrs { + multiAddrStrings[i] = addr.String() + } + if err := c.rpcClient.Call(nil, "mesh_addPeer", peerIDString, multiAddrStrings); err != nil { + return err + } + return nil +} + // LatestBlock is the latest block processed by the Mesh node type LatestBlock struct { Number int `json:"number"` diff --git a/rpc/client_server_test.go b/rpc/client_server_test.go index 31f758358..3762d6911 100644 --- a/rpc/client_server_test.go +++ b/rpc/client_server_test.go @@ -16,6 +16,9 @@ import ( "github.com/0xProject/0x-mesh/zeroex" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -25,6 +28,7 @@ import ( type dummyRPCHandler struct { addOrdersHandler func(signedOrdersRaw []*json.RawMessage) (*zeroex.ValidationResults, error) getOrdersHandler func(page, perPage int, snapshotID string) (*GetOrdersResponse, error) + addPeerHandler func(peerInfo peerstore.PeerInfo) error getStatsHandler func() (*GetStatsResponse, error) subscribeToOrdersHandler func(ctx context.Context) (*rpc.Subscription, error) } @@ -43,6 +47,13 @@ func (d *dummyRPCHandler) GetOrders(page, perPage int, snapshotID string) (*GetO return d.getOrdersHandler(page, perPage, snapshotID) } +func (d *dummyRPCHandler) AddPeer(peerInfo peerstore.PeerInfo) error { + if d.addPeerHandler == nil { + return errors.New("dummyRPCHandler: no handler set for AddPeer") + } + return d.addPeerHandler(peerInfo) +} + func (d *dummyRPCHandler) GetStats() (*GetStatsResponse, error) { if d.getStatsHandler == nil { return nil, errors.New("dummyRPCHandler: no handler set for GetStats") @@ -212,6 +223,40 @@ func TestGetOrdersSuccess(t *testing.T) { wg.Wait() } +func TestAddPeer(t *testing.T) { + // Create the expected PeerInfo + addr0, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234") + require.NoError(t, err) + addr1, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678") + require.NoError(t, err) + peerID, err := peer.IDB58Decode("QmagLpXZHNrTraqWpY49xtFmZMTLBWctx2PF96s4aFrj9f") + require.NoError(t, err) + expectedPeerInfo := peerstore.PeerInfo{ + ID: peerID, + Addrs: []ma.Multiaddr{addr0, addr1}, + } + + // Set up the dummy handler with an addPeerHandler + wg := &sync.WaitGroup{} + wg.Add(1) + rpcHandler := &dummyRPCHandler{ + addPeerHandler: func(peerInfo peerstore.PeerInfo) error { + assert.Equal(t, expectedPeerInfo, peerInfo, "AddPeer was called with an unexpected peerInfo argument") + wg.Done() + return nil + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, client := newTestServerAndClient(t, rpcHandler, ctx) + + require.NoError(t, client.AddPeer(expectedPeerInfo)) + + // The WaitGroup signals that AddPeer was called on the server-side. + wg.Wait() +} + func TestGetStats(t *testing.T) { expectedGetStatsResponse := &GetStatsResponse{ Version: "development", diff --git a/rpc/service.go b/rpc/service.go index fa5c15435..984d95436 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -11,6 +11,9 @@ import ( "github.com/0xProject/0x-mesh/zeroex" "github.com/ethereum/go-ethereum/rpc" ethRpc "github.com/ethereum/go-ethereum/rpc" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" log "github.com/sirupsen/logrus" ) @@ -28,6 +31,8 @@ type RPCHandler interface { AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroex.ValidationResults, error) // GetOrders is called when the clients sends a GetOrders request GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error) + // AddPeer is called when the client sends an AddPeer request. + AddPeer(peerInfo peerstore.PeerInfo) error // GetStats is called when the client sends an GetStats request. GetStats() (*GetStatsResponse, error) // SubscribeToOrders is called when a client sends a Subscribe to `orders` request @@ -122,6 +127,32 @@ func (s *rpcService) GetOrders(page, perPage int, snapshotID string) (*GetOrders return s.rpcHandler.GetOrders(page, perPage, snapshotID) } +// AddPeer builds PeerInfo out of the given peer ID and multiaddresses and +// calls rpcHandler.AddPeer. If there is an error, it returns it. +func (s *rpcService) AddPeer(peerID string, multiaddrs []string) error { + // Parse peer ID. + parsedPeerID, err := peer.IDB58Decode(peerID) + if err != nil { + return err + } + peerInfo := peerstore.PeerInfo{ + ID: parsedPeerID, + } + + // Parse each given multiaddress. + parsedMultiaddrs := make([]ma.Multiaddr, len(multiaddrs)) + for i, addr := range multiaddrs { + parsed, err := ma.NewMultiaddr(addr) + if err != nil { + return err + } + parsedMultiaddrs[i] = parsed + } + peerInfo.Addrs = parsedMultiaddrs + + return s.rpcHandler.AddPeer(peerInfo) +} + // GetStats calls rpcHandler.GetStats. If there is an error, it returns it. func (s *rpcService) GetStats() (*GetStatsResponse, error) { return s.rpcHandler.GetStats()