Skip to content

Commit

Permalink
[P2P][RAFT] Change format of enterprise bp address
Browse files Browse the repository at this point in the history
- From URL style to multiaddr style, which is also used in p2p module
- Follow up modification of test scripts
  • Loading branch information
hayarobi committed Jul 18, 2019
1 parent 7e0992d commit 1136bc9
Show file tree
Hide file tree
Showing 24 changed files with 340 additions and 249 deletions.
26 changes: 13 additions & 13 deletions consensus/impl/raftv2/cluster_test.go
Expand Up @@ -38,19 +38,19 @@ func init() {
{types.MemberAttr{
ID: 1,
Name: "testm1",
Address: "aergop2p://127.0.0.1:13001",
Address: "/ip4/127.0.0.1/13001",
PeerID: []byte(testPeerID),
}},
{types.MemberAttr{
ID: 2,
Name: "testm2",
Address: "aergop2p://127.0.0.1:13002",
Address: "/ip4/127.0.0.1/tcp/13002",
PeerID: []byte(testPeerID),
}},
{types.MemberAttr{
ID: 3,
Name: "testm3",
Address: "aergop2p://127.0.0.1:13003",
Address: "/ip4/127.0.0.1/tcp/13003",
PeerID: []byte(testPeerID),
}},
}
Expand Down Expand Up @@ -106,16 +106,16 @@ func TestClusterConfChange(t *testing.T) {
Name: "testraft",
/*
BPs: []config.RaftBPConfig{
{"test1", "aergop2p://127.0.0.1:10001", testPeerIDs[0]},
{"test2", "aergop2p://127.0.0.1:10002", testPeerIDs[1]},
{"test3", "aergop2p://127.0.0.1:10003", testPeerIDs[2]},
{"test1", "/ip4/127.0.0.1/tcp/10001", testPeerIDs[0]},
{"test2", "/ip4/127.0.0.1/tcp/10002", testPeerIDs[1]},
{"test3", "/ip4/127.0.0.1/tcp/10003", testPeerIDs[2]},
},*/
}

mbrs := []*types.MemberAttr{
{ID: 0, Name: "test1", Address: "aergop2p://127.0.0.1:10001", PeerID: []byte(testPeerIDs[0])},
{ID: 1, Name: "test2", Address: "aergop2p://127.0.0.1:10002", PeerID: []byte(testPeerIDs[1])},
{ID: 2, Name: "test3", Address: "aergop2p://127.0.0.1:10003", PeerID: []byte(testPeerIDs[2])},
{ID: 0, Name: "test1", Address: "/ip4/127.0.0.1/tcp/10001", PeerID: []byte(testPeerIDs[0])},
{ID: 1, Name: "test2", Address: "/ip4/127.0.0.1/tcp/10002", PeerID: []byte(testPeerIDs[1])},
{ID: 2, Name: "test3", Address: "/ip4/127.0.0.1/tcp/10003", PeerID: []byte(testPeerIDs[2])},
}

cl := NewCluster([]byte("test"), nil, "testraft", 0, nil)
Expand All @@ -132,7 +132,7 @@ func TestClusterConfChange(t *testing.T) {
// normal case
req := &types.MembershipChange{
Type: types.MembershipChangeType_ADD_MEMBER,
Attr: &types.MemberAttr{ID: 3, Name: "test4", Address: "aergop2p://127.0.0.1:10004", PeerID: []byte(testPeerIDs[3])},
Attr: &types.MemberAttr{ID: 3, Name: "test4", Address: "/ip4/127.0.0.1/tcp/10004", PeerID: []byte(testPeerIDs[3])},
}
_, err = cl.makeProposal(req, true)
assert.NoError(t, err)
Expand All @@ -149,21 +149,21 @@ func TestClusterConfChange(t *testing.T) {
// failed case
req = &types.MembershipChange{
Type: types.MembershipChangeType_ADD_MEMBER,
Attr: &types.MemberAttr{Address: "aergop2p://127.0.0.1:10004", PeerID: []byte(testPeerIDs[3])},
Attr: &types.MemberAttr{Address: "/ip4/127.0.0.1/tcp/10004", PeerID: []byte(testPeerIDs[3])},
}
_, err = cl.makeProposal(req, true)
assert.Error(t, err, "no name")

req = &types.MembershipChange{
Type: types.MembershipChangeType_ADD_MEMBER,
Attr: &types.MemberAttr{Name: "test4", Address: "aergop2p://127.0.0.1:10004", PeerID: []byte(testPeerIDs[0])},
Attr: &types.MemberAttr{Name: "test4", Address: "/ip4/127.0.0.1/tcp/10004", PeerID: []byte(testPeerIDs[0])},
}
_, err = cl.makeProposal(req, true)
assert.Error(t, err, "duplicate peerid")

req = &types.MembershipChange{
Type: types.MembershipChangeType_REMOVE_MEMBER,
Attr: &types.MemberAttr{Name: "test4", Address: "aergop2p://127.0.0.1:10004", PeerID: []byte(testPeerIDs[3])},
Attr: &types.MemberAttr{Name: "test4", Address: "/ip4/127.0.0.1/tcp/10004", PeerID: []byte(testPeerIDs[3])},
}
_, err = cl.makeProposal(req, true)
assert.Error(t, err, "no id to remove")
Expand Down
24 changes: 3 additions & 21 deletions consensus/impl/raftv2/config.go
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"github.com/aergoio/aergo/message"
"github.com/aergoio/aergo/types"
"net/url"
"os"
"strings"
"time"
Expand Down Expand Up @@ -108,9 +107,9 @@ func parseBpsToMembers(bps []types.EnterpriseBP) ([]*types.MemberAttr, error) {

mbrs := make([]*types.MemberAttr, bpLen)
for i, bp := range bps {
trimUrl := strings.TrimSpace(bp.Address)
trimmedAddr := strings.TrimSpace(bp.Address)
// TODO when p2p is applied, have to validate peer address
if err := isValidURL(trimUrl, false); err != nil {
if _, err := types.ParseMultiaddrWithResolve(trimmedAddr); err != nil {
return nil, err
}

Expand All @@ -119,7 +118,7 @@ func parseBpsToMembers(bps []types.EnterpriseBP) ([]*types.MemberAttr, error) {
return nil, fmt.Errorf("invalid raft peerID BP[%d]:%s", i, bp.PeerID)
}

mbrs[i] = &types.MemberAttr{Name: bp.Name, Address: trimUrl, PeerID: []byte(peerID)}
mbrs[i] = &types.MemberAttr{Name: bp.Name, Address: trimmedAddr, PeerID: []byte(peerID)}
}

return mbrs, nil
Expand Down Expand Up @@ -155,23 +154,6 @@ func validateTLS(raftCfg *config.RaftConfig) (bool, error) {
return true, nil
}

func isValidURL(urlstr string, useTls bool) error {
var urlobj *url.URL
var err error

if urlobj, err = consensus.ParseToUrl(urlstr); err != nil {
logger.Error().Str("url", urlstr).Err(err).Msg("raft bp urlstr is not vaild form")
return err
}

if useTls && urlobj.Scheme != "https" {
logger.Error().Str("urlstr", urlstr).Msg("raft bp urlstr shoud use https protocol")
return ErrNotHttpsURL
}

return nil
}

func (cl *Cluster) AddInitialMembers(mbrs []*types.MemberAttr) error {
logger.Debug().Msg("add cluster members from config file")
for _, mbrAttr := range mbrs {
Expand Down
4 changes: 2 additions & 2 deletions consensus/impl/raftv2/test/add_member.sh
Expand Up @@ -28,8 +28,8 @@ prevCnt=$(getClusterTotal 10001)
echo "leader=$leader, port=$leaderport, prevTotal=$prevCnt"

# By RPC
#echo "aergocli -p $leaderport cluster add --name \"$addnode\" --address \"aergop2p://127.0.0.1:${httpports[$addnode]}\" --peerid \"${peerids[$addnode]}\""
#aergocli -p $leaderport cluster add --name "$addnode" --address "aergop2p://127.0.0.1:${httpports[$addnode]}" --peerid "${peerids[$addnode]}"
#echo "aergocli -p $leaderport cluster add --name \"$addnode\" --address \"/ip4/127.0.0.1/${httpports[$addnode]}\" --peerid \"${peerids[$addnode]}\""
#aergocli -p $leaderport cluster add --name "$addnode" --address "/ip4/127.0.0.1/${httpports[$addnode]}" --peerid "${peerids[$addnode]}"

# By Enterprise Tx
walletFile="$TEST_RAFT_INSTANCE/genesis_wallet.txt"
Expand Down
6 changes: 3 additions & 3 deletions consensus/impl/raftv2/test/config/_genesis.json
Expand Up @@ -15,17 +15,17 @@
"enterprise_bps": [
{
"name": "aergo1",
"address": "aergop2p://127.0.0.1:11001",
"address": "/ip4/127.0.0.1/tcp/11001",
"peerid": "16Uiu2HAkvaAMCHkd9hZ6hQkdDLKoXP4eLJSqkMF1YqkSNy5v9SVn"
},
{
"name": "aergo2",
"address": "aergop2p://127.0.0.1:11002",
"address": "/ip4/127.0.0.1/tcp/11002",
"peerid": "16Uiu2HAmJqEp9f9WAbzFxkLrnHnW4EuUDM69xkCDPF26HmNCsib6"
},
{
"name": "aergo3",
"address": "aergop2p://127.0.0.1:11003",
"address": "/ip4/127.0.0.1/tcp/11003",
"peerid": "16Uiu2HAmA2ysmFxoQ37sk1Zk2sMrPysqTmwYAFrACyf3LtP3gxpJ"
}
]
Expand Down
2 changes: 1 addition & 1 deletion consensus/impl/raftv2/test/test_common.sh
Expand Up @@ -533,7 +533,7 @@ function makeAddMemberJson() {
exit 100
fi

memberJson='[ { "command": "add", "name": "'$_nodename'", "address": "aergop2p://127.0.0.1:'${httpports[$_nodename]}'", "peerid":"'${peerids[$_nodename]}'" } ]'
memberJson='[ { "command": "add", "name": "'$_nodename'", "address": "/ip4/127.0.0.1/tcp/'${httpports[$_nodename]}'", "peerid":"'${peerids[$_nodename]}'" } ]'

echo $memberJson
}
Expand Down
25 changes: 2 additions & 23 deletions consensus/raftCommon.go
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/aergoio/etcd/raft"
"github.com/aergoio/etcd/raft/raftpb"
"io"
"net"
"net/url"
)

type EntryType int8
Expand Down Expand Up @@ -315,8 +313,8 @@ func (m *Member) IsValid() bool {
return false
}

if _, err := ParseToUrl(m.Address); err != nil {
logger.Error().Err(err).Msg("parse url of member")
if _, err := types.ParseMultiaddrWithResolve(m.Address); err != nil {
logger.Error().Err(err).Msg("parse address of member")
return false
}

Expand Down Expand Up @@ -369,25 +367,6 @@ func (mbrs MembersByName) Swap(i, j int) {
mbrs[i], mbrs[j] = mbrs[j], mbrs[i]
}

func ParseToUrl(urlstr string) (*url.URL, error) {
var urlObj *url.URL
var err error

if urlObj, err = url.Parse(urlstr); err != nil {
return nil, err
}

if urlObj.Scheme != "aergop2p" {
return nil, ErrURLInvalidScheme
}

if _, _, err := net.SplitHostPort(urlObj.Host); err != nil {
return nil, ErrURLInvalidPort
}

return urlObj, nil
}

// DummyRaftAccessor returns error if process request comes, or silently ignore raft message.
type DummyRaftAccessor struct {
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/p2putil/address.go → internal/network/address.go
Expand Up @@ -3,7 +3,7 @@
* @copyright defined in aergo/LICENSE.txt
*/

package p2putil
package network

import (
"fmt"
Expand Down
Expand Up @@ -3,7 +3,7 @@
* @copyright defined in aergo/LICENSE.txt
*/

package p2putil
package network

import (
"net"
Expand Down
6 changes: 3 additions & 3 deletions p2p/p2p.go
Expand Up @@ -13,10 +13,10 @@ import (
"github.com/aergoio/aergo/p2p/p2pkey"
"github.com/aergoio/aergo/p2p/raftsupport"
"github.com/aergoio/aergo/p2p/transport"
"github.com/libp2p/go-libp2p-core/network"
"github.com/rs/zerolog"

"github.com/aergoio/aergo/consensus"
"github.com/aergoio/aergo/internal/network"
"github.com/aergoio/aergo/p2p/metric"
"github.com/aergoio/aergo/p2p/p2pcommon"
"github.com/aergoio/aergo/p2p/p2putil"
Expand Down Expand Up @@ -275,7 +275,7 @@ func (p2ps *P2P) checkAndAddPeerAddresses(peers []*types.PeerAddress) {
if selfPeerID == rPeerID {
continue
}
if p2putil.CheckAddressType(rPeerAddr.Address) == p2putil.AddressTypeError {
if network.CheckAddressType(rPeerAddr.Address) == network.AddressTypeError {
continue
}
meta := p2pcommon.FromPeerAddress(rPeerAddr)
Expand Down Expand Up @@ -383,7 +383,7 @@ func (p2ps *P2P) CreateHSHandler(legacy bool, outbound bool, pid types.PeerID) p
}
}

func (p2ps *P2P) CreateRemotePeer(meta p2pcommon.PeerMeta, seq uint32, status *types.Status, stream network.Stream, rw p2pcommon.MsgReadWriter) p2pcommon.RemotePeer {
func (p2ps *P2P) CreateRemotePeer(meta p2pcommon.PeerMeta, seq uint32, status *types.Status, stream types.Stream, rw p2pcommon.MsgReadWriter) p2pcommon.RemotePeer {
newPeer := newRemotePeer(meta, seq, p2ps.pm, p2ps, p2ps.Logger, p2ps.mf, p2ps.signer, rw)
newPeer.UpdateBlkCache(status.GetBestBlockHash(), status.GetBestHeight())
rw.AddIOListener(p2ps.mm.NewMetric(newPeer.ID(), newPeer.ManageNumber()))
Expand Down
79 changes: 15 additions & 64 deletions p2p/p2putil/libp2putil.go
Expand Up @@ -8,45 +8,26 @@ package p2putil
import (
"bytes"
"fmt"
"github.com/aergoio/aergo/internal/network"
"github.com/aergoio/aergo/p2p/p2pcommon"
"github.com/aergoio/aergo/types"
"github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/multiformats/go-multiaddr"
"io/ioutil"
"net"
"os"
"path/filepath"
"strconv"
"strings"
)

var InvalidArgument = fmt.Errorf("invalid argument")

// ToMultiAddr make libp2p compatible Multiaddr object
func ToMultiAddr(ipAddr net.IP, port uint32) (multiaddr.Multiaddr, error) {
var addrString string
if ipAddr.To4() != nil {
addrString = fmt.Sprintf("/ip4/%s/tcp/%d", ipAddr.String(), port)
} else if ipAddr.To16() != nil {
addrString = fmt.Sprintf("/ip6/%s/tcp/%d", ipAddr.String(), port)
} else {
return nil, InvalidArgument
}
peerAddr, err := multiaddr.NewMultiaddr(addrString)
if err != nil {
return nil, err
}
return peerAddr, nil
}

// PeerMetaToMultiAddr make libp2p compatible Multiaddr object from peermeta
func PeerMetaToMultiAddr(m p2pcommon.PeerMeta) (multiaddr.Multiaddr, error) {
ipAddr, err := GetSingleIPAddress(m.IPAddress)
ipAddr, err := network.GetSingleIPAddress(m.IPAddress)
if err != nil {
return nil, err
}
return ToMultiAddr(ipAddr, m.Port)
return types.ToMultiAddr(ipAddr, m.Port)
}

func FromMultiAddr(targetAddr multiaddr.Multiaddr) (p2pcommon.PeerMeta, error) {
Expand Down Expand Up @@ -78,56 +59,26 @@ func FromMultiAddr(targetAddr multiaddr.Multiaddr) (p2pcommon.PeerMeta, error) {
return meta, nil
}

func ParseMultiAddrString(str string) (p2pcommon.PeerMeta, error) {
ma, err := ParseMultiaddrWithResolve(str)
func FromMultiAddrString(str string) (p2pcommon.PeerMeta, error) {
ma, err := types.ParseMultiaddrWithResolve(str)
if err != nil {
return p2pcommon.PeerMeta{}, err
}
return FromMultiAddr(ma)
}

// ParseMultiaddrWithResolve parse string to multiaddr, additionally accept domain name with protocol /dns
// NOTE: this function is temporarily use until go-multiaddr start to support dns.
func ParseMultiaddrWithResolve(str string) (multiaddr.Multiaddr, error) {
ma, err := multiaddr.NewMultiaddr(str)

func FromMultiAddrStringWithPID(str string, id types.PeerID) (p2pcommon.PeerMeta, error) {
addr1, err := types.ParseMultiaddrWithResolve(str)
if err != nil {
// multiaddr is not support domain name yet. change domain name to ip address manually
split := strings.Split(str, "/")
if len(split) < 3 || !strings.HasPrefix(split[1], "dns") {
return nil, err
}
domainName := split[2]
ips, err := ResolveHostDomain(domainName)
if err != nil {
return nil, fmt.Errorf("Could not get IPs: %v\n", err)
}
// use ipv4 as possible.
ipSelected := false
for _, ip := range ips {
if ip.To4() != nil {
split[1] = "ip4"
split[2] = ip.To4().String()
ipSelected = true
break
}
}
if !ipSelected {
for _, ip := range ips {
if ip.To16() != nil {
split[1] = "ip6"
split[2] = ip.To16().String()
ipSelected = true
break
}
}
}
if !ipSelected {
return nil, err
}
rejoin := strings.Join(split, "/")
return multiaddr.NewMultiaddr(rejoin)
return p2pcommon.PeerMeta{}, err
}
return ma, nil
pidAddr, err := multiaddr.NewComponent(multiaddr.ProtocolWithCode(multiaddr.P_P2P).Name, id.Pretty())
if err != nil {
return p2pcommon.PeerMeta{}, err
}
ma := multiaddr.Join(addr1, pidAddr)
return FromMultiAddr(ma)
}

func LoadKeyFile(keyFile string) (crypto.PrivKey, crypto.PubKey, error) {
Expand Down

0 comments on commit 1136bc9

Please sign in to comment.