-
Notifications
You must be signed in to change notification settings - Fork 0
/
ack.go
73 lines (64 loc) · 1.65 KB
/
ack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package comm
import (
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
)
type sendFunc func(peer *RemotePeer, msg *proto.SignedGossipMessage)
type waitFunc func(*RemotePeer) error
type ackSendOperation struct {
snd sendFunc
waitForAck waitFunc
}
func newAckSendOperation(snd sendFunc, waitForAck waitFunc) *ackSendOperation {
return &ackSendOperation{
snd: snd,
waitForAck: waitForAck,
}
}
func (aso *ackSendOperation) send(msg *proto.SignedGossipMessage, minAckNum int, peers ...*RemotePeer) []SendResult {
successAcks := 0
results := []SendResult{}
acks := make(chan SendResult, len(peers))
// Send to all peers the message
for _, p := range peers {
go func(p *RemotePeer) {
// Send the message to 'p'
aso.snd(p, msg)
// Wait for an ack from 'p', or get an error if timed out
err := aso.waitForAck(p)
acks <- SendResult{
RemotePeer: *p,
error: err,
}
}(p)
}
for {
ack := <-acks
results = append(results, SendResult{
error: ack.error,
RemotePeer: ack.RemotePeer,
})
if ack.error == nil {
successAcks++
}
if successAcks == minAckNum || len(results) == len(peers) {
break
}
}
return results
}
func interceptAcks(nextHandler handler, remotePeerID common.PKIidType, pubSub *util.PubSub) func(*proto.SignedGossipMessage) {
return func(m *proto.SignedGossipMessage) {
if m.IsAck() {
topic := topicForAck(m.Nonce, remotePeerID)
pubSub.Publish(topic, m.GetAck())
return
}
nextHandler(m)
}
}