-
Notifications
You must be signed in to change notification settings - Fork 348
/
logger.go
93 lines (80 loc) · 2.46 KB
/
logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package tasklogger
import (
"strings"
"time"
"github.com/determined-ai/determined/master/pkg/model"
"github.com/determined-ai/determined/master/pkg/ptrs"
)
var (
	// FlushInterval is the maximum time a log line may sit in the in-memory
	// buffer before the logger writes it to the database. It is deliberately
	// small so that users see their logs promptly.
	FlushInterval = 20 * time.Millisecond

	// BufferSize is the number of buffered log lines that forces a flush to the
	// database regardless of FlushInterval. With the many-rows-per-insert
	// strategy, throughput was significantly worse below 500 lines per batch
	// and did not improve beyond 1000.
	BufferSize = 1000
)
// Logger buffers task logs that originate on the master, such as scheduling
// and provisioning information or container exit statuses, and writes them to
// a backend in batches.
// TODO(DET-9537): Add graceful shutdown for the tasklogger, so that when we
// intentionally blip the master for something like a configuration update
// we do not lose logs.
type Logger struct {
	// backend receives each flushed batch via AddTaskLogs.
	backend Writer
	// inbox carries logs from Insert to the background run goroutine.
	inbox chan *model.TaskLog
}
// New returns a Logger that buffers task logs and periodically flushes them to
// backend. It starts the Logger's background goroutine before returning.
// Only one Logger should exist, shared across the whole system.
func New(backend Writer) *Logger {
	logger := &Logger{
		backend: backend,
		// The inbox holds up to BufferSize logs that the background goroutine
		// has not yet picked up.
		inbox: make(chan *model.TaskLog, BufferSize),
	}
	go logger.run()
	return logger
}
// Insert hands entry to the background goroutine, which writes it to the
// backend as part of a later flush. The send blocks while the inbox channel's
// buffer is full.
func (l *Logger) Insert(entry *model.TaskLog) {
	l.inbox <- entry
}
// run is the Logger's single consumer goroutine. It drains inbox into a
// pending batch and hands that batch to flush when either:
//   - the ticker fires (every FlushInterval) and at least one log is pending, or
//   - the batch has grown to BufferSize logs.
//
// The loop has no exit path, so the deferred flush below only runs if the
// goroutine unwinds abnormally.
func (l *Logger) run() {
	pending := make([]*model.TaskLog, 0, BufferSize)
	// Arguments to a deferred call are evaluated when the defer statement
	// executes. `defer l.flush(pending)` would therefore capture the initial
	// empty slice and drop whatever is buffered at exit. Read pending through a
	// closure so the batch that is current at exit is the one that gets flushed.
	defer func() {
		if len(pending) > 0 {
			l.flush(pending)
		}
	}()

	t := time.NewTicker(FlushInterval)
	defer t.Stop()

	for {
		var flush bool
		select {
		case <-t.C:
			flush = len(pending) > 0
		case tl := <-l.inbox:
			pending = append(pending, tl)
			flush = len(pending) >= BufferSize
		}
		if !flush {
			continue
		}
		// Detach the batch before handing it off. If the backend call panics,
		// the deferred flush above then sees an empty batch instead of
		// resubmitting the same logs.
		batch := pending
		pending = make([]*model.TaskLog, 0, BufferSize)
		l.flush(batch)
	}
}
// flush writes one batch of task logs to the backend. A backend error is
// logged rather than returned, and the batch is not retried here.
// An empty batch is a no-op, so callers never trigger a pointless backend call.
func (l *Logger) flush(pending []*model.TaskLog) {
	if len(pending) == 0 {
		return
	}
	if err := l.backend.AddTaskLogs(pending); err != nil {
		syslog.WithError(err).Errorf("failed to save task logs")
	}
}
// CreateLogFromMaster builds a TaskLog for a message emitted by the master
// about the given task.
//
// The entry is stamped with the current time in UTC, tagged with source
// "master" and std type "stdout", and carries the given level. If the text
// does not already end in a newline, one is appended.
func CreateLogFromMaster(taskID model.TaskID, level, log string) *model.TaskLog {
	text := log
	if !strings.HasSuffix(text, "\n") {
		text += "\n"
	}

	entry := &model.TaskLog{
		TaskID: string(taskID),
		Log:    text,
	}
	entry.Timestamp = ptrs.Ptr(time.Now().UTC())
	entry.Level = &level
	entry.Source = ptrs.Ptr("master")
	entry.StdType = ptrs.Ptr("stdout")
	return entry
}