Skip to content

Commit

Permalink
send messages to all connections of a peer
Browse files Browse the repository at this point in the history
  • Loading branch information
cedricfung committed Jul 8, 2024
1 parent 6af422d commit e9eb373
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 30 deletions.
30 changes: 12 additions & 18 deletions p2p/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func parseNetworkMessage(version uint8, data []byte) (*PeerMessage, error) {
}
msg.Snapshot = snap.Snapshot
case PeerMessageTypeRelay:
msg.Data = data[1:]
msg.Data = data
case PeerMessageTypeConsumers:
msg.Data = data[1:]
}
Expand All @@ -431,14 +431,14 @@ func parseNetworkMessage(version uint8, data []byte) (*PeerMessage, error) {

func (me *Peer) relayOrHandlePeerMessage(relayerId crypto.Hash, msg *PeerMessage) error {
logger.Verbosef("me.relayOrHandlePeerMessage(%s, %s) => %s %v", me.Address, me.IdForNetwork, relayerId, msg.Data)
if len(msg.Data) < 64 {
if len(msg.Data) < 65 {
return nil
}
var from, to crypto.Hash
copy(from[:], msg.Data[:32])
copy(to[:], msg.Data[32:64])
copy(from[:], msg.Data[1:33])
copy(to[:], msg.Data[33:65])
if to == me.IdForNetwork {
rm, err := parseNetworkMessage(msg.version, msg.Data[64:])
rm, err := parseNetworkMessage(msg.version, msg.Data[65:])
logger.Verbosef("me.relayOrHandlePeerMessage.ME(%s, %s) => %s %v %v", me.Address, me.IdForNetwork, from, rm, err)
if err != nil {
return err
Expand All @@ -450,21 +450,19 @@ func (me *Peer) relayOrHandlePeerMessage(relayerId crypto.Hash, msg *PeerMessage
}

var relayers []*Peer
peer := me.GetNeighbor(to)
if peer != nil {
relayers = []*Peer{peer}
if nbrs := me.GetNeighbors(to); len(nbrs) > 0 {
relayers = nbrs
} else {
relayers = me.GetRemoteRelayers(to)
}
data := append([]byte{PeerMessageTypeRelay}, msg.Data...)
rk := crypto.Blake3Hash(data)
rk := crypto.Blake3Hash(msg.Data)
rk = crypto.Blake3Hash(append(rk[:], []byte("REMOTE")...))
for _, peer := range relayers {
if peer.IdForNetwork == relayerId {
return nil
}
rk := crypto.Blake3Hash(append(rk[:], peer.IdForNetwork[:]...))
success := me.offerToPeerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], data})
success := me.offerToPeerWithCacheCheck(peer, MsgPriorityNormal, &ChanMsg{rk[:], msg.Data})
if !success {
logger.Verbosef("me.offerToPeerWithCacheCheck(%s) relayer timeout\n", peer.IdForNetwork)
}
Expand All @@ -477,10 +475,6 @@ func (me *Peer) updateRemoteRelayerConsumers(relayerId crypto.Hash, data []byte)
if !me.IsRelayer() {
return nil
}
relayer := me.GetNeighbor(relayerId)
if relayer == nil || !relayer.IsRelayer() {
return nil
}
pl := len(crypto.Key{}) + 137
for c := len(data) / pl; c > 0; c-- {
var id crypto.Hash
Expand All @@ -492,7 +486,7 @@ func (me *Peer) updateRemoteRelayerConsumers(relayerId crypto.Hash, data []byte)
if token.PeerId != id {
panic(id)
}
me.remoteRelayers.Add(id, relayer.IdForNetwork)
me.remoteRelayers.Add(id, relayerId)
data = data[pl:]
}
return nil
Expand All @@ -514,8 +508,8 @@ func (me *Peer) handlePeerMessage(peerId crypto.Hash, msg *PeerMessage) error {
if err != nil {
return err
}
peer := me.GetNeighbor(peerId)
if peer != nil {
nbrs := me.GetNeighbors(peerId)
for _, peer := range nbrs {
peer.syncRing.Offer(msg.Graph)
}
return nil
Expand Down
29 changes: 17 additions & 12 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,13 @@ func (me *Peer) sendToPeer(to crypto.Hash, typ byte, key, data []byte, priority
}
me.sentMetric.handle(typ)

peer := me.GetNeighbor(to)
if peer != nil {
success := peer.offer(priority, &ChanMsg{key, data})
if !success {
return fmt.Errorf("peer send %d timeout", priority)
nbrs := me.GetNeighbors(to)
if len(nbrs) > 0 {
for _, peer := range nbrs {
success := peer.offer(priority, &ChanMsg{key, data})
if !success {
logger.Verbosef("peer.offer(%s) send timeout\n", peer.IdForNetwork)
}
}
return nil
}
Expand Down Expand Up @@ -463,12 +465,17 @@ func (me *Peer) sendSnapshotMessageToPeer(to crypto.Hash, snap crypto.Hash, typ
return me.sendToPeer(to, typ, key, data, MsgPriorityNormal)
}

func (me *Peer) GetNeighbor(key crypto.Hash) *Peer {
func (me *Peer) GetNeighbors(key crypto.Hash) []*Peer {
var nbrs []*Peer
p := me.relayers.Get(key)
if p != nil {
return p
nbrs = append(nbrs, p)
}
return me.consumers.Get(key)
p = me.consumers.Get(key)
if p != nil {
nbrs = append(nbrs, p)
}
return nbrs
}

func (me *Peer) GetRemoteRelayers(key crypto.Hash) []*Peer {
Expand All @@ -478,10 +485,8 @@ func (me *Peer) GetRemoteRelayers(key crypto.Hash) []*Peer {
var relayers []*Peer
ids := me.remoteRelayers.Get(key)
for _, id := range ids {
p := me.GetNeighbor(id)
if p != nil {
relayers = append(relayers, p)
}
nbrs := me.GetNeighbors(id)
relayers = append(relayers, nbrs...)
}
return relayers
}

0 comments on commit e9eb373

Please sign in to comment.