-
Notifications
You must be signed in to change notification settings - Fork 11
/
fopsProvider.go
102 lines (89 loc) · 2.84 KB
/
fopsProvider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package flog
import (
"bytes"
"encoding/json"
"fmt"
"github.com/farseer-go/fs/configure"
"github.com/farseer-go/fs/core"
"github.com/farseer-go/fs/core/eumLogLevel"
"github.com/farseer-go/fs/dateTime"
"github.com/farseer-go/fs/parse"
"github.com/farseer-go/fs/sonyflake"
"github.com/farseer-go/fs/trace"
"net/http"
"time"
)
// FopsProvider 上传到FOPS
type FopsProvider struct {
}
func (r *FopsProvider) CreateLogger(categoryName string, formatter IFormatter, logLevel eumLogLevel.Enum) ILoggerPersistent {
persistent := &fopsLoggerPersistent{formatter: formatter, fopsServer: configure.GetFopsServer(), queue: make(chan *LogData, 10000)}
// 异步开启上传
go persistent.enableUpload()
return persistent
}
type fopsLoggerPersistent struct {
formatter IFormatter
fopsServer string // fops服务端
queue chan *LogData // 待上传的列表
}
func (r *fopsLoggerPersistent) IsEnabled(logLevel eumLogLevel.Enum) bool {
return true
}
func (r *fopsLoggerPersistent) Log(LogLevel eumLogLevel.Enum, log *LogData, exception error) {
if LogLevel != eumLogLevel.NoneLevel {
// 上传到FOPS时需要
if t := trace.CurTraceContext.Get(); t != nil {
log.TraceId = t.GetTraceId()
}
log.Content = mustCompile.ReplaceAllString(log.Content, "")
log.AppId = parse.ToString(core.AppId)
log.AppName = core.AppName
log.AppIp = core.AppIp
log.LogId = parse.ToString(sonyflake.GenerateId())
r.queue <- log
}
}
// 开启上传
func (r *fopsLoggerPersistent) enableUpload() {
for range time.NewTicker(3 * time.Second).C {
var lst []*LogData
// 当队列中有数据 且 取出的数量<1000时,则继续取出
for len(r.queue) > 0 && len(lst) < 1000 {
lst = append(lst, <-r.queue)
}
// 没有取到数据
if len(lst) == 0 {
continue
}
// 上传
if err := r.upload(lst); err != nil {
// 重新放回队列
for i := 0; i < len(lst); i++ {
r.queue <- lst[i]
}
// 不能使用flog.Error,如果此处执行了,会一直产生无用的错误信息
fmt.Println(r.formatter.Formatter(&LogData{CreateAt: dateTime.Now(), LogLevel: eumLogLevel.Warning, Component: "", Content: err.Error(), newLine: true}))
}
}
}
type UploadRequest struct {
List []*LogData
}
func (r *fopsLoggerPersistent) upload(lstLog []*LogData) error {
bodyByte, _ := json.Marshal(UploadRequest{List: lstLog})
url := r.fopsServer + "flog/upload"
newRequest, _ := http.NewRequest("POST", url, bytes.NewReader(bodyByte))
newRequest.Header.Set("Content-Type", "application/json")
// 链路追踪
client := &http.Client{}
rsp, err := client.Do(newRequest)
if err != nil {
return fmt.Errorf("上传日志到FOPS失败:%s", err.Error())
}
apiRsp := core.NewApiResponseByReader[any](rsp.Body)
if apiRsp.StatusCode != 200 {
return fmt.Errorf("上传日志到FOPS失败(%v):%s", rsp.StatusCode, apiRsp.StatusMessage)
}
return err
}