-
Notifications
You must be signed in to change notification settings - Fork 4
/
exec.go
55 lines (45 loc) · 1.6 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package dqpbft
import (
"github.com/ibalajiarun/go-consensus/peer"
"github.com/ibalajiarun/go-consensus/pkg/command/commandpb"
pb "github.com/ibalajiarun/go-consensus/protocols/dqpbft/dqpbftpb"
)
// exec delivers commands for consecutive ordering instances, starting at
// d.nextODeliverIndex.
//
// For each ordering instance found in d.olog it works in two passes:
//
//  1. Validation: the ordering instance itself must be committed, and every
//     instance it lists must be present in d.log, non-nil, committed, and
//     still carry a command. If any check fails, exec returns without
//     touching any state for this ordering instance.
//  2. Delivery: each listed instance's command is appended to the delivery
//     queue, the command reference is cleared, the per-replica delivery index
//     is advanced, and the instance is marked Executed. Finally the ordering
//     instance is marked OExecuted and d.nextODeliverIndex is incremented.
//
// The loop ends (by returning) at the first ordering instance that is not yet
// deliverable.
func (d *DQPBFT) exec() {
	for {
		oinst, ok := d.olog[d.nextODeliverIndex]
		if !ok || !oinst.is.IsCommitted() {
			d.logger.Debugf("Replica %d oinst not committed [oseq=%d]\n",
				d.id, d.nextODeliverIndex)
			return
		}

		// Validation pass: nothing is mutated until every referenced instance
		// is known to be deliverable.
		for _, nextIID := range oinst.is.Instances {
			inst, ok := d.log[nextIID]
			if ok && inst == nil {
				// A nil entry would panic on the dereferences below. Report it
				// here, before any state changes, instead of logging and then
				// dereferencing it during delivery.
				d.logger.Errorf("Replica %d execIdx [oseq=%d, r=%d, s=%d, cmd=%d]\n",
					d.id, d.nextODeliverIndex, nextIID.ReplicaID, d.nextDeliverIndex[nextIID.ReplicaID], inst)
				return
			}
			if !ok || !inst.is.IsCommitted() || inst.is.Command == nil {
				d.logger.Debugf("Replica %d inst not committed [oseq=%d, r=%d, s=%d, cmd=%d]\n",
					d.id, d.nextODeliverIndex, nextIID.ReplicaID, d.nextDeliverIndex[nextIID.ReplicaID], inst)
				return
			}
		}

		// Delivery pass: every entry was checked above, so inst is non-nil and
		// holds a command.
		for _, nextIID := range oinst.is.Instances {
			inst := d.log[nextIID]
			d.logger.Debugf("Replica %d execIdx [oseq=%d, r=%d, s=%d, cmd=%d]\n",
				d.id, d.nextODeliverIndex, nextIID.ReplicaID, d.nextDeliverIndex[nextIID.ReplicaID], inst)

			d.enqueueForDelivery(inst.is.Command)
			// Drop the reference once the command has been copied into the
			// delivery queue.
			inst.is.Command = nil
			d.nextDeliverIndex[nextIID.ReplicaID]++
			inst.is.Status = pb.InstanceState_Executed
		}

		oinst.is.Status = pb.OInstanceState_OExecuted
		d.nextODeliverIndex++
	}
}
// enqueueForDelivery wraps a copy of *cmd in a peer.ExecPacket and appends it
// to d.toDeliver. cmd is dereferenced unconditionally, so it must be non-nil.
func (d *DQPBFT) enqueueForDelivery(cmd *commandpb.Command) {
	packet := peer.ExecPacket{Cmd: *cmd}
	d.toDeliver = append(d.toDeliver, packet)
}
// ClearExecutedCommands discards the pending delivery queue by resetting
// d.toDeliver to nil.
// NOTE(review): presumably called once the caller has consumed the queued
// packets; confirm against the call site.
func (d *DQPBFT) ClearExecutedCommands() {
d.toDeliver = nil
}