-
Notifications
You must be signed in to change notification settings - Fork 134
/
logservice.go
141 lines (115 loc) · 3.43 KB
/
logservice.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package logs
import (
"context"
"fmt"
"strings"
"time"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/armadaproject/armada/internal/common/auth/authorization"
"github.com/armadaproject/armada/internal/common/cluster"
"github.com/armadaproject/armada/pkg/api/binoculars"
)
type LogService interface {
GetLogs(ctx context.Context, params *LogParams) ([]*binoculars.LogLine, error)
}
type LogParams struct {
Principal authorization.Principal
Namespace string
PodName string
SinceTime string
LogOptions *v1.PodLogOptions
}
type KubernetesLogService struct {
clientProvider cluster.KubernetesClientProvider
}
const MaxLogBytes = 2000000
func NewKubernetesLogService(clientProvider cluster.KubernetesClientProvider) *KubernetesLogService {
return &KubernetesLogService{clientProvider: clientProvider}
}
func (l *KubernetesLogService) GetLogs(ctx context.Context, params *LogParams) ([]*binoculars.LogLine, error) {
client, err := l.clientProvider.ClientForUser(params.Principal.GetName(), params.Principal.GetGroupNames())
if err != nil {
return nil, err
}
since, err := time.Parse(time.RFC3339Nano, params.SinceTime)
if err == nil {
params.LogOptions.SinceTime = &metav1.Time{Time: since}
} else {
if params.SinceTime != "" {
log.Warnf("failed to parse since time for pod %s: %v", params.PodName, err)
}
}
limitBytes := int64(MaxLogBytes)
params.LogOptions.Follow = false
params.LogOptions.Timestamps = true
params.LogOptions.LimitBytes = &limitBytes
if params.Namespace == "" {
params.Namespace = "default"
}
req := client.CoreV1().
Pods(params.Namespace).
GetLogs(params.PodName, params.LogOptions)
result := req.Do(ctx)
if result.Error() != nil {
return nil, result.Error()
}
rawLog, err := result.Raw()
if err != nil {
return nil, err
}
logLines, errs := ConvertLogs(rawLog)
for _, err := range errs {
log.Errorf(
"failed to parse log line for namespace: %q, pod: %q: %v",
params.Namespace,
params.PodName,
err)
}
return logLines, nil
}
func ConvertLogs(rawLog []byte) ([]*binoculars.LogLine, []error) {
lines := strings.Split(string(rawLog), "\n")
// If log is larger than MAX_PAYLOAD_SIZE, discard last lines until it is smaller or equal to MAX_PAYLOAD_SIZE
if len(rawLog) > MaxLogBytes {
lines = truncateLog(lines, len(rawLog))
}
var logLines []*binoculars.LogLine
var errs []error
for i := 0; i < len(lines); i++ {
line := lines[i]
if line == "" { // Can happen if we have a trailing newline
continue
}
logLine, err := splitLine(lines[i])
if err != nil {
errs = append(errs, err)
continue
}
logLines = append(logLines, logLine)
}
return logLines, errs
}
func truncateLog(lines []string, total int) []string {
lastExclIndex := len(lines)
for total > MaxLogBytes {
lastLine := lines[lastExclIndex-1]
total -= len(lastLine) + 1 // newline removed with strings.Split
lastExclIndex--
}
return lines[:lastExclIndex]
}
func splitLine(rawLine string) (*binoculars.LogLine, error) {
spaceIdx := strings.Index(rawLine, " ")
if spaceIdx == -1 {
return nil, fmt.Errorf("badly formatted log line: %q", rawLine)
}
timestamp := rawLine[:spaceIdx]
line := rawLine[spaceIdx+1:]
_, err := time.Parse(time.RFC3339Nano, timestamp)
if err != nil {
return nil, fmt.Errorf("failed parse timestamp in log line: %q: %v", rawLine, err)
}
return &binoculars.LogLine{Timestamp: timestamp, Line: line}, nil
}