forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
global_state.go
134 lines (110 loc) · 3.05 KB
/
global_state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package wal
import (
"bufio"
"encoding/gob"
"fmt"
"io"
"math"
"os"
)
type GlobalState struct {
// used for creating index entries
CurrentFileSuffix int
CurrentFileOffset int64
// keep track of the next request number
LargestRequestNumber uint32
// used for rollover
FirstSuffix int
// last seq number used
ShardLastSequenceNumber map[uint32]uint64
// committed request number per server
ServerLastRequestNumber map[uint32]uint32
// path to the state file
path string
}
func newGlobalState(path string) (*GlobalState, error) {
f, err := os.Open(path)
state := &GlobalState{
ServerLastRequestNumber: map[uint32]uint32{},
ShardLastSequenceNumber: map[uint32]uint64{},
path: path,
}
if os.IsNotExist(err) {
return state, nil
}
if err != nil {
return nil, err
}
if err := state.read(f); err != nil {
return nil, err
}
state.path = path
return state, nil
}
func (self *GlobalState) writeToFile() error {
newFile, err := os.OpenFile(self.path+".new", os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return err
}
if _, err := newFile.Seek(0, os.SEEK_SET); err != nil {
return err
}
if err := self.write(newFile); err != nil {
return err
}
if err := newFile.Sync(); err != nil {
return err
}
if err := newFile.Close(); err != nil {
return err
}
os.Remove(self.path)
return os.Rename(self.path+".new", self.path)
}
func (self *GlobalState) write(w io.Writer) error {
fmt.Fprintf(w, "%d\n", 1) // write the version
return gob.NewEncoder(w).Encode(self)
}
func (self *GlobalState) read(r io.Reader) error {
// skip the version
reader := bufio.NewReader(r)
// read the version line
_, err := reader.ReadString('\n')
if err != nil {
return err
}
return gob.NewDecoder(reader).Decode(self)
}
func (self *GlobalState) recover(shardId uint32, sequenceNumber uint64) {
lastSequenceNumber := self.ShardLastSequenceNumber[shardId]
if sequenceNumber > lastSequenceNumber {
self.ShardLastSequenceNumber[shardId] = sequenceNumber
}
}
func (self *GlobalState) getNextRequestNumber() uint32 {
self.LargestRequestNumber++
return self.LargestRequestNumber
}
func (self *GlobalState) getCurrentSequenceNumber(shardId uint32) uint64 {
return self.ShardLastSequenceNumber[shardId]
}
func (self *GlobalState) setCurrentSequenceNumber(shardId uint32, sequenceNumber uint64) {
self.ShardLastSequenceNumber[shardId] = sequenceNumber
}
func (self *GlobalState) commitRequestNumber(serverId, requestNumber uint32) {
// TODO: we need a way to verify the request numbers, the following
// won't work though when the request numbers roll over
// if currentRequestNumber := self.ServerLastRequestNumber[serverId]; requestNumber < currentRequestNumber {
// panic(fmt.Errorf("Expected rn %d to be >= %d", requestNumber, currentRequestNumber))
// }
self.ServerLastRequestNumber[serverId] = requestNumber
}
func (self *GlobalState) LowestCommitedRequestNumber() uint32 {
requestNumber := uint32(math.MaxUint32)
for _, number := range self.ServerLastRequestNumber {
if number < requestNumber {
requestNumber = number
}
}
return requestNumber
}