-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.go
104 lines (93 loc) · 2.96 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
// The requester sends Requests to the balancer.
// Note the return channel inside the request
type Request struct {
fn func() int // The operation to perform.
c chan int // The channel to return the result.
}
// An artificial but illustrative simulation of a requester, a load generator.
func requester(work chan<- Request) {
c := make(chan int)
for {
// Kill some time (fake load).
Sleep(rand.Int63n(nWorker * 2 * Second))
work <- Request{workFn, c} // send request
result := <-c // wait for answer
furtherProcess(result)
}
}
// Worker definition
// A channel of requests, plus some load tracking data.
// The balancer keeps Workers in a heap (see Pool) ordered by pending,
// so index must be kept up to date by the heap's Swap/Push methods.
type Worker struct {
	requests chan Request // work to do (buffered channel)
	pending  int          // count of pending tasks
	index    int          // index in the heap
}
// Worker
// work is the worker's main loop: take a request from the balancer,
// execute it, deliver the answer directly to the requester via req.c,
// then notify the balancer on done so it can update this worker's load.
// Could run the loop body as a goroutine for parallelism.
func (w *Worker) work(done chan *Worker) {
	for req := range w.requests { // requests arrive from the balancer
		result := req.fn() // perform the operation
		req.c <- result    // answer goes straight to the requester
		done <- w          // report this worker as one task lighter
	}
}
// Balancer definition
// The load balancer needs a pool of workers and a single channel
// to which requesters can report task completion
// Pool is the balancer's collection of workers, maintained as a heap
// ordered by each worker's pending count.
type Pool []*Worker

// Balancer owns the worker pool and the shared completion channel that
// every worker reports on when it finishes a task.
type Balancer struct {
	pool Pool
	done chan *Worker
}
// Balancer function
// Easy!
// Just need to implement dispatch and completed.
// balance is the balancer's event loop: it multiplexes between incoming
// work (handed to the least-loaded worker) and completion notices
// (used to update that worker's load accounting). It never returns.
func (b *Balancer) balance(work chan Request) {
	for {
		select {
		case idle := <-b.done: // a worker finished a task...
			b.completed(idle) // ...so decrease its recorded load
		case r := <-work: // a new Request arrived...
			b.dispatch(r) // ...so route it to a worker
		}
	}
}
// A heap of channels
// Make Pool an implementation of the Heap interface.
// Now we balance by making the Pool a heap tracked by load.
//
// NOTE(review): the original showed only Less, but heap.Pop/Push/Remove
// (used by dispatch/completed below) require the full heap.Interface,
// and Swap must maintain each worker's index field for heap.Remove/Fix
// to address the right element. The missing methods are added here.

// Len reports the number of workers in the pool.
func (p Pool) Len() int { return len(p) }

// Less orders workers by load: fewer pending tasks sorts first,
// so the heap root is always the least-loaded worker.
func (p Pool) Less(i, j int) bool {
	return p[i].pending < p[j].pending
}

// Swap exchanges two workers and keeps their heap indices accurate.
func (p Pool) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
	p[i].index = i
	p[j].index = j
}

// Push appends a worker (used by container/heap; callers use heap.Push).
func (p *Pool) Push(x interface{}) {
	w := x.(*Worker)
	w.index = len(*p)
	*p = append(*p, w)
}

// Pop removes and returns the last element (used by container/heap;
// callers use heap.Pop, which moves the root there first).
func (p *Pool) Pop() interface{} {
	old := *p
	n := len(old)
	w := old[n-1]
	old[n-1] = nil // avoid retaining the worker through the backing array
	*p = old[:n-1]
	return w
}
// Dispatch
// All the pieces are in place.
// dispatch routes one Request to the least-loaded worker: pop the heap
// root, hand it the task, bump its pending count, and push it back so
// the heap re-orders it under its new load.
func (b *Balancer) dispatch(req Request) {
	// The heap root is the worker with the fewest pending tasks.
	lightest := heap.Pop(&b.pool).(*Worker)
	// Queue the task on that worker.
	lightest.requests <- req
	// Account for the extra pending job.
	lightest.pending++
	// Reinsert at the position its new load dictates.
	heap.Push(&b.pool, lightest)
}
// Completed
// completed records that worker w finished one task: decrement its
// pending count and restore the heap ordering.
//
// heap.Fix re-establishes the heap invariant after the element at
// w.index changed its key — one O(log n) sift, replacing the original
// heap.Remove + heap.Push pair, which did the same thing in two passes.
func (b *Balancer) completed(w *Worker) {
	// One fewer in the queue.
	w.pending--
	// Re-sift this worker into its correct heap position.
	heap.Fix(&b.pool, w.index)
}
// Lesson
// * A complex problem can be broken down into easy-to-understand components.
// * The pieces can be composed concurrently.
// * The result is easy to understand, efficient, scalable, and correct.
// Maybe even parallel.