-
Notifications
You must be signed in to change notification settings - Fork 0
/
workerpool.go
140 lines (128 loc) · 3.8 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package main
import (
"fmt"
"time"
)
type WorkerPool struct {
name string
workers []*Worker
stats *WorkerPoolStats
autoscaleCooldown time.Time
}
type WorkerPoolStats struct {
failed int
success int
busy int
slots int //in slots - "cores"
workers int
errorRate float64
utilization float64
avgUtilization float64
sampleCount int
}
func NewWorkerPool(name string, workerCount int, workerSlots int) *WorkerPool {
workers := make([]*Worker, workerCount)
for i := range workers {
workers[i] = NewWorker(workerSlots)
}
return &WorkerPool{
name: name,
workers: workers,
stats: &WorkerPoolStats{},
autoscaleCooldown: time.Now(),
}
}
// add or remove workers based on error rate
// totally dirty hack and you'll need to tweak it to stabilize quickly
// speed is a divisor, 1-10
func (wp *WorkerPool) Autoscale(targetErrorRate float64, workerSlots int, delay int) {
cooldownDelayUp := time.Second * time.Duration(delay) //TODO
cooldownDelayDown := time.Second * time.Duration(2*delay)
stats := wp.Stats()
if time.Now().After(wp.autoscaleCooldown) {
if stats.errorRate > targetErrorRate {
wp.autoscaleCooldown = time.Now().Add(cooldownDelayUp)
wp.AddWorker(1, workerSlots)
} else if stats.errorRate < targetErrorRate/1.1 {
wp.autoscaleCooldown = time.Now().Add(cooldownDelayDown)
wp.RemoveWorker(1)
}
}
}
// generate work at rps
// try to add the work to the worker pool by selecting a random worker
func (wp *WorkerPool) AddWorkRandomly(rps int, mean float64, stdDev float64) {
interval := time.Millisecond * time.Duration(1000/rps) //TODO verify this is correct
ticker := time.NewTicker(time.Duration(interval))
for {
select {
case <-ticker.C:
worker := wp.SelectRandom()
worker.Render(NewRender(mean, stdDev))
}
}
}
func (wp *WorkerPool) AddWorker(count int, slots int) {
fmt.Printf("adding %d workers\n", count)
for i := 0; i < count; i++ {
wp.workers = append(wp.workers, NewWorker(slots))
}
}
// lost work is ignored //BUG kinda
func (wp *WorkerPool) RemoveWorker(count int) {
fmt.Printf("removing %d workers\n", count)
wp.workers = wp.workers[:len(wp.workers)-count]
}
// select a random worker from the pool
func (wp *WorkerPool) SelectRandom() *Worker {
pick := randGen.Intn(len(wp.workers))
return wp.workers[pick]
}
func (wp WorkerPool) PrintHeader() {
fmt.Printf("Failed\tOK\tRunning\tState\n")
}
func (wp *WorkerPool) Print() {
fmt.Print("\033[H\033[2J") //clear (*nix)
wp.PrintHeader()
for _, w := range wp.workers {
w.Print()
}
stats := wp.Stats()
fmt.Printf("Slots:\t")
fmt.Printf("busy %d\t", stats.busy)
fmt.Printf("total %d\t", stats.slots)
fmt.Printf("%%Util %.2f%%\t", stats.utilization*100)
fmt.Println()
fmt.Printf("Jobs:\t")
fmt.Printf("ok %d\t", stats.success)
fmt.Printf("total %d\t", stats.failed+stats.success)
fmt.Printf("failed %d\t", stats.failed)
fmt.Println()
fmt.Printf("Stats:\t")
fmt.Printf("%%OK: %.2f%%\t", 100-stats.errorRate*100)
fmt.Printf("avgUtil %.2f%%\t", stats.avgUtilization*100)
fmt.Println()
// fmt.Printf("sampleCount %d\n", stats.sampleCount)
}
func (wp *WorkerPool) Stats() (s WorkerPoolStats) {
s = *wp.stats
s.slots, s.busy = 0, 0 // these two are not cumulative
for _, w := range wp.workers {
s.slots += w.State.Slots
s.busy += w.State.Busy
wc := w.LatestStats()
s.failed += wc.Failed
s.success += wc.Success
}
s.errorRate = float64(s.failed) / float64(s.failed+s.success)
s.utilization = float64(s.busy) / float64(s.slots)
// calculate the overall avgUtilization so far
s.sampleCount += 1
if s.sampleCount == 1 {
s.avgUtilization = s.utilization
} else {
s.avgUtilization = (s.avgUtilization*float64(s.sampleCount-1) + s.utilization) / float64(s.sampleCount)
}
wp.stats = &s
return
}