/
logs.go
126 lines (112 loc) · 2.99 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package handlers
import (
"io"
"strconv"
"time"
"git.containerum.net/ch/kube-api/pkg/kubernetes"
"git.containerum.net/ch/kube-api/pkg/utils/timeoutreader"
"git.containerum.net/ch/kube-api/pkg/utils/watchdog"
"git.containerum.net/ch/kube-api/pkg/utils/wsutils"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
func logStreamSetup(conn *websocket.Conn, rc io.ReadCloser, logOpt *kubernetes.LogOptions) {
if logOpt.Follow {
rc = timeoutreader.NewTimeoutReader(rc, 30*time.Minute, true)
} else {
rc = timeoutreader.NewTimeoutReader(rc, 10*time.Second, true)
}
// watchdog for reader, resets by websocket pong
closeWd := watchdog.New(wsTimeout, func() { rc.Close() })
conn.SetPongHandler(func(appData string) error {
conn.SetWriteDeadline(time.Now().Add(wsTimeout))
conn.SetReadDeadline(time.Now().Add(wsTimeout))
closeWd.Kick()
return nil
})
conn.SetWriteDeadline(time.Now().Add(wsTimeout))
conn.SetReadDeadline(time.Now().Add(wsTimeout))
var (
done = make(chan struct{})
data = make(chan []byte)
)
go readConn(conn)
go readLogs(rc, data, done)
go writeLogs(conn, data, done)
}
func makeLogOption(c *gin.Context) kubernetes.LogOptions {
followStr := c.Query(followQuery)
previousStr := c.Query(previousQuery)
container := c.Query(containerQuery)
tail, _ := strconv.Atoi(c.Query(tailQuery))
if tail <= 0 || tail > tailMax {
tail = tailDefault
}
return kubernetes.LogOptions{
Tail: int64(tail),
Follow: followStr == "true",
Previous: previousStr == "true",
Container: container,
}
}
func readLogs(logs io.ReadCloser, ch chan<- []byte, done chan<- struct{}) {
buf := [wsBufferSize]byte{}
defer logs.Close()
defer func() { done <- struct{}{} }()
for {
readBytes, err := logs.Read(buf[:])
switch err {
case nil:
// pass
case io.EOF, timeoutreader.ErrReadTimeout:
return
default:
log.WithError(err).Error("Log read failed")
return
}
ch <- buf[:readBytes]
}
}
func writeLogs(conn *websocket.Conn, ch <-chan []byte, done <-chan struct{}) {
defer func() {
conn.WriteMessage(websocket.CloseMessage, nil)
conn.Close()
}()
pingTimer := time.NewTicker(wsPingPeriod)
for {
var err error
select {
case <-done:
return
case <-pingTimer.C:
err = conn.WriteMessage(websocket.PingMessage, nil)
case data := <-ch:
err = conn.WriteMessage(websocket.TextMessage, data)
}
switch {
case err == nil,
wsutils.IsNetTemporary(err):
// pass
case err == timeoutreader.ErrReadTimeout,
err == websocket.ErrCloseSent, // connection closed by us
wsutils.IsNetTimeout(err), // deadline failed
wsutils.IsBrokenPipe(err), // connection closed by client
wsutils.IsClose(err):
return
default:
log.WithError(err).Errorf("Log send failed")
return
}
}
}
func readConn(conn *websocket.Conn) {
defer log.Debugf("End writing logs")
for {
_, _, err := conn.ReadMessage() // to trigger pong handlers and check connection though
if err != nil {
conn.Close()
return
}
}
}