/
request_timer.go
152 lines (135 loc) · 3.54 KB
/
request_timer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package wire
import (
"sync"
"time"
)
// minDuration returns the minimum of two durations.
func minDuration(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}
type timerStatus int
const (
timerActive timerStatus = iota
timerStopped
timerTriggered
)
// requestTimer is a one-shot timer used to bound the duration of a request. It
// executes `onTimeout` if the timeout expires.
type requestTimer struct {
onTimeout func()
timeoutErr error
timer *time.Timer
mu sync.Mutex
status timerStatus
}
func newRequestTimer(timeout time.Duration, onTimeout func(), timeoutErr error) *requestTimer {
rt := &requestTimer{
onTimeout: onTimeout,
timeoutErr: timeoutErr,
status: timerActive,
}
rt.timer = time.AfterFunc(timeout, rt.onTriggered)
return rt
}
func (rt *requestTimer) onTriggered() {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == timerActive {
rt.status = timerTriggered
rt.onTimeout()
}
}
// Stop should be called upon a successful request to prevent the timer from
// expiring.
func (rt *requestTimer) Stop() {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == timerActive {
rt.status = timerStopped
rt.timer.Stop()
}
}
// ResolveError returns `timeoutErr` if the timer triggered, or otherwise
// `originalErr`.
func (rt *requestTimer) ResolveError(originalErr error) error {
rt.mu.Lock()
defer rt.mu.Unlock()
if rt.status == timerTriggered {
return rt.timeoutErr
}
return originalErr
}
// streamIdleTimer is an approximate timer used to detect idle streams.
// `onTimeout` may be called up to (timeout / pollDivisor) after `timeout` has
// expired.
type streamIdleTimer struct {
timeout time.Duration
onTimeout func()
task *periodicTask
mu sync.Mutex
status timerStatus
startTime time.Time
}
const (
pollDivisor = 4
maxPollInterval = time.Minute
)
// newStreamIdleTimer creates an unstarted timer.
func newStreamIdleTimer(timeout time.Duration, onTimeout func()) *streamIdleTimer {
st := &streamIdleTimer{
timeout: timeout,
onTimeout: onTimeout,
status: timerStopped,
}
st.task = newPeriodicTask(minDuration(timeout/pollDivisor, maxPollInterval), st.onPoll)
st.task.Start()
return st
}
// Restart the timer. Should be called when there is stream activity.
func (st *streamIdleTimer) Restart() {
st.mu.Lock()
defer st.mu.Unlock()
st.status = timerActive
st.startTime = time.Now()
}
// Stop the timer to prevent it from expiring.
func (st *streamIdleTimer) Stop() {
st.mu.Lock()
defer st.mu.Unlock()
st.status = timerStopped
}
// Shutdown should be called when the timer is no longer used.
func (st *streamIdleTimer) Shutdown() {
st.Stop()
st.task.Stop()
}
func (st *streamIdleTimer) onPoll() {
timeoutExpired := func() bool {
st.mu.Lock()
defer st.mu.Unlock()
// Note: time.Since() uses monotonic clock readings.
if st.status == timerActive && time.Since(st.startTime) > st.timeout {
st.status = timerTriggered
return true
}
return false
}()
if timeoutExpired {
st.onTimeout()
}
}