forked from tendermint/tendermint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wal.go
145 lines (127 loc) · 3.29 KB
/
wal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package consensus
import (
"bufio"
"os"
"time"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types"
)
//--------------------------------------------------------
// types and functions for savings consensus messages
type ConsensusLogMessage struct {
Time time.Time `json:"time"`
Msg ConsensusLogMessageInterface `json:"msg"`
}
type ConsensusLogMessageInterface interface{}
var _ = wire.RegisterInterface(
struct{ ConsensusLogMessageInterface }{},
wire.ConcreteType{types.EventDataRoundState{}, 0x01},
wire.ConcreteType{msgInfo{}, 0x02},
wire.ConcreteType{timeoutInfo{}, 0x03},
)
//--------------------------------------------------------
// Simple write-ahead logger
// Write ahead logger writes msgs to disk before they are processed.
// Can be used for crash-recovery and deterministic replay
// TODO: currently the wal is overwritten during replay catchup
// give it a mode so it's either reading or appending - must read to end to start appending again
type WAL struct {
fp *os.File
exists bool // if the file already existed (restarted process)
done chan struct{}
light bool // ignore block parts
}
func NewWAL(file string, light bool) (*WAL, error) {
var walExists bool
if _, err := os.Stat(file); err == nil {
walExists = true
}
fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return nil, err
}
return &WAL{
fp: fp,
exists: walExists,
done: make(chan struct{}),
light: light,
}, nil
}
func (wal *WAL) Exists() bool {
if wal == nil {
log.Warn("consensus msg log is nil")
return false
}
return wal.exists
}
// called in newStep and for each pass in receiveRoutine
func (wal *WAL) Save(clm ConsensusLogMessageInterface) {
if wal != nil {
if wal.light {
// in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
if mi, ok := clm.(msgInfo); ok {
_ = mi
if mi.PeerKey != "" {
return
}
}
}
var n int
var err error
wire.WriteJSON(ConsensusLogMessage{time.Now(), clm}, wal.fp, &n, &err)
wire.WriteTo([]byte("\n"), wal.fp, &n, &err) // one message per line
if err != nil {
PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, clm))
}
}
}
// Must not be called concurrently with a write.
func (wal *WAL) Close() {
if wal != nil {
wal.fp.Close()
}
wal.done <- struct{}{}
}
func (wal *WAL) Wait() {
<-wal.done
}
func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
var current int64
// start at the end
current, err = wal.fp.Seek(0, 2)
if err != nil {
return
}
// backup until we find the the right line
// current is how far we are from the beginning
for {
current -= 1
if current < 0 {
wal.fp.Seek(0, 0) // back to beginning
return
}
// backup one and read a new byte
if _, err = wal.fp.Seek(current, 0); err != nil {
return
}
b := make([]byte, 1)
if _, err = wal.fp.Read(b); err != nil {
return
}
if b[0] == '\n' || len(b) == 0 {
nLines += 1
// read a full line
reader := bufio.NewReader(wal.fp)
lineBytes, _ := reader.ReadBytes('\n')
if len(lineBytes) == 0 {
continue
}
if found(lineBytes) {
wal.fp.Seek(0, 1) // (?)
wal.fp.Seek(current, 0)
return
}
}
}
}