/
time.go
228 lines (192 loc) · 6.32 KB
/
time.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package util
import (
"context"
"math"
"math/rand"
"net/http"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"
)
const (
// nanosecondsInMillisecond is the number of nanoseconds in one millisecond,
// used to convert between nanosecond and millisecond Unix timestamps.
nanosecondsInMillisecond = int64(time.Millisecond / time.Nanosecond)
)
// TimeToMillis converts t into milliseconds since the Unix epoch. The value is
// derived from t.UnixNano(), so sub-millisecond precision is discarded.
func TimeToMillis(t time.Time) int64 {
	nanos := t.UnixNano()
	return nanos / nanosecondsInMillisecond
}
// TimeFromMillis converts ms, a count of milliseconds since the Unix epoch,
// into the corresponding time.Time.
func TimeFromMillis(ms int64) time.Time {
	nanos := ms * nanosecondsInMillisecond
	return time.Unix(0, nanos)
}
// FormatTimeMillis renders ms (milliseconds since the Unix epoch) as a
// human-readable string using time.Time's default String format.
func FormatTimeMillis(ms int64) string {
	ts := TimeFromMillis(ms)
	return ts.String()
}
// FormatTimeModel renders t as a human-readable string. The model.Time value is
// converted to int64 and treated as milliseconds since the Unix epoch.
func FormatTimeModel(t model.Time) string {
	millis := int64(t)
	ts := TimeFromMillis(millis)
	return ts.String()
}
// FormatMillisToSeconds renders ms (a count of milliseconds) as a decimal
// number of seconds, using the fewest digits that represent the value exactly
// (for example 1500 becomes "1.5").
func FormatMillisToSeconds(ms int64) string {
	seconds := float64(ms) / 1000
	return strconv.FormatFloat(seconds, 'f', -1, 64)
}
// ParseTime parses s into milliseconds since the Unix epoch.
//
// Two formats are accepted:
//   - a decimal number of seconds since the epoch, whose fractional part is
//     rounded to millisecond precision;
//   - an RFC 3339 timestamp (nanosecond precision allowed).
//
// Any other input yields an HTTP 400 (Bad Request) error.
func ParseTime(s string) (int64, error) {
	// strconv.ParseFloat accepts "NaN" and "Inf" without error, but converting
	// those to int64 gives an implementation-dependent result. Treat them as
	// unparseable so they fall through to the 400 error below.
	if secs, err := strconv.ParseFloat(s, 64); err == nil && !math.IsNaN(secs) && !math.IsInf(secs, 0) {
		whole, frac := math.Modf(secs)
		// Round the fraction to whole milliseconds to hide float noise.
		frac = math.Round(frac*1000) / 1000
		tm := time.Unix(int64(whole), int64(frac*float64(time.Second)))
		return TimeToMillis(tm), nil
	}
	if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
		return TimeToMillis(t), nil
	}
	return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid timestamp", s)
}
// ParseTimeParam reads the form value paramName from r and parses it with
// ParseTime, returning milliseconds since the Unix epoch.
//
// When the parameter is absent or empty, defaultValue is formatted as a decimal
// integer and parsed in its place. It therefore goes through ParseTime like any
// other numeric input, which reads bare numbers as seconds.
// Parse failures are wrapped with the parameter name.
func ParseTimeParam(r *http.Request, paramName string, defaultValue int64) (int64, error) {
	raw := r.FormValue(paramName)
	if len(raw) == 0 {
		raw = strconv.FormatInt(defaultValue, 10)
	}

	millis, err := ParseTime(raw)
	if err == nil {
		return millis, nil
	}
	return 0, errors.Wrapf(err, "Invalid time value for '%s'", paramName)
}
// DurationWithJitter returns a random duration in the half-open interval
// [input - input*variancePerc, input + input*variancePerc).
//
// input is returned unchanged when no jitter can be applied: when input is
// zero, or when input*variancePerc truncates to zero or is negative.
func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration {
	// No duration? No jitter.
	if input == 0 {
		return 0
	}

	variance := int64(float64(input) * variancePerc)
	// rand.Int63n panics for a non-positive argument, so bail out when there is
	// no room for jitter.
	if variance <= 0 {
		return input
	}

	jitter := rand.Int63n(variance*2) - variance
	return input + time.Duration(jitter)
}
// DurationWithPositiveJitter returns a random duration in the half-open
// interval [input, input + input*variancePerc).
//
// input is returned unchanged when no jitter can be applied: when input is
// zero, or when input*variancePerc truncates to zero or is negative.
func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time.Duration {
	// No duration? No jitter.
	if input == 0 {
		return 0
	}

	variance := int64(float64(input) * variancePerc)
	// rand.Int63n panics for a non-positive argument, so bail out when there is
	// no room for jitter.
	if variance <= 0 {
		return input
	}

	jitter := rand.Int63n(variance)
	return input + time.Duration(jitter)
}
// PositiveJitter returns a random duration in the half-open interval
// [0, input*variancePerc).
//
// It returns 0 when no jitter can be produced: when input or variancePerc is
// zero, or when input*variancePerc truncates to zero or is negative.
func PositiveJitter(input time.Duration, variancePerc float64) time.Duration {
	// No duration or no variancePerc? No jitter.
	if input == 0 || variancePerc == 0 {
		return 0
	}

	variance := int64(float64(input) * variancePerc)
	// A tiny input or percentage can still truncate to zero, and a negative
	// operand makes the product negative. rand.Int63n panics on both.
	if variance <= 0 {
		return 0
	}

	return time.Duration(rand.Int63n(variance))
}
// NewDisableableTicker behaves like time.NewTicker, except that a zero interval
// disables the ticker instead of being an error. It returns a stop function
// followed by the tick channel. When disabled, the stop function is a no-op and
// the channel is nil, so receiving from it blocks forever.
func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time) {
	if interval != 0 {
		ticker := time.NewTicker(interval)
		return ticker.Stop, ticker.C
	}
	return func() {}, nil
}
// FindMinMaxTime returns the time in milliseconds of the earliest and latest point in time the statement will try to process.
// This takes into account offsets, @ modifiers, and range selectors.
// If the expression does not select series, then FindMinMaxTime returns (0, 0).
//
// Requests whose URL path ends in "/query" are treated as instant queries: the
// evaluation time comes from the "time" form value, falling back to now when it
// is absent. All other requests use the "start" and "end" form values. When the
// relevant parameters cannot be parsed, the evaluation window stays at the Unix
// epoch (0, 0).
func FindMinMaxTime(r *http.Request, expr parser.Expr, lookbackDelta time.Duration, now time.Time) (int64, int64) {
	var startTime, endTime int64

	if strings.HasSuffix(r.URL.Path, "/query") {
		if raw := r.FormValue("time"); raw == "" {
			// Use now directly. Routing a millisecond default through
			// ParseTimeParam would have it re-parsed by ParseTime as seconds,
			// scaling it by 1000 and overflowing the nanosecond conversion.
			startTime = now.UnixMilli()
			endTime = startTime
		} else if t, err := ParseTime(raw); err == nil {
			startTime, endTime = t, t
		}
	} else if st, err := ParseTime(r.FormValue("start")); err == nil {
		// Only apply the range when both bounds parse.
		if et, err := ParseTime(r.FormValue("end")); err == nil {
			startTime, endTime = st, et
		}
	}

	es := &parser.EvalStmt{
		Expr:          expr,
		Start:         TimeFromMillis(startTime),
		End:           TimeFromMillis(endTime),
		LookbackDelta: lookbackDelta,
	}
	return promql.FindMinMaxTime(es)
}
// SlotInfoFunc returns the slot number and the total number of slots, in that
// order. SlottedTicker calls it to decide where inside each period its tick
// should fall.
type SlotInfoFunc func() (int, int)
// SlottedTicker delivers ticks on C with period d. The first tick (and the
// first tick after the slot information changes) is scheduled by nextInterval,
// which offsets it inside the period according to the slot reported by sf.
type SlottedTicker struct {
C <-chan time.Time // The channel on which the ticks are delivered.
// done cancels the context watched by the background goroutine.
done func()
// d is the regular tick period.
d time.Duration
// shouldReset is true while the underlying ticker runs on an alignment
// interval and must be reset to d after the next delivered tick.
shouldReset bool
// ticker is the underlying time.Ticker driving the deliveries.
ticker *time.Ticker
// sf reports the current slot index and total number of slots.
sf SlotInfoFunc
// slotJitter is passed to PositiveJitter as the variance applied to the slot size.
slotJitter float64
}
// NewSlottedTicker starts a ticker that delivers ticks on the returned value's
// C channel with period d, phase-shifted inside the period according to the
// slot reported by sf.
//
// The first tick is scheduled by nextInterval. Once it has been delivered the
// underlying ticker is reset to the plain period d. Whenever sf reports a
// different slot index or slot count than last observed, the pending tick is
// dropped and the ticker is re-aligned via nextInterval.
//
// C has a buffer of one. It is never closed, mirroring time.Ticker, so a
// receiver never observes a spurious zero-value tick. Call Stop to release the
// background goroutine and the underlying ticker.
func NewSlottedTicker(sf SlotInfoFunc, d time.Duration, slotJitter float64) *SlottedTicker {
	c := make(chan time.Time, 1)
	ctx, cancel := context.WithCancel(context.Background())
	st := &SlottedTicker{
		C:           c,
		done:        cancel,
		d:           d,
		sf:          sf,
		shouldReset: true,
		slotJitter:  slotJitter,
	}
	slotIndex, totalSlots := sf()
	st.ticker = time.NewTicker(st.nextInterval())

	go func() {
		// A Reset below can race with Stop and re-arm the ticker, so stop it
		// again on the way out.
		defer st.ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case t := <-st.ticker.C:
				if i, s := sf(); i != slotIndex || s != totalSlots {
					// Slot assignment changed: skip this tick and re-align.
					slotIndex, totalSlots = i, s
					st.ticker.Reset(st.nextInterval())
					st.shouldReset = true
					continue
				}

				// Deliver the tick, but give up if the ticker is stopped while
				// the consumer is not draining C. A bare send would block
				// forever and leak this goroutine.
				select {
				case c <- t:
				case <-ctx.Done():
					return
				}

				if st.shouldReset {
					// The alignment interval has elapsed; switch to the regular period.
					st.ticker.Reset(st.d)
					st.shouldReset = false
				}
			}
		}
	}()
	return st
}
// Stop stops the underlying ticker and then cancels the context watched by the
// background goroutine started in NewSlottedTicker.
func (t *SlottedTicker) Stop() {
	underlying, cancel := t.ticker, t.done
	underlying.Stop()
	cancel()
}
// nextInterval returns how long to wait until the next tick that belongs to
// this ticker's slot.
//
// Time is divided into consecutive periods of length d, aligned to the Unix
// epoch. Inside each period the slot's tick sits at slotIndex/totalSlots of
// the way through. The result is the time from now until the next such
// instant, plus a positive jitter bounded by a fraction (slotJitter) of one
// slot's width. For a positive d the result is always positive.
func (t *SlottedTicker) nextInterval() time.Duration {
	if t.d <= 0 {
		// There is no period to align to. Hand the invalid value back so that
		// time.NewTicker / Reset reports it, instead of dividing by zero or
		// looping forever below.
		return t.d
	}

	slotIndex, totalSlots := t.sf()
	if totalSlots <= 0 {
		// No slot layout available: behave as a single slot rather than
		// dividing by zero.
		slotIndex, totalSlots = 0, 1
	}

	// Take a single reading of the clock so every computation below agrees.
	now := time.Now()

	// Discover what time the current period started. Nanosecond arithmetic
	// gives the same answer as millisecond arithmetic for whole-millisecond
	// periods and also works for sub-millisecond ones.
	period := int64(t.d)
	next := time.Unix(0, now.UnixNano()/period*period)

	// Offset into the period for this slot.
	offset := time.Duration((float64(slotIndex) / float64(totalSlots)) * float64(t.d))
	next = next.Add(offset)

	// Keep adding the period until the target lies strictly after now, so the
	// interval is never zero or negative.
	for !next.After(now) {
		next = next.Add(t.d)
	}

	slotSize := t.d / time.Duration(totalSlots)
	return next.Sub(now) + PositiveJitter(slotSize, t.slotJitter)
}