-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
111 lines (99 loc) · 2.38 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package main
import (
"fmt"
"time"
)
// Worker performs renders in parallel, one per slot, and keeps track of
// its own cumulative stats.
type Worker struct {
	// stats holds the running counters plus the totals recorded by the
	// previous LatestStats call.
	stats *workerStats
	// State holds the busy-slot count and the total number of slots.
	State *workerState
	// renders has one entry per slot; a nil entry means the slot is free.
	renders []*Render
	// startChan hands render requests from Render to the manage goroutine.
	startChan chan Render
	// doneChan carries the slot index of the render which is done.
	doneChan chan int
}
// workerStats pairs a worker's running totals (cumulative) with the totals
// recorded at the previous LatestStats call (prev), so that LatestStats can
// report the difference between the two.
type workerStats struct {
	cumulative, prev WorkerCounters
}
// WorkerCounters counts render outcomes for a worker.
type WorkerCounters struct {
	// Failed counts renders that were rejected because no slot was free.
	Failed int
	// Success counts renders that ran to completion.
	Success int
}
// workerState describes how much of a worker's capacity is in use.
type workerState struct {
	// Busy is the number of slots currently occupied by a render.
	Busy int
	// Slots is the total number of slots the worker was created with.
	Slots int
}
// NewWorker creates a Worker with the given number of render slots, all of
// them free, and starts its manage goroutine before returning it.
func NewWorker(slots int) *Worker {
	worker := new(Worker)
	worker.renders = make([]*Render, slots)
	// Both channels are unbuffered: senders wait until manage receives.
	worker.startChan = make(chan Render)
	worker.doneChan = make(chan int)
	worker.stats = new(workerStats)
	worker.State = &workerState{Slots: slots}

	go worker.manage()
	return worker
}
// LatestStats returns the counts accumulated since the previous call (on the
// first call: everything counted so far) and records the current totals as
// the baseline for the next call.
//
// Per the original note, the workerPool keeps cumulative counts built from
// these values so they are not lost when a worker is removed.
//
// The counters are written only by the manage goroutine, but they are read
// here without synchronization, and this method itself updates the baseline,
// so it is not safe to call from several goroutines at once.
func (w *Worker) LatestStats() WorkerCounters {
	// Take one snapshot and use it for both the delta and the new baseline.
	// Re-reading the live counters for the baseline would let an increment
	// that lands in between be skipped or reported twice.
	current, previous := w.stats.cumulative, w.stats.prev
	w.stats.prev = current

	// A zero baseline needs no special case: subtracting it yields current.
	return WorkerCounters{
		Failed:  current.Failed - previous.Failed,
		Success: current.Success - previous.Success,
	}
}
// Render submits r to the worker. The send blocks until the manage goroutine
// receives it; manage then either places the render in a free slot or counts
// it as failed. All slot bookkeeping happens in manage, so the channel
// hand-off is what provides thread safety for callers.
func (w *Worker) Render(r Render) {
	w.startChan <- r
}
// manage is the worker's event loop. It never returns.
//
// It listens on the start and done channels, fills or empties the slots
// accordingly, and maintains the busy count and the cumulative stats.
func (w *Worker) manage() {
	for {
		select {
		case r := <-w.startChan:
			// A render is starting: look for the first open slot.
			slot := -1
			for i := range w.renders {
				if w.renders[i] == nil {
					slot = i
					break
				}
			}

			// Every slot is taken (or there are none): count it as failed.
			if slot < 0 {
				w.stats.cumulative.Failed++
				continue
			}

			w.renders[slot] = &r
			w.State.Busy++
			go func() {
				// Wait for the render to complete, then report its slot.
				time.Sleep(r.duration)
				w.doneChan <- slot
			}()

		case finished := <-w.doneChan:
			// A render is complete: clear its slot and count the success.
			w.renders[finished] = nil
			w.State.Busy--
			w.stats.cumulative.Success++
		}
	}
}
// Print writes a one-line summary of the worker to standard output: the
// cumulative failed and success counts, the number of busy slots, and then
// one "|"-terminated cell per slot showing the render id in hexadecimal or
// a blank for a free slot.
//
// The fields are read without synchronization with the manage goroutine.
func (w *Worker) Print() {
	totals := w.stats.cumulative
	fmt.Printf(" %d\t%d\t%d\t", totals.Failed, totals.Success, w.State.Busy)
	fmt.Print("|")
	for i := range w.renders {
		occupant := w.renders[i]
		if occupant == nil {
			// NOTE(review): this blank cell is narrower than a two-digit
			// id cell; confirm whether the columns are meant to line up.
			fmt.Print(" |")
			continue
		}
		// print id as hexadecimal
		fmt.Printf("%02x|", occupant.id)
	}
	fmt.Println()
}