-
Notifications
You must be signed in to change notification settings - Fork 0
/
runloop.go
99 lines (81 loc) · 1.86 KB
/
runloop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main
import (
"context"
"sync"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
type logPair struct {
streamer *Streamer
receiver *Receiver
}
type Runner struct {
k8 v1.PodInterface
config *Config
watcher *Watcher
streams map[string]logPair
lock *sync.Mutex
added chan Target
removed chan Target
}
func MakeRunner(config *Config, k8 v1.PodInterface) *Runner {
added := make(chan Target)
removed := make(chan Target)
watcher := &Watcher{
config: config,
added: added,
removed: removed,
}
return &Runner{
k8: k8,
config: config,
watcher: watcher,
streams: map[string]logPair{},
lock: &sync.Mutex{},
added: added,
removed: removed,
}
}
func (r *Runner) RunLogs(ctx context.Context) error {
go func() {
for target := range r.watcher.Added() {
r.addTarget(target)
}
}()
go func() {
for target := range r.watcher.Removed() {
r.removeTarget(target)
}
}()
return r.watcher.Run(ctx, r.k8)
}
func (r *Runner) addTarget(target Target) {
r.lock.Lock()
defer r.lock.Unlock()
parser := MakeParser(r.config.Mapping)
receiver := MakeReceiver(target.podName, target.containerName, len(r.streams), target.containerCfg.Fields, parser)
streamer := MakeStreamer(StreamerConfig{
K8Provider: r.k8,
PodName: target.podName,
ContainerName: target.containerName,
Receiver: receiver,
Seconds: r.config.SecondsBefore,
})
if err := streamer.Run(context.Background()); err != nil {
//fmt.Printf("failed to run streamer for %s, %v\n", target.ID(), err)
return
}
r.streams[target.ID()] = logPair{streamer: streamer, receiver: receiver}
}
func (r *Runner) removeTarget(target Target) {
r.lock.Lock()
defer r.lock.Unlock()
pair, ok := r.streams[target.ID()]
if !ok {
return
}
pair.streamer.Close()
delete(r.streams, target.ID())
}
func (r *Runner) Stop() {
r.watcher.Stop()
}