forked from otrack/epaxos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Queue.go
212 lines (180 loc) · 6.82 KB
/
Queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package twophase
import (
"epaxos/batching"
"epaxos/dlog"
"errors"
"fmt"
)
type QueueingError struct {
error
code int32
}
type Requeueing interface {
Requeue(bat batching.ProposalBatch) error
}
type Queueing interface {
Requeueing
GetHead() <-chan batching.ProposalBatch
GetTail() chan<- batching.ProposalBatch
Dequeued(bat batching.ProposalBatch, onSuccess func()) error // func that should be called once a value is pulled from the queue in a select statement. func is what happens afterwards
}
type UniqueQ struct {
requeued map[int32]struct{}
q chan batching.ProposalBatch
id int32
}
type ProposedObserver interface {
ObserveProposed(proposed batching.ProposalBatch)
}
type ProposeBatchOracle interface {
ShouldPropose(bat batching.ProposalBatch) bool
}
func (q *UniqueQ) ShouldPropose(bat batching.ProposalBatch) bool {
return true
}
func (q *UniqueQ) GetTail() chan<- batching.ProposalBatch {
return q.q
}
func (q *UniqueQ) GetHead() <-chan batching.ProposalBatch {
return q.q
}
func (q *UniqueQ) Dequeued(bat batching.ProposalBatch, do func()) error {
delete(q.requeued, bat.GetUID())
do()
return nil
}
func (q *UniqueQ) Requeue(bat batching.ProposalBatch) error {
//dlog.AgentPrintfN(q.id, "Attempting to requeue batch with UID %d", bat.GetUID())
if _, exists := q.requeued[bat.GetUID()]; exists {
dlog.AgentPrintfN(q.id, "Not requeuing batch with UID %d as it is already requeued", bat.GetUID())
return &QueueingError{errors.New(fmt.Sprintf("Not Requeued as batch with UID %d is already in queue", bat.GetUID())), 1}
}
//dlog.AgentPrintfN(q.id, "Requeued batch with UID %d", bat.GetUID())
go func() { q.q <- bat }()
return nil
}
type ChosenUniqueQ struct {
chosen map[int32]struct{}
*UniqueQ
}
func (q *ChosenUniqueQ) ShouldPropose(bat batching.ProposalBatch) bool {
if _, exists := q.chosen[bat.GetUID()]; exists {
dlog.AgentPrintfN(q.id, "Batch with UID %d received to propose has been chosen so throwing out", bat.GetUID())
return false
}
return true
}
func (q *ChosenUniqueQ) Dequeued(bat batching.ProposalBatch, do func()) error {
if _, exists := q.chosen[bat.GetUID()]; exists {
dlog.AgentPrintfN(q.id, "Batch with UID %d received to propose has been chosen so throwing out", bat.GetUID())
return &QueueingError{errors.New(fmt.Sprintf("Not Requeued as batch with UID %d is chosen", bat.GetUID())), 2}
}
q.UniqueQ.Dequeued(bat, do)
return nil
}
func (q *ChosenUniqueQ) Requeue(bat batching.ProposalBatch) error {
//dlog.AgentPrintfN(q.id, "Attempting to requeue batch with UID %d", bat.GetUID())
if _, exists := q.chosen[bat.GetUID()]; exists {
dlog.AgentPrintfN(q.id, "Not requeuing batch with UID %d as it is chosen", bat.GetUID())
return &QueueingError{errors.New(fmt.Sprintf("Not Requeued batch with UID %d as it is chosen", bat.GetUID())), 2}
}
if _, exists := q.requeued[bat.GetUID()]; exists {
dlog.AgentPrintfN(q.id, "Not requeuing batch with UID %d as it is already requeued", bat.GetUID())
return &QueueingError{errors.New(fmt.Sprintf("Not Requeued batch with UID %d is already in queue", bat.GetUID())), 1}
}
//dlog.AgentPrintfN(q.id, "Requeued batch with UID %d", bat.GetUID())
go func() { q.q <- bat }()
return nil
}
func (q *ChosenUniqueQ) Learn(bat batching.ProposalBatch) {
if _, e := q.chosen[bat.GetUID()]; e {
dlog.AgentPrintfN(q.id, "Batch with UID %d learnt again", bat.GetUID())
return
}
q.chosen[bat.GetUID()] = struct{}{}
}
func UniqueQNew(rId int32, initLen int) *UniqueQ {
return &UniqueQ{make(map[int32]struct{}), make(chan batching.ProposalBatch, initLen), rId}
}
func ChosenUniqueQNew(rId int32, initLen int) *ChosenUniqueQ {
return &ChosenUniqueQ{make(map[int32]struct{}), UniqueQNew(rId, initLen)}
}
type ProposingChosenUniqueQ struct {
proposed map[int32]struct{}
*ChosenUniqueQ
}
func ProposingChosenUniqueueQNew(rId int32, initLen int) *ProposingChosenUniqueQ {
return &ProposingChosenUniqueQ{
proposed: make(map[int32]struct{}),
ChosenUniqueQ: ChosenUniqueQNew(rId, initLen),
}
}
func (q *ProposingChosenUniqueQ) ObserveProposed(bat batching.ProposalBatch) {
q.proposed[bat.GetUID()] = struct{}{}
}
func (q *ProposingChosenUniqueQ) Requeue(bat batching.ProposalBatch) error {
err := q.ChosenUniqueQ.Requeue(bat)
delete(q.proposed, bat.GetUID())
return err
}
func (q *ProposingChosenUniqueQ) Dequeued(bat batching.ProposalBatch, onSuccess func()) error {
if _, exists := q.proposed[bat.GetUID()]; exists {
dlog.AgentPrintfN(q.id, "Batch with UID %d is proposed already so throwing it out", bat.GetUID())
return &QueueingError{errors.New(fmt.Sprintf("Not Requeued batch with UID %d as it is chosen", bat.GetUID())), 3}
}
return q.ChosenUniqueQ.Dequeued(bat, onSuccess)
}
func (q *ProposingChosenUniqueQ) ShouldPropose(bat batching.ProposalBatch) bool {
if _, exists := q.proposed[bat.GetUID()]; exists {
dlog.AgentPrintfN(q.id, "Batch with UID %d is proposed already so throwing it out", bat.GetUID())
return false
}
return q.ChosenUniqueQ.ShouldPropose(bat)
}
type ProposingSeveralChosenUniqueQ struct {
proposed map[int32]int32
*ChosenUniqueQ
copies int32
}
func ProposingSeveralChosenUniqueQNew(rId int32, initLen int, proposingCopies int32) *ProposingSeveralChosenUniqueQ {
return &ProposingSeveralChosenUniqueQ{
proposed: make(map[int32]int32),
ChosenUniqueQ: ChosenUniqueQNew(rId, initLen),
copies: proposingCopies,
}
}
func (q *ProposingSeveralChosenUniqueQ) ObserveProposed(bat batching.ProposalBatch) {
q.proposed[bat.GetUID()] = q.proposed[bat.GetUID()] + 1
}
func (q *ProposingSeveralChosenUniqueQ) Requeue(bat batching.ProposalBatch) error {
q.proposed[bat.GetUID()] = q.proposed[bat.GetUID()] - 1
if q.proposed[bat.GetUID()] == -1 {
panic("Negative number of times batch proposed now")
}
if q.proposed[bat.GetUID()] > 0 {
return &QueueingError{
error: errors.New(fmt.Sprintf("Still %d proposals for batch with UID %d remaining outstanding so not requeueing", q.proposed[bat.GetUID()], bat.GetUID())),
code: 4,
}
}
err := q.ChosenUniqueQ.Requeue(bat)
return err
}
func (q *ProposingSeveralChosenUniqueQ) Dequeued(bat batching.ProposalBatch, onSuccess func()) error {
if _, exists := q.proposed[bat.GetUID()]; exists {
dlog.AgentPrintfN(q.id, "Batch with UID %d is proposed already so throwing it out", bat.GetUID())
return &QueueingError{errors.New(fmt.Sprintf("Not Requeued batch with UID %d as it is chosen", bat.GetUID())), 3}
}
return q.ChosenUniqueQ.Dequeued(bat, onSuccess)
}
func (q *ProposingSeveralChosenUniqueQ) ShouldPropose(bat batching.ProposalBatch) bool {
if _, exists := q.proposed[bat.GetUID()]; exists {
dlog.AgentPrintfN(q.id, "Batch with UID %d is proposed already so throwing it out", bat.GetUID())
return false
}
return q.ChosenUniqueQ.ShouldPropose(bat)
}
func (q *ProposingChosenUniqueQ) Learn(bat batching.ProposalBatch) {
q.ChosenUniqueQ.Learn(bat)
delete(q.proposed, bat.GetUID())
}