/
execution_async_db.go
140 lines (120 loc) · 3.64 KB
/
execution_async_db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package state
import (
"sync"
"time"
)
type asyncDBContext struct {
// switch to turn on async save abciResponse and state
isAsyncSaveDB bool
// channel to write abciResponse async
abciResponseQueue chan abciResponse
/// channel to write state async
stateQueue chan State
// channel to feed back height of saved abci response and stat response
asyncFeedbackQueue chan int64
// flag to avoid waiting async state result for the first block
isWaitingLastBlock bool
//flag to avoid stop twice
isAsyncQueueStop bool
//wait group for quiting
wg sync.WaitGroup
}
const (
MAXCHAN_LEN = 2
FEEDBACK_LEN = 2
QUIT_SIG = -99
MAX_WAIT_TIME_SECONDS = 30
)
type abciResponse struct {
height int64
responses *ABCIResponses
}
func (blockExec *BlockExecutor) initAsyncDBContext() {
blockExec.abciResponseQueue = make(chan abciResponse, MAXCHAN_LEN)
blockExec.stateQueue = make(chan State, MAXCHAN_LEN)
blockExec.asyncFeedbackQueue = make(chan int64, FEEDBACK_LEN)
blockExec.wg.Add(2)
go blockExec.asyncSaveStateRoutine()
go blockExec.asyncSaveABCIRespRoutine()
}
func (blockExec *BlockExecutor) stopAsyncDBContext() {
if blockExec.isAsyncQueueStop {
return
}
blockExec.abciResponseQueue <- abciResponse{height: QUIT_SIG}
blockExec.stateQueue <- State{LastBlockHeight: QUIT_SIG}
blockExec.eventsChan <- event{}
blockExec.wg.Wait()
blockExec.isAsyncQueueStop = true
}
func (blockExec *BlockExecutor) SaveABCIResponsesAsync(height int64, responses *ABCIResponses) {
blockExec.abciResponseQueue <- abciResponse{height, responses}
}
func (blockExec *BlockExecutor) SaveStateAsync(state State) {
blockExec.stateQueue <- state
}
// asyncSaveRoutine handle writing state work
func (blockExec *BlockExecutor) asyncSaveStateRoutine() {
for stateMsg := range blockExec.stateQueue {
if stateMsg.LastBlockHeight == QUIT_SIG {
break
}
SaveState(blockExec.db, stateMsg)
blockExec.asyncFeedbackQueue <- stateMsg.LastBlockHeight
}
blockExec.wg.Done()
}
// asyncSaveRoutine handle writing abciResponse work
func (blockExec *BlockExecutor) asyncSaveABCIRespRoutine() {
for abciMsg := range blockExec.abciResponseQueue {
if abciMsg.height == QUIT_SIG {
break
}
SaveABCIResponses(blockExec.db, abciMsg.height, abciMsg.responses)
blockExec.asyncFeedbackQueue <- abciMsg.height
}
blockExec.wg.Done()
}
// SetIsAsyncSaveDB switches to open async write db feature
func (blockExec *BlockExecutor) SetIsAsyncSaveDB(isAsyncSaveDB bool) {
blockExec.isAsyncSaveDB = isAsyncSaveDB
}
// wait for the last sate and abciResponse to be saved
func (blockExec *BlockExecutor) tryWaitLastBlockSave(lastHeight int64) {
timeoutCh := time.After(MAX_WAIT_TIME_SECONDS * time.Second)
if blockExec.isAsyncSaveDB && blockExec.isWaitingLastBlock {
i := 0
for {
select {
case r := <-blockExec.asyncFeedbackQueue:
if r != lastHeight {
panic("Incorrect synced aysnc feed Height")
}
if i++; i == FEEDBACK_LEN {
return
}
case <-timeoutCh:
// It shouldn't be timeout. something must be wrong here
panic("Can't get last block aysnc result")
}
}
}
}
// try to save the abciReponse async
func (blockExec *BlockExecutor) trySaveABCIResponsesAsync(height int64, abciResponses *ABCIResponses) {
if blockExec.isAsyncSaveDB {
blockExec.isWaitingLastBlock = true
blockExec.SaveABCIResponsesAsync(height, abciResponses)
} else {
SaveABCIResponses(blockExec.db, height, abciResponses)
}
}
// try to save the state async
func (blockExec *BlockExecutor) trySaveStateAsync(state State) {
if blockExec.isAsyncSaveDB {
blockExec.SaveStateAsync(state)
} else {
//Async save state
SaveState(blockExec.db, state)
}
}