-
Notifications
You must be signed in to change notification settings - Fork 378
/
coordinator.go
130 lines (110 loc) · 2.3 KB
/
coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package omnisearch
import (
"context"
"sync"
)
// Coordinator executes searches.
type Coordinator interface {
// Do executes a search with the currently available resources, using the
// given values as the starting points of the search.
// Results are delivered on the returned channel, which is closed once the
// search has finished.
Do(context.Context, ...interface{}) <-chan *ResultReturn
}
// coordinator is the Coordinator implementation handed out by NewCoordinator.
type coordinator struct {
// engines have their Search method invoked for every handled result that
// carries a non-nil Object.
engines []Engine
// parsers have their Parse method invoked for every handled result that
// carries a non-nil Object; non-nil parse results are handled in turn.
parsers []Parser
}
// NewCoordinator creates a new Coordinator.
//
// Construction starts from an empty coordinator and a provider list holding
// only NewMirror(ctx); every Configurator in cfgs is then applied in order.
// The first Configurator that fails aborts construction: its error is
// returned together with a nil Coordinator.
func NewCoordinator(ctx context.Context, cfgs ...Configurator) (Coordinator, error) {
	cfg := &coordinatorConfig{
		coordinator: &coordinator{},
		providers:   []Provider{NewMirror(ctx)},
	}
	for _, apply := range cfgs {
		if err := apply(cfg); err != nil {
			return nil, err
		}
	}
	return cfg.coordinator, nil
}
// search holds the state of a single coordinator.Do invocation.
type search struct {
// c is the coordinator whose engines and parsers are used by handle.
c *coordinator
// wg counts outstanding work; wgWatcher waits on it and then calls cancel.
wg *sync.WaitGroup
// ctx is derived from the context given to Do and is passed to the engines.
ctx context.Context
// cancel cancels ctx.
cancel func()
ic chan *ResultReturn // Internal channel: manager starts a handle goroutine for each value received here
rc chan *ResultReturn // Return channel: returned by Do to the caller and closed by manager once ctx is done
}
// rootInformator is a stateless type whose name is always "Root".
type rootInformator struct{}

// Name reports the fixed label of the root informator.
func (rootInformator) Name() string { return "Root" }

// String returns the same label as Name.
func (ri rootInformator) String() string {
	label := ri.Name()
	return label
}
// Do starts a search seeded with parents and returns the channel on which the
// results are delivered.
//
// Each parent is wrapped in a ResultReturn whose Finder is the root
// informator and is processed by its own handle goroutine. A manager
// goroutine starts a handle goroutine for each value received on the internal
// channel, and a watcher goroutine cancels the search context once the wait
// group drains. The manager closes the returned channel once that context is
// done.
func (c *coordinator) Do(ctx context.Context, parents ...interface{}) <-chan *ResultReturn {
	searchCtx, stop := context.WithCancel(ctx)
	s := &search{
		c:      c,
		wg:     new(sync.WaitGroup),
		ctx:    searchCtx,
		cancel: stop,
		ic:     make(chan *ResultReturn),
		rc:     make(chan *ResultReturn),
	}
	go s.manager()

	// It's important to use a single instance as this might be used later to do search trees.
	var root Informator = rootInformator{}
	seeds := make([]*ResultReturn, 0, len(parents))
	for _, obj := range parents {
		seeds = append(seeds, &ResultReturn{
			Object: obj,
			Finder: root,
		})
	}

	// Account for every seed before any handle goroutine is started.
	s.wg.Add(len(seeds))
	for _, seed := range seeds {
		go s.handle(seed)
	}

	go s.wgWatcher()
	return s.rc
}
// manager starts a handle goroutine for every result received on the internal
// channel, registering it on the wait group first. It runs until the search
// context is done, at which point it closes the return channel and exits.
//
// NOTE(review): handle sends on rc unconditionally, so if ctx is cancelled
// from outside while a handle call is still running, that send can hit the
// closed channel — confirm callers never cancel early.
func (s *search) manager() {
	defer close(s.rc)

	done := s.ctx.Done()
	for {
		select {
		case <-done:
			return
		case found := <-s.ic:
			s.wg.Add(1)
			go s.handle(found)
		}
	}
}
// handle processes one result and releases one wait-group slot when it
// returns.
//
// When r carries a non-nil Object, r is passed to every engine's Search
// (together with the search context, the wait group and the internal
// channel), then to every parser; each non-nil parse result is handled in its
// own goroutine. Finally r itself is sent on the return channel.
//
// When r.Decrement is set, one additional wait-group slot is released.
func (s *search) handle(r *ResultReturn) {
	defer s.wg.Done()

	if r.Object != nil {
		for _, engine := range s.c.engines {
			engine.Search(s.ctx, s.wg, s.ic, r)
		}

		for _, parser := range s.c.parsers {
			pr := parser.Parse(r)
			if pr == nil {
				continue
			}
			// Register the child before starting it. Deferring the Add to
			// the end of this function would let the child's Done run
			// first, dropping the counter to zero while work is still
			// pending and letting the watcher cancel the search early.
			s.wg.Add(1)
			go s.handle(pr)
		}

		s.rc <- r
	}

	if r.Decrement {
		s.wg.Done()
	}
}
// wgWatcher blocks until the wait group has drained and then cancels the
// search context.
func (s *search) wgWatcher() {
	s.wg.Wait()
	s.cancel()
}