-
Notifications
You must be signed in to change notification settings - Fork 24
/
loki.go
42 lines (35 loc) · 1.09 KB
/
loki.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package loki
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
logproto "github.com/grafana/loki/pkg/push"
"github.com/grafana/synthetic-monitoring-agent/internal/pkg/prom"
)
// SendStreamsWithBackoff encodes streams as a snappy-compressed Loki push
// request and passes the resulting bytes to prom.SendBytesWithBackoff,
// returning whatever error that call reports.
//
// buf is an optional scratch buffer used to hold the encoded request. On
// success *buf is updated to point at the encoded bytes so that its backing
// storage can be reused by a later call. If the request cannot be built, the
// error is returned, nothing is sent and *buf is left untouched. A nil buf is
// accepted; in that case no buffer is reused.
func SendStreamsWithBackoff(ctx context.Context, client *prom.Client, streams []logproto.Stream, buf *[]byte) error {
	var scratch []byte
	if buf != nil {
		scratch = *buf
	}

	req, err := buildStreamsPushRequest(streams, scratch)
	if err != nil {
		// Failing to build the push request is non-recoverable, since it will
		// only error if marshaling the proto to bytes fails. The caller's
		// buffer is deliberately not overwritten here: req is nil on this
		// path and storing it would throw the reusable storage away.
		return err
	}

	if buf != nil {
		*buf = req
	}

	return prom.SendBytesWithBackoff(ctx, client, req)
}
// buildStreamsPushRequest wraps streams in a logproto.PushRequest, marshals it
// with proto.Marshal and returns the snappy-encoded result. buf is offered to
// snappy as the destination buffer; the returned slice may or may not share
// storage with it. When marshaling fails the returned slice is nil.
func buildStreamsPushRequest(streams []logproto.Stream, buf []byte) ([]byte, error) {
	payload, err := proto.Marshal(&logproto.PushRequest{Streams: streams})
	if err != nil {
		return nil, err
	}

	// snappy looks at len(dst), not cap(dst), when deciding whether it has to
	// allocate, so expose the buffer's full capacity. Re-slicing a nil slice
	// yields a nil slice, so no separate nil check is needed.
	dst := buf[:cap(buf)]

	return snappy.Encode(dst, payload), nil
}