forked from nats-io/nats-streaming-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ft.go
222 lines (210 loc) · 7.01 KB
/
ft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
// Copyright 2017 Apcera Inc. All rights reserved.
package server
import (
"fmt"
"math/rand"
"time"
"github.com/nats-io/go-nats"
"github.com/nats-io/nats-streaming-server/spb"
"github.com/nats-io/nats-streaming-server/stores"
"github.com/nats-io/nats-streaming-server/util"
)
// Default timings for the FT heartbeat protocol.
const (
	// ftDefaultHBInterval is the default value for ftHBInterval.
	ftDefaultHBInterval = 1000 * time.Millisecond
	// ftDefaultHBMissedInterval is the default value for ftHBMissedInterval.
	ftDefaultHBMissedInterval = 1250 * time.Millisecond
)

// Heartbeat intervals actually used by the FT code. They start at the
// defaults above and can be changed for tests purposes.
var (
	ftHBInterval       = ftDefaultHBInterval
	ftHBMissedInterval = ftDefaultHBMissedInterval
)

// Hooks that exist only for tests.
var (
	// ftNoPanic is set to true by tests: some go-routine would otherwise
	// panic, which can't be recovered in a test, so this flag lets tests
	// verify the correct behavior without the panic.
	ftNoPanic bool

	// ftPauseBeforeFirstAttempt makes the first attempt at getting the
	// store lock wait on ftPauseCh, so that a test has time to switch the
	// store with a mocked one.
	ftPauseBeforeFirstAttempt bool

	// ftPauseCh is the (unbuffered) channel used for that pause.
	ftPauseCh = make(chan struct{})
)
// ftReleasePause sends one value on ftPauseCh. The channel is unbuffered,
// so the call blocks until a receiver (ftStart, when
// ftPauseBeforeFirstAttempt is set) takes the value.
func ftReleasePause() {
	var token struct{}
	ftPauseCh <- token
}
// ftStart keeps this server in standby until it manages to get the store's
// exclusive lock, then activates it: the heartbeat sender is started and
// the result of s.start(FTActive) is returned.
// It returns nil without activating if s.ftQuit fires first, and returns
// the error as-is if trying to get the store lock fails fatally.
// This is running in a separate go-routine so if server state
// changes, take care of using the server's lock.
func (s *StanServer) ftStart() (retErr error) {
	Noticef("Starting in standby mode")

	// Tests may ask for a pause before the very first lock attempt.
	if ftPauseBeforeFirstAttempt {
		<-ftPauseCh
	}

	// Backoff check so that the "unable to get store lock" notice does
	// not fill up the log.
	logGate, _ := util.NewBackoffTimeCheck(time.Second, 2, time.Minute)

	for active := false; !active; {
		select {
		case <-s.ftQuit:
			// Asked to stop while still in standby.
			return nil
		case <-s.ftHBCh:
			// Something was received on the heartbeat channel:
			// stay in standby and wait again.
			continue
		case <-time.After(s.ftHBMissedInterval):
			// Nothing received in time: try to lock the store.
		}

		gotLock, err := s.ftGetStoreLock()
		switch {
		case err != nil:
			// This is considered a fatal error and we exit.
			return err
		case gotLock:
			active = true
		case logGate.Ok():
			// We did not get the lock, report it (with backoff) and
			// go back to standby.
			Noticef("ft: unable to get store lock at this time, going back to standby")
		}
	}

	// Remember when this server activated. The timestamp is used to
	// arbitrate if several servers claim to be active. It is not
	// bulletproof (clocks may differ, etc...), but more than one server
	// holding the store lock already means trouble, so this only tries to
	// minimize the possible store corruption.
	activatedAt := time.Now()
	Noticef("Server is active")
	s.startGoRoutine(func() {
		s.ftSendHBLoop(activatedAt)
	})

	// Start the recovery process, etc..
	return s.start(FTActive)
}
// ftGetStoreLock tries to get the store's exclusive lock.
// It returns (true, nil) when the lock was obtained, (false, nil) when it
// was not, and (false, err) when the attempt failed with an error, which
// the caller should treat as fatal.
func (s *StanServer) ftGetStoreLock() (bool, error) {
	// Normally, the store would be set early and is immutable, but some
	// FT tests do set a mock store after the server is created, so read
	// it under the server's lock to avoid race reports.
	s.mu.Lock()
	st := s.store
	s.mu.Unlock()

	acquired, err := st.GetExclusiveLock()
	if err != nil {
		// Error not related to locking (could be not supported,
		// permissions error, file not reachable, etc..)
		return false, fmt.Errorf("ft: fatal error getting the store lock: %v", err)
	}
	// Without error, the boolean alone tells if we got the lock.
	return acquired, nil
}
// ftSendHBLoop is used by an active server to send HB to the FT subject.
// Standby servers receiving those HBs do not attempt to lock the store.
// When they miss HBs, they will.
//
// Between two sends, the loop waits for s.ftHBInterval and also watches
// s.ftHBCh for heartbeats coming from a different server ID, that is,
// another server claiming to be active. If that peer's activation time is
// earlier than activationTime, this server aborts: it panics, or, when
// ftNoPanic is set, records the error with s.setLastError and returns.
// Otherwise the claim is only logged.
//
// The function returns when a receive on s.ftQuit succeeds, and calls
// s.wg.Done() on exit.
func (s *StanServer) ftSendHBLoop(activationTime time.Time) {
	// Release the wait group on exit
	defer s.wg.Done()

	// The heartbeat content does not change, so it is encoded only once.
	timeAsBytes, _ := activationTime.MarshalBinary()
	ftHB := &spb.CtrlMsg{
		MsgType:  spb.CtrlMsg_FTHeartbeat,
		ServerID: s.serverID,
		Data:     timeAsBytes,
	}
	ftHBBytes, _ := ftHB.Marshal()
	// Use some backoff for the publish failure report.
	printErr, _ := util.NewBackoffTimeCheck(time.Second, 2, time.Minute)
	for {
		if err := s.ftnc.Publish(s.ftSubject, ftHBBytes); err != nil {
			if printErr.Ok() {
				Errorf("Unable to send FT heartbeat: %v", err)
			}
		}
	startSelect:
		select {
		case m := <-s.ftHBCh:
			hb := spb.CtrlMsg{}
			if err := hb.Unmarshal(m.Data); err != nil {
				// Not something we can decode: wait again, without
				// sending a heartbeat.
				goto startSelect
			}
			// Ignore our own message
			if hb.MsgType != spb.CtrlMsg_FTHeartbeat || hb.ServerID == s.serverID {
				goto startSelect
			}
			// Another server claims to be active
			peerActivationTime := time.Time{}
			if err := peerActivationTime.UnmarshalBinary(hb.Data); err != nil {
				Errorf("Error decoding activation time: %v", err)
			} else {
				// Step down if the peer's activation time is earlier than ours.
				err := fmt.Errorf("ft: serverID %q claims to be active", hb.ServerID)
				if peerActivationTime.Before(activationTime) {
					err = fmt.Errorf("%s, aborting", err)
					if ftNoPanic {
						s.setLastError(err)
						return
					}
					panic(err)
				}
				// The text embeds the peer's server ID, so it must be
				// passed as an argument and never as the format string.
				Errorf("%v", err)
			}
		case <-time.After(s.ftHBInterval):
			// We'll send the ping at the top of the for loop
		case <-s.ftQuit:
			return
		}
	}
}
// ftSetup checks that all required FT parameters have been specified,
// subscribes to the FT heartbeat subject and creates the channel required
// for shutdown. On success the server state is set to FTStandby.
// Note that FTGroupName has to be set before server invokes this function,
// so this parameter is not checked here.
//
// An error is returned if the store type is not stores.TypeFile, if the
// missed heartbeat interval is less than 110% of the heartbeat interval,
// or if the subscription on the FT subject fails.
func (s *StanServer) ftSetup() error {
	// Check that store type is ok. So far only support for FileStore
	if s.opts.StoreType != stores.TypeFile {
		return fmt.Errorf("ft: only %v stores supported in FT mode", stores.TypeFile)
	}
	// So far, those are not exposed to users, just used in tests.
	// Still make sure that the missed HB interval is at least 10%
	// greater than the HB interval.
	if ftHBMissedInterval < time.Duration(float64(ftHBInterval)*1.1) {
		return fmt.Errorf("ft: the missed heartbeat interval needs to be"+
			" at least 10%% greater than the heartbeat interval (hb=%v missed hb=%v)",
			ftHBInterval, ftHBMissedInterval)
	}
	// Set the HB and MissedHB intervals, using a bit of randomness
	rand.Seed(time.Now().UnixNano())
	s.ftHBInterval = ftGetRandomInterval(ftHBInterval)
	s.ftHBMissedInterval = ftGetRandomInterval(ftHBMissedInterval)
	// Subscribe to FT subject
	s.ftSubject = fmt.Sprintf("%s.%s.%s", ftHBPrefix, s.opts.ID, s.opts.FTGroupName)
	s.ftHBCh = make(chan *nats.Msg)
	sub, err := s.ftnc.Subscribe(s.ftSubject, func(m *nats.Msg) {
		// Dropping incoming FT HBs is not crucial, we will then check for
		// store lock. The channel is unbuffered, so the message is only
		// delivered if a go-routine is currently receiving from it.
		select {
		case s.ftHBCh <- m:
		default:
		}
	})
	if err != nil {
		return fmt.Errorf("ft: unable to subscribe on ft subject: %v", err)
	}
	// We don't want to cause possible slow consumer error
	sub.SetPendingLimits(-1, -1)
	// Create channel to notify FT go routine to quit.
	s.ftQuit = make(chan struct{}, 1)
	// Set the state as standby initially
	s.state = FTStandby
	return nil
}
// ftGetRandomInterval returns a random interval with at most +/- 10%
// of the given interval.
//
// The offset is drawn from [0, 10% of interval) and is added or subtracted
// with equal probability. When 10% of the interval is not a positive number
// of nanoseconds (interval smaller than 10ns, zero or negative), there is
// no room for an offset and the interval is returned unchanged.
func ftGetRandomInterval(interval time.Duration) time.Duration {
	// Stay in 64 bits: converting to int would overflow on 32-bit
	// platforms for intervals of a few tens of seconds.
	tenPercent := int64(float64(interval) * 0.10)
	if tenPercent <= 0 {
		// rand.Int63n panics if its argument is <= 0.
		return interval
	}
	random := time.Duration(rand.Int63n(tenPercent))
	if rand.Intn(2) == 1 {
		return interval + random
	}
	return interval - random
}