Skip to content

Commit

Permalink
p2p: remove support for legacy stream delimited protocols (#2495)
Browse files Browse the repository at this point in the history
Removes support for legacy stream delimited libp2p protocols. Only support length-delimited protocols.
Note: this PR is clone of #2350 

category: refactor
ticket: #1934
  • Loading branch information
dB2510 committed Aug 1, 2023
1 parent 6f083a7 commit 7d3fb56
Show file tree
Hide file tree
Showing 12 changed files with 30 additions and 215 deletions.
4 changes: 2 additions & 2 deletions app/peerinfo/adhoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func DoOnce(ctx context.Context, tcpNode host.Host, peerID peer.ID) (*pbv1.PeerI

req := new(pbv1.PeerInfo) // TODO(corver): Populate request fields and make them required.
resp := new(pbv1.PeerInfo)
err := p2p.SendReceive(ctx, tcpNode, peerID, req, resp, protocolID1,
p2p.WithSendReceiveRTT(rttCallback), p2p.WithDelimitedProtocol(protocolID2))
err := p2p.SendReceive(ctx, tcpNode, peerID, req, resp, protocolID2,
p2p.WithSendReceiveRTT(rttCallback))
if err != nil {
return nil, 0, false, err
}
Expand Down
12 changes: 4 additions & 8 deletions app/peerinfo/peerinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ import (
)

const (
period = time.Minute

protocolID1 protocol.ID = "/charon/peerinfo/1.0.0"
period = time.Minute
protocolID2 protocol.ID = "/charon/peerinfo/2.0.0"
)

// Protocols returns the supported protocols of this package in order of precedence.
func Protocols() []protocol.ID {
return []protocol.ID{protocolID2, protocolID1}
return []protocol.ID{protocolID2}
}

type (
Expand Down Expand Up @@ -81,7 +79,7 @@ func newInternal(tcpNode host.Host, peers []peer.ID, version version.SemVer, loc
startTime := timestamppb.New(nowFunc())

// Register a simple handler that returns our info and ignores the request.
registerHandler("peerinfo", tcpNode, protocolID1,
registerHandler("peerinfo", tcpNode, protocolID2,
func() proto.Message { return new(pbv1.PeerInfo) },
func(context.Context, peer.ID, proto.Message) (proto.Message, bool, error) {
return &pbv1.PeerInfo{
Expand All @@ -92,7 +90,6 @@ func newInternal(tcpNode host.Host, peers []peer.ID, version version.SemVer, loc
StartedAt: startTime,
}, true, nil
},
p2p.WithDelimitedProtocol(protocolID2),
)

// Create log filters
Expand Down Expand Up @@ -172,8 +169,7 @@ func (p *PeerInfo) sendOnce(ctx context.Context, now time.Time) {
}

resp := new(pbv1.PeerInfo)
err := p.sendFunc(ctx, p.tcpNode, peerID, req, resp, protocolID1,
p2p.WithSendReceiveRTT(rttCallback), p2p.WithDelimitedProtocol(protocolID2))
err := p.sendFunc(ctx, p.tcpNode, peerID, req, resp, protocolID2, p2p.WithSendReceiveRTT(rttCallback))
if err != nil {
return // Logging handled by send func.
} else if resp.SentAt == nil || resp.StartedAt == nil {
Expand Down
7 changes: 3 additions & 4 deletions core/consensus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ import (

const (
recvBuffer = 100 // Allow buffering some initial messages when this node is late to start an instance.
protocolID1 = "/charon/consensus/qbft/1.0.0"
protocolID2 = "/charon/consensus/qbft/2.0.0"
)

// Protocols returns the supported protocols of this package in order of precedence.
func Protocols() []protocol.ID {
return []protocol.ID{protocolID2, protocolID1}
return []protocol.ID{protocolID2}
}

type subscriber func(ctx context.Context, duty core.Duty, value proto.Message) error
Expand Down Expand Up @@ -287,9 +286,9 @@ func (c *Component) SubscribePriority(fn func(ctx context.Context, duty core.Dut

// Start registers the libp2p receive handler and starts a goroutine that cleans state. This should only be called once.
func (c *Component) Start(ctx context.Context) {
p2p.RegisterHandler("qbft", c.tcpNode, protocolID1,
p2p.RegisterHandler("qbft", c.tcpNode, protocolID2,
func() proto.Message { return new(pbv1.ConsensusMsg) },
c.handle, p2p.WithDelimitedProtocol(protocolID2))
c.handle)

go func() {
for {
Expand Down
4 changes: 1 addition & 3 deletions core/consensus/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/obolnetwork/charon/core"
pbv1 "github.com/obolnetwork/charon/core/corepb/v1"
"github.com/obolnetwork/charon/core/qbft"
"github.com/obolnetwork/charon/p2p"
)

// transport encapsulates receiving and broadcasting for a consensus instance/duty.
Expand Down Expand Up @@ -129,8 +128,7 @@ func (t *transport) Broadcast(ctx context.Context, typ qbft.MsgType, duty core.D
continue
}

err = t.component.sender.SendAsync(ctx, t.component.tcpNode, protocolID1, p.ID, msg.ToConsensusMsg(),
p2p.WithDelimitedProtocol(protocolID2))
err = t.component.sender.SendAsync(ctx, t.component.tcpNode, protocolID2, p.ID, msg.ToConsensusMsg())
if err != nil {
return err
}
Expand Down
11 changes: 4 additions & 7 deletions core/parsigex/parsigex.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ import (
"github.com/obolnetwork/charon/tbls"
)

const (
protocolID1 = "/charon/parsigex/1.0.0"
protocolID2 = "/charon/parsigex/2.0.0"
)
const protocolID2 = "/charon/parsigex/2.0.0"

// Protocols returns the supported protocols of this package in order of precedence.
func Protocols() []protocol.ID {
return []protocol.ID{protocolID2, protocolID1}
return []protocol.ID{protocolID2}
}

func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers []peer.ID, verifyFunc func(context.Context, core.Duty, core.PubKey, core.ParSignedData) error) *ParSigEx {
Expand All @@ -40,7 +37,7 @@ func NewParSigEx(tcpNode host.Host, sendFunc p2p.SendFunc, peerIdx int, peers []
}

newReq := func() proto.Message { return new(pbv1.ParSigExMsg) }
p2p.RegisterHandler("parsigex", tcpNode, protocolID1, newReq, parSigEx.handle, p2p.WithDelimitedProtocol(protocolID2))
p2p.RegisterHandler("parsigex", tcpNode, protocolID2, newReq, parSigEx.handle)

return parSigEx
}
Expand Down Expand Up @@ -115,7 +112,7 @@ func (m *ParSigEx) Broadcast(ctx context.Context, duty core.Duty, set core.ParSi
continue
}

if err := m.sendFunc(ctx, m.tcpNode, protocolID1, p, &msg, p2p.WithDelimitedProtocol(protocolID2)); err != nil {
if err := m.sendFunc(ctx, m.tcpNode, protocolID2, p, &msg); err != nil {
return err
}
}
Expand Down
14 changes: 5 additions & 9 deletions core/priority/prioritiser.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,11 @@ import (
"github.com/obolnetwork/charon/p2p"
)

const (
protocolID1 = "charon/priority/1.1.0"
protocolID2 = "charon/priority/2.0.0"
)
const protocolID2 = "charon/priority/2.0.0"

// Protocols returns the supported protocols of this package in order of precedence.
func Protocols() []protocol.ID {
return []protocol.ID{protocolID2, protocolID1}
return []protocol.ID{protocolID2}
}

// Topic groups priorities in an instance.
Expand Down Expand Up @@ -118,7 +115,7 @@ func newInternal(tcpNode host.Host, peers []peer.ID, minRequired int,
})

// Register prioritiser protocol handler.
registerHandlerFunc("priority", tcpNode, protocolID1,
registerHandlerFunc("priority", tcpNode, protocolID2,
func() proto.Message { return new(pbv1.PriorityMsg) },
func(ctx context.Context, pID peer.ID, msg proto.Message) (proto.Message, bool, error) {
prioMsg, ok := msg.(*pbv1.PriorityMsg)
Expand All @@ -133,8 +130,7 @@ func newInternal(tcpNode host.Host, peers []peer.ID, minRequired int,
}

return resp, true, nil
},
p2p.WithDelimitedProtocol(protocolID2))
})

return p
}
Expand Down Expand Up @@ -337,7 +333,7 @@ func exchange(ctx context.Context, tcpNode host.Host, peers []peer.ID, msgValida

go func(pID peer.ID) {
response := new(pbv1.PriorityMsg)
err := sendFunc(ctx, tcpNode, pID, own, response, protocolID1, p2p.WithDelimitedProtocol(protocolID2))
err := sendFunc(ctx, tcpNode, pID, own, response, protocolID2)
if err != nil {
// No need to log, since transport will do it.
return
Expand Down
5 changes: 2 additions & 3 deletions dkg/bcast/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (c *client) Broadcast(ctx context.Context, msgID string, msg proto.Message)

fork, join, cancel := forkjoin.New(ctx, func(ctx context.Context, pID peer.ID) (*pb.BCastSigResponse, error) {
sigResp := new(pb.BCastSigResponse)
err := c.sendRecvFunc(ctx, c.tcpNode, pID, sigReq, sigResp, protocolIDSig, p2p.WithDelimitedProtocol(protocolIDSig))
err := c.sendRecvFunc(ctx, c.tcpNode, pID, sigReq, sigResp, protocolIDSig)

return sigResp, err
})
Expand Down Expand Up @@ -133,8 +133,7 @@ func (c *client) Broadcast(ctx context.Context, msgID string, msg proto.Message)
continue // Skip self.
}

err := c.sendFunc(ctx, c.tcpNode, protocolIDMsg, pID, bcastMsg,
p2p.WithDelimitedProtocol(protocolIDMsg), p2p.WithSendTimeout(sendTimeout))
err := c.sendFunc(ctx, c.tcpNode, protocolIDMsg, pID, bcastMsg, p2p.WithSendTimeout(sendTimeout))
if err != nil {
return errors.Wrap(err, "send message")
}
Expand Down
2 changes: 0 additions & 2 deletions dkg/bcast/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@ func newServer(tcpNode host.Host, signFunc signFunc, verifyFunc verifyFunc) *ser
p2p.RegisterHandler("bcast", tcpNode, protocolIDSig,
func() proto.Message { return new(pb.BCastSigRequest) },
s.handleSigRequest,
p2p.WithDelimitedProtocol(protocolIDSig),
p2p.WithReceiveTimeout(receiveTimeout),
)

p2p.RegisterHandler("bcast", tcpNode, protocolIDMsg,
func() proto.Message { return new(pb.BCastMessage) },
s.handleMessage,
p2p.WithDelimitedProtocol(protocolIDMsg),
p2p.WithReceiveTimeout(receiveTimeout),
)

Expand Down
3 changes: 1 addition & 2 deletions dkg/frostp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func newFrostP2P(tcpNode host.Host, peers map[peer.ID]cluster.NodeIdx, bcastComp
p2p.RegisterHandler("frost", tcpNode, round1P2PID,
func() proto.Message { return new(pb.FrostRound1P2P) },
newP2PCallback(tcpNode, peers, round1P2PRecv, numVals),
p2p.WithDelimitedProtocol(round1P2PID),
)

bcastCallback := newBcastCallback(peers, round1CastsRecv, round2CastsRecv, threshold, numVals)
Expand Down Expand Up @@ -238,7 +237,7 @@ func (f *frostP2P) Round1(ctx context.Context, castR1 map[msgKey]frost.Round1Bca
return nil, nil, errors.New("bug: unexpected p2p message to self")
}

err := p2p.Send(ctx, f.tcpNode, round1P2PID, pID, p2pMsg, p2p.WithDelimitedProtocol(round1P2PID))
err := p2p.Send(ctx, f.tcpNode, round1P2PID, pID, p2pMsg)
if err != nil {
return nil, nil, err
}
Expand Down
Loading

0 comments on commit 7d3fb56

Please sign in to comment.