/
pipe_router.go
executable file
·201 lines (172 loc) · 4.14 KB
/
pipe_router.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package app
import (
"fmt"
"io"
"sync"
"time"
"context"
log "github.com/sirupsen/logrus"
"github.com/weaveworks/common/mtime"
"github.com/weaveworks/scope/common/xfer"
)
// Timing parameters used by the pipe router's background loop.
const (
	// gcInterval is the period of the background loop: every pipe is
	// inspected once per interval.
	gcInterval = 30 * time.Second

	// pipeTimeout is how long an end of a pipe may stay without a connected
	// client before the pipe is closed.
	pipeTimeout = time.Minute

	// gcTimeout is how long a closed (tombstoned) pipe is remembered before
	// it is removed from the router.
	gcTimeout = 10 * time.Minute
)
// End identifies one of the two sides of a pipe.
type End int

// The values an End is expected to take.
const (
	// UIEnd selects the UI side of a pipe.
	UIEnd = iota
	// ProbeEnd selects the probe side of a pipe.
	ProbeEnd
)

// String returns "ui" for UIEnd and "probe" for every other value.
func (e End) String() string {
	switch e {
	case UIEnd:
		return "ui"
	default:
		return "probe"
	}
}
// PipeRouter keeps track of pipes by id and hands out either end of a pipe
// to callers.
type PipeRouter interface {
	// Exists reports whether the pipe with the given id is usable.
	Exists(ctx context.Context, id string) (bool, error)

	// Get returns the pipe with the given id together with the
	// io.ReadWriter for the requested end.
	Get(ctx context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, error)

	// Release gives back an end of the pipe previously obtained with Get.
	Release(ctx context.Context, id string, e End) error

	// Delete closes the pipe with the given id.
	Delete(ctx context.Context, id string) error

	// Stop shuts the router down.
	Stop()
}
// localPipeRouter is the in-memory PipeRouter returned by
// NewLocalPipeRouter. It connects the UI and probe ends of pipes.
type localPipeRouter struct {
	// The embedded mutex is held by every method that touches pipes.
	sync.Mutex

	// pipes maps a pipe id to its bookkeeping entry.
	pipes map[string]*pipe

	// quit is closed by Stop to make gcLoop return.
	quit chan struct{}

	// wait tracks the gcLoop goroutine so Stop can block until it exits.
	wait sync.WaitGroup
}
// pipe wraps an xfer.Pipe with the router's bookkeeping. For each end of the
// pipe we keep a reference count and a lastUsedTime, so that the pipe can be
// timed out when either end is inactive.
type pipe struct {
	xfer.Pipe

	// tombstoneTime is when the pipe was closed (by Delete or by the timeout
	// sweep); garbageCollect compares it against gcTimeout.
	tombstoneTime time.Time

	// ui holds the bookkeeping for the UI end.
	ui end
	// probe holds the bookkeeping for the probe end.
	probe end
}
// end is the per-side bookkeeping the router keeps for a pipe.
type end struct {
	// refCount is incremented by Get and decremented by Release.
	refCount int

	// lastUsedTime is set when the pipe is created and refreshed by Release
	// when the reference count is no longer positive.
	lastUsedTime time.Time
}
// end returns the bookkeeping record and the io.ReadWriter that belong to
// the requested side of the pipe. UIEnd selects the UI side; any other value
// selects the probe side.
func (p *pipe) end(which End) (*end, io.ReadWriter) {
	uiIO, probeIO := p.Ends()
	switch which {
	case UIEnd:
		return &p.ui, uiIO
	default:
		return &p.probe, probeIO
	}
}
// NewLocalPipeRouter builds an in-memory pipe router and starts its
// background loop (gcLoop) in a new goroutine. Call Stop on the returned
// router to terminate that goroutine.
func NewLocalPipeRouter() PipeRouter {
	router := new(localPipeRouter)
	router.quit = make(chan struct{})
	router.pipes = make(map[string]*pipe)

	// Register the goroutine before starting it so Stop can wait for it.
	router.wait.Add(1)
	go router.gcLoop()

	return router
}
// Exists reports whether the pipe with the given id is usable. A pipe the
// router holds is reported as existing only while it is not closed. An id
// the router has no entry for is reported as existing too, since Get creates
// pipes on demand. The returned error is always nil.
func (pr *localPipeRouter) Exists(_ context.Context, id string) (bool, error) {
	pr.Lock()
	defer pr.Unlock()

	if entry, found := pr.pipes[id]; found {
		return !entry.Closed(), nil
	}
	return true, nil
}
// Get returns the pipe registered under id together with the io.ReadWriter
// for the requested end, creating the pipe if the router has no entry for
// it yet. The reference count of the requested end is incremented, so every
// successful Get should be paired with a Release. An error is returned if
// the pipe has been closed.
func (pr *localPipeRouter) Get(_ context.Context, id string, e End) (xfer.Pipe, io.ReadWriter, error) {
	pr.Lock()
	defer pr.Unlock()

	entry, known := pr.pipes[id]
	if !known {
		log.Debugf("Creating pipe id %s", id)
		// Both ends start their inactivity clock at creation time.
		entry = new(pipe)
		entry.ui.lastUsedTime = mtime.Now()
		entry.probe.lastUsedTime = mtime.Now()
		entry.Pipe = xfer.NewPipe()
		pr.pipes[id] = entry
	}

	if entry.Closed() {
		return nil, nil, fmt.Errorf("Pipe %s closed", id)
	}

	side, sideIO := entry.end(e)
	side.refCount++
	return entry, sideIO, nil
}
// Release drops one reference to the given end of the pipe registered under
// id. When the reference count of that end is no longer positive and the
// pipe is still open, the end's lastUsedTime is reset to now, which starts
// the inactivity clock used by the timeout sweep. An error is returned if
// the router has no pipe with that id.
func (pr *localPipeRouter) Release(_ context.Context, id string, e End) error {
	pr.Lock()
	defer pr.Unlock()

	entry, known := pr.pipes[id]
	if !known {
		return fmt.Errorf("Pipe %s not found", id)
	}

	side, _ := entry.end(e)
	side.refCount--
	if side.refCount <= 0 && !entry.Closed() {
		side.lastUsedTime = mtime.Now()
	}
	return nil
}
// Delete closes the pipe registered under id and records the time of
// closure as its tombstone. The entry itself stays in the router until
// garbageCollect removes it. Deleting an unknown id is a no-op; the returned
// error is always nil.
func (pr *localPipeRouter) Delete(_ context.Context, id string) error {
	pr.Lock()
	defer pr.Unlock()

	if entry, found := pr.pipes[id]; found {
		entry.Close()
		entry.tombstoneTime = mtime.Now()
	}
	return nil
}
// Stop shuts down the router's background loop: it closes the quit channel
// and then blocks until the gcLoop goroutine has finished.
// NOTE(review): quit is closed unconditionally, so a second call to Stop
// would panic on the double close — confirm callers only call it once.
func (pr *localPipeRouter) Stop() {
// Closing quit makes gcLoop return.
close(pr.quit)
// Block until gcLoop has called wait.Done.
pr.wait.Wait()
}
// gcLoop is the router's background goroutine. Once every gcInterval it
// times out inactive pipes and then forgets pipes that have been closed for
// long enough. It returns when the quit channel is closed and signals its
// exit through the wait group.
func (pr *localPipeRouter) gcLoop() {
	defer pr.wait.Done()

	// Use an explicit Ticker rather than time.Tick so the ticker can be
	// stopped when the loop exits; a time.Tick ticker cannot be stopped and
	// would outlive the router after Stop.
	ticker := time.NewTicker(gcInterval)
	defer ticker.Stop()

	for {
		select {
		case <-pr.quit:
			return
		case <-ticker.C:
			pr.timeout()
			pr.garbageCollect()
		}
	}
}
// timeout closes every open pipe that has an inactive end. An end counts as
// inactive when its reference count is zero and at least pipeTimeout has
// passed since its lastUsedTime. Closed pipes are tombstoned with the
// current time so garbageCollect can remove them later.
func (pr *localPipeRouter) timeout() {
	pr.Lock()
	defer pr.Unlock()

	now := mtime.Now()

	// inactive reports whether one end has had no clients for pipeTimeout.
	inactive := func(e *end) bool {
		return e.refCount == 0 && now.Sub(e.lastUsedTime) >= pipeTimeout
	}

	for id, entry := range pr.pipes {
		if entry.Closed() {
			continue
		}
		// A pipe whose ends are both in use has no inactive end, so it is
		// skipped here as well.
		if !inactive(&entry.ui) && !inactive(&entry.probe) {
			continue
		}
		log.Infof("Timing out pipe %s", id)
		entry.Close()
		entry.tombstoneTime = now
	}
}
// garbageCollect removes from the router every pipe that is closed and whose
// tombstone is at least gcTimeout old.
func (pr *localPipeRouter) garbageCollect() {
	pr.Lock()
	defer pr.Unlock()

	now := mtime.Now()
	for id, entry := range pr.pipes {
		if !entry.Closed() {
			continue
		}
		if now.Sub(entry.tombstoneTime) < gcTimeout {
			continue
		}
		delete(pr.pipes, id)
	}
}