/
request_buffer.go
145 lines (129 loc) · 4.67 KB
/
request_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package rootchain
import (
"crypto/sha256"
"errors"
"sync"
"github.com/alphabill-org/alphabill/network/protocol/certification"
"github.com/alphabill-org/alphabill/rootchain/partitions"
"github.com/alphabill-org/alphabill/types"
)
// QuorumStatus is the result of evaluating the certification requests buffered
// for a partition against that partition's trust base.
type QuorumStatus uint8

// CertRequestBuffer holds one requestBuffer per partition, keyed by system
// identifier. The store map is accessed under mu.
type CertRequestBuffer struct {
	mu    sync.RWMutex
	store map[types.SystemID]*requestBuffer
}

// sha256Hash is a fixed-size SHA-256 digest; being an array it can be used as a map key.
type sha256Hash [sha256.Size]byte

// requestBuffer keeps track of the certification requests received from nodes
// and groups them by the hash of their input record.
type requestBuffer struct {
	// nodeRequest registers the hash submitted by each node; the key is the
	// node identifier, so a node has at most one entry until reset.
	nodeRequest map[string]sha256Hash
	// requests collects the certification requests that share the same input
	// record hash.
	requests map[sha256Hash][]*certification.BlockCertificationRequest
}
// Values of QuorumStatus as reported by the request buffer.
const (
// QuorumInProgress is reported while neither of the other two outcomes has
// been reached: quorum may still be achieved by future requests.
QuorumInProgress QuorumStatus = iota
// QuorumAchieved is reported when the largest group of matching requests has
// at least as many members as the trust base quorum.
QuorumAchieved
// QuorumNotPossible is reported when, even if every node that has not yet
// submitted a request joined the largest group, quorum would not be reached.
QuorumNotPossible
)
// NewCertificationRequestBuffer returns a CertRequestBuffer with an empty,
// ready-to-use partition store.
func NewCertificationRequestBuffer() *CertRequestBuffer {
	buf := new(CertRequestBuffer)
	buf.store = make(map[types.SystemID]*requestBuffer)
	return buf
}
// Add records a certification request for the partition identified by id and
// reports the resulting quorum status together with the requests backing it.
// Only the first request from a given node is stored; any further request
// from the same node (duplicate or equivocating) is rejected with an error.
// Call Clear or Reset before a node's next request can be accepted.
func (c *CertRequestBuffer) Add(id types.SystemID, request *certification.BlockCertificationRequest, tb partitions.PartitionTrustBase) (QuorumStatus, []*certification.BlockCertificationRequest, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// get creates the partition buffer on first use.
	return c.get(id).add(request, tb)
}
// IsConsensusReceived reports the quorum status of the partition identified by
// id, evaluated against the trust base tb. A partition without a buffer is
// evaluated as having received no requests.
func (c *CertRequestBuffer) IsConsensusReceived(id types.SystemID, tb partitions.PartitionTrustBase) QuorumStatus {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// Look the buffer up directly rather than through get: get inserts into
	// c.store on a miss, and writing to the map while holding only the read
	// lock would race with other concurrent readers.
	rs, found := c.store[id]
	if !found {
		// Unknown partition: evaluate an empty buffer without registering it.
		rs = newRequestStore()
	}
	_, status := rs.isConsensusReceived(tb)
	return status
}
// Reset discards the buffered requests of every registered partition. The
// partitions themselves stay registered in the store.
func (c *CertRequestBuffer) Reset() {
	c.mu.Lock()
	defer c.mu.Unlock()

	for id := range c.store {
		c.store[id].reset()
	}
}
// Clear discards the buffered requests of the single partition identified by
// id. If the partition has no buffer yet, get registers an empty one.
func (c *CertRequestBuffer) Clear(id types.SystemID) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.get(id).reset()
}
// get returns the buffer registered for the system identifier id. When none
// exists it creates one, stores it under id and returns it. Because it may
// write to c.store, it does no locking of its own and relies on the caller.
func (c *CertRequestBuffer) get(id types.SystemID) *requestBuffer {
	if existing, ok := c.store[id]; ok {
		return existing
	}
	created := newRequestStore()
	c.store[id] = created
	return created
}
// newRequestStore returns a requestBuffer whose maps are allocated and empty.
func newRequestStore() *requestBuffer {
	return &requestBuffer{
		// one entry per node identifier
		nodeRequest: make(map[string]sha256Hash),
		// matching requests grouped under their input record hash
		requests: make(map[sha256Hash][]*certification.BlockCertificationRequest),
	}
}
// add registers the request received from a node and re-evaluates quorum.
// The request is stored only if the node has no entry in the buffer yet;
// otherwise nothing is stored and an error is returned. In both cases the
// current quorum status and the requests backing it are returned as well.
func (rs *requestBuffer) add(req *certification.BlockCertificationRequest, tb partitions.PartitionTrustBase) (QuorumStatus, []*certification.BlockCertificationRequest, error) {
	var err error
	nodeID := req.NodeIdentifier
	if _, seen := rs.nodeRequest[nodeID]; seen {
		// the node already has a request registered, keep the first one
		err = errors.New("request in this round already stored, rejected")
	} else {
		// first request from this node: index it by the hash of its input record
		irHash := sha256.Sum256(req.InputRecord.Bytes())
		rs.nodeRequest[nodeID] = irHash
		rs.requests[irHash] = append(rs.requests[irHash], req)
	}
	proof, status := rs.isConsensusReceived(tb)
	return status, proof, err
}
// reset drops everything buffered so far by replacing both maps with fresh,
// empty ones.
func (rs *requestBuffer) reset() {
	rs.nodeRequest = map[string]sha256Hash{}
	rs.requests = map[sha256Hash][]*certification.BlockCertificationRequest{}
}
// isConsensusReceived evaluates the buffered requests against the trust base
// tb and returns the quorum status together with the relevant requests:
//   - QuorumAchieved: the requests of the largest matching group;
//   - QuorumNotPossible: every buffered request, across all groups;
//   - QuorumInProgress: nil.
func (rs *requestBuffer) isConsensusReceived(tb partitions.PartitionTrustBase) ([]*certification.BlockCertificationRequest, QuorumStatus) {
	// pick the group (requests sharing an IR hash) with the most members;
	// on a tie the group encountered first is kept
	var leading []*certification.BlockCertificationRequest
	for _, group := range rs.requests {
		if len(group) > len(leading) {
			leading = group
		}
	}
	votes := len(leading)

	quorum := int(tb.GetQuorum())
	if votes >= quorum {
		return leading, QuorumAchieved
	}

	// nodes that have not submitted a request yet; if all of them sided with
	// the leading group and quorum is reachable, keep waiting
	outstanding := int(tb.GetTotalNodes()) - len(rs.nodeRequest)
	if outstanding+votes >= quorum {
		return nil, QuorumInProgress
	}

	// quorum can no longer be reached: hand back everything received
	everything := make([]*certification.BlockCertificationRequest, 0, len(rs.nodeRequest))
	for _, group := range rs.requests {
		everything = append(everything, group...)
	}
	return everything, QuorumNotPossible
}