-
Notifications
You must be signed in to change notification settings - Fork 134
/
background_task.go
110 lines (98 loc) · 3.12 KB
/
background_task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package task
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// task represents a function to be called periodically.
// Task runtimes are recorded in a Prometheus histogram.
// The Prometheus metric name for each task is the manager's metricsPrefix
// followed by metricName and the suffix "_latency_seconds".
type task struct {
// function is the callback invoked on every run.
function func()
// interval is the pause between one run returning and the next run starting.
interval time.Duration
// metricName identifies the task in its Prometheus metric name and help text.
metricName string
// stopChannel is signalled by stopTasks to make the task's goroutine exit.
stopChannel chan bool
}
// BackgroundTaskManager is used for registering tasks (functions) to be run periodically.
// Prometheus metric names for each task are prefixed with metricsPrefix.
// BackgroundTaskManager is not threadsafe; it should only be accessed from a single goroutine.
type BackgroundTaskManager struct {
// tasks lists the registered tasks that stopTasks signals.
tasks []*task
// metricsPrefix is placed in front of every task's metric name.
metricsPrefix string
// wg tracks the task goroutines that have been started and have not yet exited.
wg *sync.WaitGroup
}
// NewBackgroundTaskManager builds a BackgroundTaskManager that has no tasks yet.
// metricsPrefix is placed in front of the metric name of every task that is
// later added with Register, which both adds and starts a task.
func NewBackgroundTaskManager(metricsPrefix string) *BackgroundTaskManager {
	manager := new(BackgroundTaskManager)
	manager.metricsPrefix = metricsPrefix
	manager.tasks = []*task{}
	manager.wg = &sync.WaitGroup{}
	return manager
}
// Register schedules function for periodic execution and starts it immediately.
// interval is the pause between one call returning and the next call being
// made, so consecutive calls begin interval plus the function's runtime apart.
// metricName is used to build the name of the Prometheus histogram that
// records the function's runtimes.
func (m *BackgroundTaskManager) Register(function func(), interval time.Duration, metricName string) {
	entry := new(task)
	entry.function = function
	entry.interval = interval
	entry.metricName = metricName
	entry.stopChannel = make(chan bool)

	// Launch the task first, then record it so that it can be stopped later.
	m.startBackgroundTask(entry)
	m.tasks = append(m.tasks, entry)
}
// StopAll signals every registered task to stop and then waits for the task
// goroutines to exit, giving up once timeout has elapsed.
// It returns true if tasks were still running when the wait gave up and
// false if all of them had finished.
func (m *BackgroundTaskManager) StopAll(timeout time.Duration) bool {
	m.stopTasks()
	stillRunning := m.waitForShutdownCompletion(timeout)
	return stillRunning
}
// startBackgroundTask creates the latency histogram for task and launches the
// goroutine that runs it. The goroutine calls the task's function once
// straight away and then again each time task.interval has passed since the
// previous call returned, until a value is received from task.stopChannel.
// Every call's runtime is recorded in the histogram, in seconds.
func (m *BackgroundTaskManager) startBackgroundTask(task *task) {
	latency := promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    m.metricsPrefix + task.metricName + "_latency_seconds",
		Help:    "Background loop " + task.metricName + " latency in seconds",
		Buckets: prometheus.ExponentialBuckets(0.01, 2, 15),
	})

	// timedRun invokes the task's function once and observes how long it took.
	timedRun := func() {
		begin := time.Now()
		task.function()
		latency.Observe(time.Since(begin).Seconds())
	}

	m.wg.Add(1)
	go func() {
		for {
			timedRun()

			// Sleep for the interval, unless a stop request arrives first.
			select {
			case <-task.stopChannel:
				m.wg.Done()
				return
			case <-time.After(task.interval):
			}
		}
	}()
}
// waitForShutdownCompletion blocks until every goroutine counted by m.wg has
// finished or until timeout has elapsed, whichever happens first.
// It returns false when all goroutines finished in time and true when the
// timeout fired first. In the timeout case the helper goroutine started here
// stays blocked in Wait until the remaining tasks exit.
func (m *BackgroundTaskManager) waitForShutdownCompletion(timeout time.Duration) bool {
	allDone := make(chan struct{})
	go func() {
		m.wg.Wait()
		close(allDone)
	}()

	expired := time.After(timeout)
	select {
	case <-expired:
		// Timed out with tasks still running.
		return true
	case <-allDone:
		// Every task goroutine has exited.
		return false
	}
}
// stopTasks signals every registered task goroutine to exit and then forgets
// the tasks.
//
// The stop channel is closed rather than sent on. A send on the unbuffered
// channel blocks until the goroutine is back in its select, so a task that is
// still executing its function would stall StopAll beyond its timeout, and a
// repeated StopAll would block forever because no goroutine is left to
// receive. Closing never blocks, and the goroutine's receive on the closed
// channel succeeds as soon as it next reaches its select.
//
// m.tasks is cleared afterwards so that a repeated call does not close the
// same channel twice, which would panic.
func (m *BackgroundTaskManager) stopTasks() {
	for _, t := range m.tasks {
		close(t.stopChannel)
	}
	m.tasks = nil
}