forked from pachyderm/pachyderm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
metrics.go
126 lines (115 loc) · 3.81 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package metrics
import (
"fmt"
"time"
"github.com/pachyderm/pachyderm/src/client/pkg/config"
"github.com/pachyderm/pachyderm/src/client/pkg/uuid"
"github.com/pachyderm/pachyderm/src/client/version"
log "github.com/Sirupsen/logrus"
"github.com/segmentio/analytics-go"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
kube "k8s.io/kubernetes/pkg/client/unversioned"
)
// Reporter is used to submit user & cluster metrics to segment.
// A Reporter is normally obtained from NewReporter, which also starts the
// background cluster-metrics loop.
type Reporter struct {
// segmentClient is the segment client every report from this Reporter is sent through.
segmentClient *analytics.Client
// clusterID is attached to both user-action and cluster-metrics reports.
clusterID string
// kubeClient is handed to externalMetrics when gathering cluster metrics.
kubeClient *kube.Client
}
// NewReporter builds a Reporter for the given cluster, backed by a persistent
// segment client, and starts a goroutine running reportClusterMetrics before
// returning. That goroutine loops forever; nothing in this function stops it.
func NewReporter(clusterID string, kubeClient *kube.Client) *Reporter {
	r := new(Reporter)
	r.segmentClient = newPersistentClient()
	r.clusterID = clusterID
	r.kubeClient = kubeClient

	// Cluster metrics are reported in the background for as long as the
	// process lives.
	go r.reportClusterMetrics()
	return r
}
// ReportUserAction reports "<action>Started" right away and returns a callback
// for the caller to invoke when the action completes. The callback reports
// "<action>Finished" with the elapsed seconds since start when err is nil, or
// "<action>Errored" with the error text otherwise.
//
// A nil Reporter is accepted: nothing is reported and the returned callback
// is a no-op.
func ReportUserAction(ctx context.Context, r *Reporter, action string) func(time.Time, error) {
	// This happens when stubbing out metrics for testing, e.g. src/server/pfs/server/server_test.go
	if r == nil {
		return func(time.Time, error) {}
	}

	// If we report nil, segment sees it, but mixpanel omits the field
	r.reportUserAction(ctx, fmt.Sprintf("%vStarted", action), 1)

	finish := func(start time.Time, err error) {
		if err != nil {
			r.reportUserAction(ctx, fmt.Sprintf("%vErrored", action), err.Error())
			return
		}
		r.reportUserAction(ctx, fmt.Sprintf("%vFinished", action), time.Since(start).Seconds())
	}
	return finish
}
// getKeyFromMD returns the first value stored under key in md.
//
// It returns an error when md has no entry for key or the entry is empty.
// The error names the key that was looked up, so a missing "prefix" is not
// misreported as a missing "userid".
func getKeyFromMD(md metadata.MD, key string) (string, error) {
	if vals := md[key]; len(vals) > 0 {
		return vals[0], nil
	}
	return "", fmt.Errorf("error extracting %v from metadata. %v is empty", key, key)
}
// reportUserAction sends a single user metric (action, value) to segment,
// tagged with the "userid" and "prefix" values found in ctx's gRPC metadata
// and with this Reporter's cluster ID.
//
// Nothing is sent when ctx carries no metadata (logged), when "userid" is
// missing (silent), or when "prefix" is missing (logged).
func (r *Reporter) reportUserAction(ctx context.Context, action string, value interface{}) {
	md, found := metadata.FromContext(ctx)
	if !found {
		log.Errorf("Error extracting userid metadata from context: %v\n", ctx)
		return
	}

	// metadata API downcases all the key names
	userID, err := getKeyFromMD(md, "userid")
	if err != nil {
		// The FUSE client will never have a userID, so normal usage will produce a lot of these errors
		return
	}

	prefix, err := getKeyFromMD(md, "prefix")
	if err != nil {
		log.Errorln(err)
		return
	}

	reportUserMetricsToSegment(r.segmentClient, userID, prefix, action, value, r.clusterID)
}
// ReportAndFlushUserAction immediately reports "<action>Started" and returns
// a callback to invoke when the action completes. The callback reports
// "<action>Finished" with the elapsed seconds since start when err is nil, or
// "<action>Errored" with the error text otherwise.
// It is used in the few places we need to report metrics from the client.
func ReportAndFlushUserAction(action string) func(time.Time, error) {
	// If we report nil, segment sees it, but mixpanel omits the field
	reportAndFlushUserAction(fmt.Sprintf("%vStarted", action), 1)

	finish := func(start time.Time, err error) {
		if err != nil {
			reportAndFlushUserAction(fmt.Sprintf("%vErrored", action), err.Error())
			return
		}
		reportAndFlushUserAction(fmt.Sprintf("%vFinished", action), time.Since(start).Seconds())
	}
	return finish
}
// reportAndFlushUserAction reports one user metric (action, value) through a
// freshly created segment client, which is closed before returning
// (presumably flushing the queued event — confirm against the client library).
// The user ID is read from the local config; if the config cannot be read the
// error is logged and nothing is reported.
func reportAndFlushUserAction(action string, value interface{}) {
	segment := newSegmentClient()
	defer segment.Close()

	userCfg, readErr := config.Read()
	if readErr == nil {
		reportUserMetricsToSegment(segment, userCfg.UserID, "user", action, value, "")
		return
	}

	// metrics errors are non fatal
	log.Errorf("Error reading userid from ~/.pachyderm/config: %v\n", readErr)
}
// reportClusterMetrics loops forever: it sleeps for reportingInterval, gathers
// a new Metrics snapshot via externalMetrics, stamps it with the cluster ID, a
// pod ID and the version string, and sends it to segment. It never returns, so
// it is meant to run in its own goroutine.
func (r *Reporter) reportClusterMetrics() {
	for {
		// Sleep first, so the initial report goes out one interval after start.
		time.Sleep(reportingInterval)

		snapshot := new(Metrics)
		externalMetrics(r.kubeClient, snapshot)

		// Identity fields are set after externalMetrics has filled the snapshot.
		snapshot.ClusterID = r.clusterID
		// NOTE(review): a fresh PodID is generated on every iteration — confirm
		// this is intended rather than one ID per process.
		snapshot.PodID = uuid.NewWithoutDashes()
		snapshot.Version = version.PrettyPrintVersion(version.Version)

		reportClusterMetricsToSegment(r.segmentClient, snapshot)
	}
}