/
job_logger.go
81 lines (64 loc) · 1.39 KB
/
job_logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package xjm
import (
"fmt"
"os"
"time"
"github.com/askasoft/pango/log"
)
// JobLogWriter is a log writer that buffers log events and stores them
// as job logs of a single job through a JobManager.
type JobLogWriter struct {
	// presumably supplies Filter and Reject used by this writer - confirm against pango/log
	log.LogFilter
	// presumably supplies the event buffer and the batch/flush settings - confirm against pango/log
	log.BatchWriter

	// jmr receives the converted job logs (AddJobLogs).
	jmr JobManager
	// jid is the job ID written into every stored JobLog.
	jid int64
}
// NewJobLogWriter creates a JobLogWriter that stores the logs of job jid
// through jmr.
//
// The writer is preconfigured with a debug level filter, BatchCount 100,
// CacheCount 200, FlushLevel warn and a FlushDelta of one second.
func NewJobLogWriter(jmr JobManager, jid int64) *JobLogWriter {
	w := new(JobLogWriter)
	w.jmr, w.jid = jmr, jid

	// Batching and flushing defaults.
	w.BatchCount, w.CacheCount = 100, 200
	w.FlushLevel, w.FlushDelta = log.LevelWarn, time.Second

	// Level filter.
	w.Filter = log.NewLevelFilter(log.LevelDebug)

	return w
}
// Write appends the event le to the writer's buffer and flushes the
// buffer when ShouldFlush reports true for le.
// Events for which Reject reports true are dropped.
// The returned error is always nil; Flush does not return an error.
func (jlw *JobLogWriter) Write(le *log.Event) (err error) {
	if jlw.Reject(le) {
		return nil
	}

	jlw.InitBuffer()
	jlw.EventBuffer.Push(le)

	if !jlw.ShouldFlush(le) {
		return nil
	}

	jlw.Flush()
	return nil
}
// Flush stores all buffered events as job logs.
// It does nothing when the buffer is nil or empty.
// On success the buffer is cleared; on failure the error is printed to
// os.Stderr and the buffered events are kept.
func (jlw *JobLogWriter) Flush() {
	if jlw.EventBuffer == nil || jlw.EventBuffer.IsEmpty() {
		return
	}

	err := jlw.flush()
	if err != nil {
		// Keep the events in the buffer; only report the failure.
		fmt.Fprintln(os.Stderr, err.Error())
		return
	}

	jlw.EventBuffer.Clear()
}
// flush converts every buffered event into a JobLog carrying the writer's
// job ID and hands them all to the JobManager in one AddJobLogs call.
// It returns the error from AddJobLogs and does not modify the buffer.
func (jlw *JobLogWriter) flush() error {
	var logs []*JobLog

	it := jlw.EventBuffer.Iterator()
	for it.Next() {
		ev := it.Value()
		logs = append(logs, &JobLog{
			JID:     jlw.jid,
			Time:    ev.Time,
			Level:   ev.Level.Prefix(),
			Message: ev.Msg,
		})
	}

	return jlw.jmr.AddJobLogs(logs)
}
// Close flushes any buffered events by calling Flush.
func (jlw *JobLogWriter) Close() {
	jlw.Flush()
}