/
wrappedjob.go
71 lines (62 loc) · 1.58 KB
/
wrappedjob.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package ecron
import (
"context"
"fmt"
"runtime"
"time"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/core/emetric"
"github.com/gotomicro/ego/core/etrace"
)
// wrappedJob decorates a NamedJob with a logger so that each execution can
// be reported by the methods defined on this type.
type wrappedJob struct {
	// NamedJob is the embedded job; its Name and Run(ctx) methods are used
	// by run.
	NamedJob
	// logger receives the "cron start", "cron end" and "cron run failed"
	// entries emitted by run.
	logger *elog.Component
}
// Run executes the wrapped job once by delegating to run. It takes no
// arguments and returns nothing; failures are reported through the logger
// inside run rather than to the caller.
// NOTE(review): presumably this is the entry point invoked by the cron
// scheduler — confirm against the caller.
func (wj wrappedJob) Run() {
	wj.run()
}
// run executes the embedded NamedJob once inside an "ego-cron" tracing span
// and reports the outcome through the logger and the job metrics.
//
// Sequence:
//   - a span is started from a fresh background context and finished on exit;
//   - the "begin" counter is incremented and "cron start" is logged;
//   - the job is run with the span's context;
//   - on exit, "cron end" is logged with the elapsed time ("cost") and the
//     same elapsed time is observed on the job histogram.
//
// A panic raised by the job is recovered, converted to an error and logged at
// error level together with a stack trace; it is not re-raised. An error
// returned by the job is logged as "cron run failed".
func (wj wrappedJob) run() {
	span, ctx := etrace.StartSpanFromContext(
		context.Background(),
		"ego-cron",
	)
	defer span.Finish()
	traceID := etrace.ExtractTraceID(ctx)
	emetric.JobHandleCounter.Inc("cron", wj.Name(), "begin")
	var fields = []elog.Field{zap.String("name", wj.Name())}
	// Attach the trace ID only when a global tracer has been registered.
	if opentracing.IsGlobalTracerRegistered() {
		fields = append(fields, elog.FieldTid(traceID))
	}
	wj.logger.Info("cron start", fields...)
	var beg = time.Now()
	defer func() {
		// Measure the elapsed time once so the log entry and the histogram
		// report the same value, and so it excludes the stack capture below.
		cost := time.Since(beg)
		var err error
		if rec := recover(); rec != nil {
			switch rec := rec.(type) {
			case error:
				err = rec
			default:
				err = fmt.Errorf("%v", rec)
			}
			// The buffer is fixed at 4096 bytes; runtime.Stack truncates
			// anything that does not fit.
			stack := make([]byte, 4096)
			length := runtime.Stack(stack, true)
			fields = append(fields, zap.ByteString("stack", stack[:length]))
		}
		if err != nil {
			fields = append(fields, elog.FieldErr(err), elog.Duration("cost", cost))
			wj.logger.Error("cron end", fields...)
		} else {
			// Record the duration for normal completions too, not only for
			// panics.
			fields = append(fields, elog.Duration("cost", cost))
			wj.logger.Info("cron end", fields...)
		}
		emetric.JobHandleHistogram.Observe(cost.Seconds(), "cron", wj.Name())
	}()
	err := wj.NamedJob.Run(ctx)
	if err != nil {
		// fields is shared with the deferred closure above, so the error
		// appended here is also carried on the subsequent "cron end" entry.
		fields = append(fields, elog.FieldErr(err))
		wj.logger.Error("cron run failed", fields...)
	}
}