forked from dolthub/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
timeout_thread.go
132 lines (118 loc) · 3.85 KB
/
timeout_thread.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package buffer
import (
"sync"
"time"
)
// timeoutThread captures the state of the timeout thread.
// The thread actively removes the head of the queue when that entry exceeds
// its buffering window.
// For each active failover there will be one thread (Go routine).
type timeoutThread struct {
sb *shardBuffer
// maxDuration enforces that a failover stops after
// -buffer_max_failover_duration at most.
maxDuration *time.Timer
// stopChan will be closed when the thread should stop e.g. before the drain.
stopChan chan struct{}
wg sync.WaitGroup
// mu guards access to "queueNotEmpty" between this thread and callers of
// notifyQueueNotEmpty().
mu sync.Mutex
// queueNotEmpty will be closed to notify the timeout thread when the queue
// state changes from empty to non-empty. After it's closed, a new object will
// be assigned to this field.
queueNotEmpty chan struct{}
}
func newTimeoutThread(sb *shardBuffer) *timeoutThread {
return &timeoutThread{
sb: sb,
maxDuration: time.NewTimer(*maxFailoverDuration),
stopChan: make(chan struct{}),
queueNotEmpty: make(chan struct{}),
}
}
func (tt *timeoutThread) start() {
tt.wg.Add(1)
go tt.run()
}
// stop stops the thread and blocks until it has exited.
func (tt *timeoutThread) stop() {
close(tt.stopChan)
tt.wg.Wait()
}
func (tt *timeoutThread) notifyQueueNotEmpty() {
tt.mu.Lock()
defer tt.mu.Unlock()
close(tt.queueNotEmpty)
// Create a new channel which will be used by the next notify call.
tt.queueNotEmpty = make(chan struct{})
}
func (tt *timeoutThread) run() {
defer tt.wg.Done()
defer tt.maxDuration.Stop()
// While this thread is running, it can be in two states:
for {
if e := tt.sb.oldestEntry(); e != nil {
// 1. queue not empty: Wait for the oldest entry to exceed the window.
if stopped := tt.waitForEntry(e); stopped {
return
}
} else {
// 2. queue empty: Wait for an entry to show up.
if stopped := tt.waitForNonEmptyQueue(); stopped {
return
}
}
}
}
// waitForEntry blocks until "e" exceeds its buffering window or buffering stops
// in general. It returns true if the timeout thread should stop.
func (tt *timeoutThread) waitForEntry(e *entry) bool {
windowExceeded := time.NewTimer(e.deadline.Sub(time.Now()))
defer windowExceeded.Stop()
select {
// a) Always check these channels, regardless of the state.
case <-tt.maxDuration.C:
// Max duration is up. Stop buffering. Do not error out entries explicitly.
tt.sb.stopBufferingDueToMaxDuration()
return true
case <-tt.stopChan:
// Failover ended before timeout. Do nothing.
return true
// b) Entry-specific checks.
case <-e.done:
// Entry was drained or evicted. Get the next entry.
return false
// NOTE: We're not waiting for e.bufferCtx here (which triggers when the
// request was externally aborted e.g. due to context canceled) because then
// this thread would race with the request thread which runs
// shardBuffer.remove(). Instead, remove() will notify us here eventually by
// closing "e.done".
case <-windowExceeded.C:
// Entry expired. Evict it and then get the next entry.
tt.sb.evictOldestEntry(e)
return false
}
}
// waitForNonEmptyQueue blocks until the buffer queue gets a new element or
// the timeout thread should be stopped.
// It returns true if the timeout thread should stop.
func (tt *timeoutThread) waitForNonEmptyQueue() bool {
tt.mu.Lock()
queueNotEmpty := tt.queueNotEmpty
tt.mu.Unlock()
select {
// a) Always check these channels, regardless of the state.
case <-tt.maxDuration.C:
// Max duration is up. Stop buffering. Do not error out entries explicitly.
tt.sb.stopBufferingDueToMaxDuration()
return true
case <-tt.stopChan:
// Failover ended before timeout. Do nothing.
return true
// b) State-specific check.
case <-queueNotEmpty:
// At least one entry present. Check its timeout in the next iteration.
return false
}
}