/
metrics_client.go
116 lines (98 loc) · 2.73 KB
/
metrics_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package cloud
import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
easyjson "github.com/mailru/easyjson"
"github.com/sirupsen/logrus"
"go.k6.io/k6/cloudapi"
)
// MetricsClient is a wrapper around cloudapi.Client that is also capable of
// pushing metric samples to the cloud ingestion endpoint, optionally
// gzip-compressed.
type MetricsClient struct {
	*cloudapi.Client

	logger logrus.FieldLogger
	// host is the base URL of the cloud metrics ingestion service.
	host string
	// noCompress disables gzip compression of pushed payloads when true.
	noCompress bool
	// pushBufferPool recycles *bytes.Buffer values used as gzip scratch
	// space across PushMetric calls, reducing per-push allocations.
	pushBufferPool sync.Pool
}
// NewMetricsClient creates and initializes a new MetricsClient.
func NewMetricsClient(client *cloudapi.Client, logger logrus.FieldLogger, host string, noCompress bool) *MetricsClient {
	mc := &MetricsClient{
		Client:     client,
		logger:     logger,
		host:       host,
		noCompress: noCompress,
	}
	// The pool hands out fresh empty buffers; callers are expected to
	// Reset() what they Get() before use.
	mc.pushBufferPool.New = func() interface{} {
		return &bytes.Buffer{}
	}
	return mc
}
// PushMetric pushes the provided metric samples for the given referenceID.
//
// The payload is serialized with easyjson and, unless the client was
// configured with noCompress, gzip-compressed using pooled scratch buffers.
// Timing breakdowns (serialization, compression, total) are logged at
// debug level.
func (mc *MetricsClient) PushMetric(referenceID string, s []*Sample) error {
	start := time.Now()
	url := fmt.Sprintf("%s/v1/metrics/%s", mc.host, referenceID)

	jsonStart := time.Now()
	b, err := easyjson.Marshal(samples(s))
	if err != nil {
		return err
	}
	jsonTime := time.Since(jsonStart)

	// TODO: change the context, maybe to one with a timeout
	req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("X-Payload-Sample-Count", strconv.Itoa(len(s)))
	var additionalFields logrus.Fields

	if !mc.noCompress {
		buf, ok := mc.pushBufferPool.Get().(*bytes.Buffer)
		if !ok {
			return errors.New("failed to convert a buffer pool item " +
				"into the expected type bytes Buffer for gzip compression operation")
		}
		buf.Reset()
		defer mc.pushBufferPool.Put(buf)

		unzippedSize := len(b)
		// Pre-grow to the expected compressed size to avoid repeated
		// reallocations while the gzip writer emits output.
		buf.Grow(unzippedSize / expectedGzipRatio)
		gzipStart := time.Now()
		{
			// gzip.BestSpeed is a valid level, so the error from
			// NewWriterLevel is impossible here and safely ignored.
			g, _ := gzip.NewWriterLevel(buf, gzip.BestSpeed)
			if _, err = g.Write(b); err != nil {
				return err
			}
			if err = g.Close(); err != nil {
				return err
			}
		}
		gzipTime := time.Since(gzipStart)

		req.Header.Set("Content-Encoding", "gzip")
		// X-Payload-Byte-Count reports the pre-compression payload size.
		req.Header.Set("X-Payload-Byte-Count", strconv.Itoa(unzippedSize))
		additionalFields = logrus.Fields{
			"unzipped_size":  unzippedSize,
			"gzip_t":         gzipTime,
			"content_length": buf.Len(),
		}
		b = buf.Bytes()
	}

	// Setting a "Content-Length" entry in req.Header is ignored by the HTTP
	// transport for outgoing client requests; the ContentLength field is
	// what actually gets sent on the wire.
	req.ContentLength = int64(len(b))
	req.Body = io.NopCloser(bytes.NewReader(b))
	// GetBody lets the transport replay the body on redirects and retries.
	req.GetBody = func() (io.ReadCloser, error) {
		return io.NopCloser(bytes.NewReader(b)), nil
	}

	err = mc.Client.Do(req, nil)
	mc.logger.WithFields(logrus.Fields{
		"t":         time.Since(start),
		"json_t":    jsonTime,
		"part_size": len(s),
	}).WithFields(additionalFields).Debug("Pushed part to cloud")
	return err
}