Skip to content

Commit

Permalink
Add back mesh_addPeer RPC method
Browse files Browse the repository at this point in the history
  • Loading branch information
fabioberger committed Jul 30, 2019
1 parent 89940af commit 1c60e1a
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 0 deletions.
52 changes: 52 additions & 0 deletions cmd/add_peer/main.go
@@ -0,0 +1,52 @@
// demo/add_peer is a short program that adds a new peer to 0x Mesh via RPC.
package main

import (
"github.com/0xProject/0x-mesh/rpc"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
"github.com/plaid/go-envvar/envvar"
log "github.com/sirupsen/logrus"
)

type clientEnvVars struct {
// RPCAddress is the address of the 0x Mesh node to communicate with.
RPCAddress string `envvar:"RPC_ADDRESS"`
// PeerID is the base58-encoded peer ID of the peer to connect to.
PeerID string `envvar:"PEER_ID"`
// PeerAddr is the Multiaddress of the peer to connect to.
PeerAddr string `envvar:"PEER_ADDR"`
}

func main() {
env := clientEnvVars{}
if err := envvar.Parse(&env); err != nil {
panic(err)
}

// Parse peer ID and peer address.
parsedPeerID, err := peer.IDB58Decode(env.PeerID)
if err != nil {
log.Fatal(err)
}
parsedMultiAddr, err := ma.NewMultiaddr(env.PeerAddr)
if err != nil {
log.Fatal(err)
}
peerInfo := peerstore.PeerInfo{
ID: parsedPeerID,
Addrs: []ma.Multiaddr{parsedMultiAddr},
}

client, err := rpc.NewClient(env.RPCAddress)
if err != nil {
log.WithError(err).Fatal("could not create client")
}

if err := client.AddPeer(peerInfo); err != nil {
log.WithError(err).Fatal("error from AddPeer")
} else {
log.Printf("successfully added peer: %s", env.PeerID)
}
}
11 changes: 11 additions & 0 deletions cmd/mesh/rpc_handler.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/0xProject/0x-mesh/rpc"
"github.com/0xProject/0x-mesh/zeroex"
ethRpc "github.com/ethereum/go-ethereum/rpc"
peerstore "github.com/libp2p/go-libp2p-peerstore"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -80,6 +81,16 @@ func (handler *rpcHandler) AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroe
return validationResults, nil
}

// AddPeer is called when an RPC client calls AddPeer,
func (handler *rpcHandler) AddPeer(peerInfo peerstore.PeerInfo) error {
log.Debug("received AddPeer request via RPC")
if err := handler.app.AddPeer(peerInfo); err != nil {
log.WithField("error", err.Error()).Error("internal error in AddPeer RPC call")
return constants.ErrInternal
}
return nil
}

// GetStats is called when an RPC client calls GetStats,
func (handler *rpcHandler) GetStats() (*rpc.GetStatsResponse, error) {
log.Debug("received GetStats request via RPC")
Expand Down
6 changes: 6 additions & 0 deletions core/core.go
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/google/uuid"
p2pcrypto "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
"github.com/xeipuuv/gojsonschema"
Expand Down Expand Up @@ -604,6 +605,11 @@ func (app *App) AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroex.Validatio
return allValidationResults, nil
}

// AddPeer can be used to manually connect to a new peer.
func (app *App) AddPeer(peerInfo peerstore.PeerInfo) error {
return app.node.Connect(peerInfo, peerConnectTimeout)
}

// GetStats retrieves stats about the Mesh node
func (app *App) GetStats() (*rpc.GetStatsResponse, error) {
latestBlockHeader, err := app.blockWatcher.GetLatestBlock()
Expand Down
16 changes: 16 additions & 0 deletions rpc/client.go
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/0xProject/0x-mesh/zeroex"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
)

// Client is a JSON RPC 2.0 client implementation over WebSockets. It can be
Expand Down Expand Up @@ -51,6 +53,20 @@ func (c *Client) GetOrders(page, perPage int, snapshotID string) (*GetOrdersResp
return &getOrdersResponse, nil
}

// AddPeer adds the peer to the node's list of peers. The node will attempt to
// connect to this new peer and return an error if it cannot.
func (c *Client) AddPeer(peerInfo peerstore.PeerInfo) error {
peerIDString := peer.IDB58Encode(peerInfo.ID)
multiAddrStrings := make([]string, len(peerInfo.Addrs))
for i, addr := range peerInfo.Addrs {
multiAddrStrings[i] = addr.String()
}
if err := c.rpcClient.Call(nil, "mesh_addPeer", peerIDString, multiAddrStrings); err != nil {
return err
}
return nil
}

// LatestBlock is the latest block processed by the Mesh node
type LatestBlock struct {
Number int `json:"number"`
Expand Down
45 changes: 45 additions & 0 deletions rpc/client_server_test.go
Expand Up @@ -16,6 +16,9 @@ import (
"github.com/0xProject/0x-mesh/zeroex"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -25,6 +28,7 @@ import (
type dummyRPCHandler struct {
addOrdersHandler func(signedOrdersRaw []*json.RawMessage) (*zeroex.ValidationResults, error)
getOrdersHandler func(page, perPage int, snapshotID string) (*GetOrdersResponse, error)
addPeerHandler func(peerInfo peerstore.PeerInfo) error
getStatsHandler func() (*GetStatsResponse, error)
subscribeToOrdersHandler func(ctx context.Context) (*rpc.Subscription, error)
}
Expand All @@ -43,6 +47,13 @@ func (d *dummyRPCHandler) GetOrders(page, perPage int, snapshotID string) (*GetO
return d.getOrdersHandler(page, perPage, snapshotID)
}

func (d *dummyRPCHandler) AddPeer(peerInfo peerstore.PeerInfo) error {
if d.addPeerHandler == nil {
return errors.New("dummyRPCHandler: no handler set for AddPeer")
}
return d.addPeerHandler(peerInfo)
}

func (d *dummyRPCHandler) GetStats() (*GetStatsResponse, error) {
if d.getStatsHandler == nil {
return nil, errors.New("dummyRPCHandler: no handler set for GetStats")
Expand Down Expand Up @@ -212,6 +223,40 @@ func TestGetOrdersSuccess(t *testing.T) {
wg.Wait()
}

func TestAddPeer(t *testing.T) {
// Create the expected PeerInfo
addr0, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
require.NoError(t, err)
addr1, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
require.NoError(t, err)
peerID, err := peer.IDB58Decode("QmagLpXZHNrTraqWpY49xtFmZMTLBWctx2PF96s4aFrj9f")
require.NoError(t, err)
expectedPeerInfo := peerstore.PeerInfo{
ID: peerID,
Addrs: []ma.Multiaddr{addr0, addr1},
}

// Set up the dummy handler with an addPeerHandler
wg := &sync.WaitGroup{}
wg.Add(1)
rpcHandler := &dummyRPCHandler{
addPeerHandler: func(peerInfo peerstore.PeerInfo) error {
assert.Equal(t, expectedPeerInfo, peerInfo, "AddPeer was called with an unexpected peerInfo argument")
wg.Done()
return nil
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, client := newTestServerAndClient(t, rpcHandler, ctx)

require.NoError(t, client.AddPeer(expectedPeerInfo))

// The WaitGroup signals that AddPeer was called on the server-side.
wg.Wait()
}

func TestGetStats(t *testing.T) {
expectedGetStatsResponse := &GetStatsResponse{
Version: "development",
Expand Down
31 changes: 31 additions & 0 deletions rpc/service.go
Expand Up @@ -11,6 +11,9 @@ import (
"github.com/0xProject/0x-mesh/zeroex"
"github.com/ethereum/go-ethereum/rpc"
ethRpc "github.com/ethereum/go-ethereum/rpc"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
)

Expand All @@ -28,6 +31,8 @@ type RPCHandler interface {
AddOrders(signedOrdersRaw []*json.RawMessage) (*zeroex.ValidationResults, error)
// GetOrders is called when the clients sends a GetOrders request
GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error)
// AddPeer is called when the client sends an AddPeer request.
AddPeer(peerInfo peerstore.PeerInfo) error
// GetStats is called when the client sends an GetStats request.
GetStats() (*GetStatsResponse, error)
// SubscribeToOrders is called when a client sends a Subscribe to `orders` request
Expand Down Expand Up @@ -122,6 +127,32 @@ func (s *rpcService) GetOrders(page, perPage int, snapshotID string) (*GetOrders
return s.rpcHandler.GetOrders(page, perPage, snapshotID)
}

// AddPeer builds PeerInfo out of the given peer ID and multiaddresses and
// calls rpcHandler.AddPeer. If there is an error, it returns it.
func (s *rpcService) AddPeer(peerID string, multiaddrs []string) error {
// Parse peer ID.
parsedPeerID, err := peer.IDB58Decode(peerID)
if err != nil {
return err
}
peerInfo := peerstore.PeerInfo{
ID: parsedPeerID,
}

// Parse each given multiaddress.
parsedMultiaddrs := make([]ma.Multiaddr, len(multiaddrs))
for i, addr := range multiaddrs {
parsed, err := ma.NewMultiaddr(addr)
if err != nil {
return err
}
parsedMultiaddrs[i] = parsed
}
peerInfo.Addrs = parsedMultiaddrs

return s.rpcHandler.AddPeer(peerInfo)
}

// GetStats calls rpcHandler.GetStats. If there is an error, it returns it.
func (s *rpcService) GetStats() (*GetStatsResponse, error) {
return s.rpcHandler.GetStats()
Expand Down

0 comments on commit 1c60e1a

Please sign in to comment.