/
logs.go
99 lines (80 loc) · 2.3 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package kube
import (
"fmt"
"io"
"math"
"strconv"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/bjaglin/multiplexio"
pb "github.com/datacol-io/datacol/api/models"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
func LogStreamReq(c *kubernetes.Clientset, w io.Writer, ns, app string, opts pb.LogStreamOptions) error {
pods, err := GetAllRunningPods(c, ns, app)
if err != nil {
return err
}
log.Debugf("Got %d pods for app=%s", len(pods), app)
//TODO: consider using https://github.com/djherbis/stream for reading multiple streams
var sources []multiplexio.Source
type reqQueue struct {
name string
request *rest.Request
}
var requests []reqQueue
for _, pod := range pods {
if opts.Proctype != "" && opts.Proctype != pod.ObjectMeta.Labels[typeLabel] {
continue
}
// Don't collect logs from ephemeral pods
if proctype := pod.ObjectMeta.Labels[typeLabel]; proctype == runProcessKind {
continue
}
name := pod.Name
req := c.Core().RESTClient().Get().
Namespace(ns).
Name(name).
Resource("pods").
SubResource("log").
Param("follow", strconv.FormatBool(opts.Follow)).
Param("tailLines", opts.TailLines)
var cntName string
if len(pod.Spec.Containers) > 0 {
cntName = pod.Spec.Containers[0].Name
}
req = req.Param("container", cntName)
log.Debugf("will stream logs from %v/%s", name, cntName)
if opts.Since > 0 {
sec := int64(math.Ceil(float64(opts.Since) / float64(time.Second)))
req = req.Param("sinceSeconds", strconv.FormatInt(sec, 10))
}
requests = append(requests, reqQueue{name: name, request: req})
}
var wg sync.WaitGroup
for _, rq := range requests {
wg.Add(1)
go func(rq reqQueue) {
defer wg.Done()
r, err := rq.request.Stream()
if err != nil {
log.Errorf("creating log stream: %v", err)
}
prefix := fmt.Sprintf("[%s] ", strings.TrimPrefix(rq.name, app+"-"))
sources = append(sources, multiplexio.Source{
Reader: r,
Write: func(dest io.Writer, token []byte) (int, error) {
return multiplexio.WriteNewLine(dest, append([]byte(prefix), token...))
},
})
}(rq)
}
log.Infof("waiting for stream handlers ...")
wg.Wait()
log.Debugf("Done. Got %d streams", len(sources))
_, err = io.Copy(w, multiplexio.NewReader(multiplexio.Options{}, sources...))
return err
}