This repository has been archived by the owner on Sep 19, 2023. It is now read-only.
/
outstanding.go
137 lines (107 loc) · 3.76 KB
/
outstanding.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package statemachine
import (
"fmt"
"github.com/IBM/mirbft/pkg/pb/msgs"
)
func newOutstandingReqs(clientTracker *clientTracker, networkState *msgs.NetworkState, logger Logger) *allOutstandingReqs {
clientTracker.availableList.resetIterator()
ao := &allOutstandingReqs{
buckets: map[bucketID]*bucketOutstandingReqs{},
correctRequests: map[string]*msgs.RequestAck{},
outstandingRequests: map[string]*sequence{},
availableIterator: clientTracker.availableList,
}
numBuckets := int(networkState.Config.NumberOfBuckets)
for i := bucketID(0); i < bucketID(numBuckets); i++ {
bo := &bucketOutstandingReqs{
clients: map[uint64]*clientOutstandingReqs{},
}
ao.buckets[i] = bo
for _, client := range networkState.Clients {
var firstUncommitted uint64
for j := 0; j < numBuckets; j++ {
reqNo := client.LowWatermark + uint64(j)
if clientReqToBucket(client.Id, reqNo, networkState.Config) == i {
firstUncommitted = reqNo
break
}
}
cors := &clientOutstandingReqs{
nextReqNo: firstUncommitted,
numBuckets: uint64(networkState.Config.NumberOfBuckets),
client: client,
}
cors.skipPreviouslyCommitted()
logger.Log(LevelDebug, "initializing outstanding reqs for client", "client_id", client.Id, "bucket_id", i, "low_watermark", client.LowWatermark, "next_req_no", cors.nextReqNo)
bo.clients[client.Id] = cors
}
}
ao.advanceRequests() // Note, this can return no actions as no sequences have allocated
return ao
}
type allOutstandingReqs struct {
buckets map[bucketID]*bucketOutstandingReqs
availableIterator *availableList
correctRequests map[string]*msgs.RequestAck // TODO, map by struct with digest + reqNo + clientNo, otherwise clients can engineer collisions.
outstandingRequests map[string]*sequence
}
type bucketOutstandingReqs struct {
clients map[uint64]*clientOutstandingReqs // TODO, obvious optimization is to make this active clients and initialize this lazily
}
type clientOutstandingReqs struct {
nextReqNo uint64
numBuckets uint64
client *msgs.NetworkState_Client
}
func (cors *clientOutstandingReqs) skipPreviouslyCommitted() {
for {
if !isCommitted(cors.nextReqNo, cors.client) {
break
}
cors.nextReqNo += cors.numBuckets
}
}
func (ao *allOutstandingReqs) advanceRequests() *ActionList {
actions := &ActionList{}
for ao.availableIterator.hasNext() {
ack := ao.availableIterator.next()
key := string(ack.Digest)
if seq, ok := ao.outstandingRequests[key]; ok {
delete(ao.outstandingRequests, key)
actions.concat(seq.satisfyOutstanding(ack))
continue
}
ao.correctRequests[key] = ack
}
return actions
}
// TODO, bucket probably can/should be stored in the *sequence
func (ao *allOutstandingReqs) applyAcks(bucket bucketID, seq *sequence, batch []*msgs.RequestAck) (*ActionList, error) {
bo, ok := ao.buckets[bucket]
assertTruef(ok, "told to apply acks for bucket %d which does not exist", bucket)
outstandingReqs := map[string]struct{}{}
for _, req := range batch {
co, ok := bo.clients[req.ClientId]
if !ok {
return nil, fmt.Errorf("no such client")
}
if co.nextReqNo != req.ReqNo {
return nil, fmt.Errorf("expected ClientId=%d next request for Bucket=%d to have ReqNo=%d but got ReqNo=%d", req.ClientId, bucket, co.nextReqNo, req.ReqNo)
}
// TODO, return an error if the request proposed is for a seqno before this request is valid
key := string(req.Digest)
if _, ok := ao.correctRequests[key]; ok {
delete(ao.correctRequests, key)
} else {
ao.outstandingRequests[key] = seq
outstandingReqs[key] = struct{}{}
}
co.nextReqNo += co.numBuckets
co.skipPreviouslyCommitted()
}
return seq.allocate(batch, outstandingReqs), nil
}