This repository has been archived by the owner on Dec 30, 2020. It is now read-only.
/
coordinator.go
82 lines (69 loc) · 1.8 KB
/
coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package subprocessor
import (
"fmt"
"github.com/satori/go.uuid"
logger "github.com/sirupsen/logrus"
"sync"
)
const (
defaultRoutineCount = 10
)
type Coordinator struct {
coordinationPool chan chan Resource
ResourceChannel chan Resource
routines []*Routine
quit chan bool
id string
}
var instance *Coordinator
var once sync.Once
func NewCoordinator() *Coordinator {
coordinator := &Coordinator{}
uuid, err := uuid.NewV4()
if err != nil {
panic(err)
}
coordinator.id = fmt.Sprintf("%s", uuid)
logger.Infof("Starting coordinator: %s", coordinator.id)
//Create channels
//Two way coordination channel
coordinator.coordinationPool = make(chan chan Resource, defaultRoutineCount)
//Input channel
coordinator.ResourceChannel = make(chan Resource)
coordinator.quit = make(chan bool)
//Create routines
for i := 0; i < defaultRoutineCount; i++ {
rt := NewRoutine(coordinator.coordinationPool)
coordinator.routines = append(coordinator.routines, rt)
rt.Start() //Start each routine
}
logger.Infof("Finished building coordinator: %s", coordinator.id)
return coordinator
}
func (coordinator *Coordinator) Destroy() {
go func() {
logger.Infof("Coordinator %s is being destroyed", coordinator.id)
coordinator.stop()
for _, routine := range coordinator.routines {
routine.quit <- true
}
}()
}
func (coordinator *Coordinator) stop() {
coordinator.quit <- true
logger.Infof("Coordinator %s will stop receiving input", coordinator.id)
}
func (coordinator *Coordinator) Run() {
for {
select {
case msg := <-coordinator.ResourceChannel: //External Coordinator input channel
go func(msg Resource) {
next := <-coordinator.coordinationPool
next <- msg
logger.Info("Processed new message!")
}(msg)
case _ = <-coordinator.quit:
return
}
}
}