-
Notifications
You must be signed in to change notification settings - Fork 0
/
metric.go
120 lines (99 loc) · 2.34 KB
/
metric.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package metric
import (
"encoding/json"
"sync/atomic"
"time"
"unsafe"
"github.com/bigstack-oss/plane-go/pkg/base/log"
"github.com/bigstack-oss/plane-go/pkg/base/monitoring"
"github.com/bigstack-oss/plane-go/pkg/frame/oneway/plugin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/zap"
)
const (
	// module is the name handed to log.GetLogger to obtain this package's logger.
	module = "metric"

	// oneMinute is a time.Duration of exactly one minute.
	oneMinute = time.Minute
)
// Package-level Prometheus gauges: one OK/error pair for each of the input,
// transit, process and output counters. They are built with promauto, which
// also takes care of registering them.
// NOTE(review): every Help string is empty; consider describing each metric.
var (
	inputOK    = promauto.NewGauge(prometheus.GaugeOpts{Name: "input_ok", Help: ""})
	inputErr   = promauto.NewGauge(prometheus.GaugeOpts{Name: "input_err", Help: ""})
	transitOK  = promauto.NewGauge(prometheus.GaugeOpts{Name: "transit_ok", Help: ""})
	transitErr = promauto.NewGauge(prometheus.GaugeOpts{Name: "transit_err", Help: ""})
	processOK  = promauto.NewGauge(prometheus.GaugeOpts{Name: "process_ok", Help: ""})
	processErr = promauto.NewGauge(prometheus.GaugeOpts{Name: "process_err", Help: ""})
	outputOK   = promauto.NewGauge(prometheus.GaugeOpts{Name: "output_ok", Help: ""})
	outputErr  = promauto.NewGauge(prometheus.GaugeOpts{Name: "output_err", Help: ""})

	// metricLogger is the logger obtained from log.GetLogger for this module.
	metricLogger = log.GetLogger(module)
)
// RegisterGaugeMetric registers gauge with monitoring.MetricRegistry by
// calling its MustRegister method.
// NOTE(review): presumably MetricRegistry is a Prometheus registry, in which
// case MustRegister panics when registration fails (for example on a
// duplicate collector) — confirm against the monitoring package.
func RegisterGaugeMetric(gauge *prometheus.GaugeVec) {
monitoring.MetricRegistry.MustRegister(gauge)
}
// showMetrics logs a JSON dump of metrics through metricLogger.
//
// metrics is marshalled with encoding/json and attached to an info-level entry
// under the "metrics" key. If marshalling fails, an error-level entry is
// logged instead; it carries the marshalling error so the cause of the
// failure is not lost.
func showMetrics(metrics *plugin.Metric) {
	jsonMetrics, err := json.Marshal(metrics)
	if err != nil {
		metricLogger.Error("failed to dump metrics periodically", zap.Error(err))
		return
	}

	// The encoded bytes are wrapped in json.RawMessage and passed by pointer,
	// presumably so the logger embeds them as JSON instead of treating them as
	// an opaque byte slice — TODO confirm against the logger's encoder.
	rawMessage := json.RawMessage(jsonMetrics)
	metricLogger.Info("periodical metrics dump", zap.Any("metrics", &rawMessage))
}
// swapMetrics atomically replaces the *plugin.Metric stored at metrics with a
// freshly allocated, zero-valued one and returns the pointer that was stored
// there before the swap (nil if the slot held nil).
//
// The slot is reinterpreted as *unsafe.Pointer because atomic.SwapPointer
// works on unsafe.Pointer values; the returned pointer is converted back to
// *plugin.Metric.
// NOTE(review): other readers/writers of the slot presumably need to use
// atomic operations too — confirm against the callers.
func swapMetrics(metrics **plugin.Metric) *plugin.Metric {
	fresh := &plugin.Metric{}
	previous := atomic.SwapPointer(
		(*unsafe.Pointer)(unsafe.Pointer(metrics)),
		unsafe.Pointer(fresh),
	)
	return (*plugin.Metric)(previous)
}
// setMetrics publishes the counters held in metrics to the package-level
// gauges. Each of the eight fields (OK and Err for input, transit, process and
// output) is converted to float64 and written to its gauge with Set, in that
// order.
func setMetrics(metrics *plugin.Metric) {
	// Pair every gauge's Set method with the value it should receive.
	updates := []struct {
		set   func(float64)
		value float64
	}{
		{inputOK.Set, float64(metrics.InputOK)},
		{inputErr.Set, float64(metrics.InputErr)},
		{transitOK.Set, float64(metrics.TransitOK)},
		{transitErr.Set, float64(metrics.TransitErr)},
		{processOK.Set, float64(metrics.ProcessOK)},
		{processErr.Set, float64(metrics.ProcessErr)},
		{outputOK.Set, float64(metrics.OutputOK)},
		{outputErr.Set, float64(metrics.OutputErr)},
	}

	for _, u := range updates {
		u.set(u.value)
	}
}