Skip to content

Commit

Permalink
Logs PushRequest data.
Browse files Browse the repository at this point in the history
This will allows to find information about received size and total entries per tenant.

Example of a log from my dev testing:

```
level=debug ts=2021-01-15T11:16:21.735663076Z caller=http.go:67 org_id=3927 traceID=11c4774c6ec4bbf4 msg="push request parsed" path=/loki/api/v1/push content-type=application/x-protobuf body-size="11 kB" streams=5 entries=298 streamLabelsSize="1.9 kB" entriesSize="45 kB" totalSize="47 kB"
```

Of course this means we can use LogQL on this.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Jan 15, 2021
1 parent 322e4bc commit 8f87f7b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 5 deletions.
43 changes: 38 additions & 5 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"math"
"net/http"

"github.com/dustin/go-humanize"
"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -12,6 +14,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/unmarshal"
unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy"
lokiutil "github.com/grafana/loki/pkg/util"
)

var contentType = http.CanonicalHeaderKey("Content-Type")
Expand All @@ -20,7 +23,6 @@ const applicationJSON = "application/json"

// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {

req, err := ParseRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -44,22 +46,53 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
func ParseRequest(r *http.Request) (*logproto.PushRequest, error) {
var req logproto.PushRequest

switch r.Header.Get(contentType) {
logger := util.WithContext(r.Context(), util.Logger)
body := lokiutil.NewSizeReader(r.Body)
contentType := r.Header.Get(contentType)

defer func() {
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
)

for _, s := range req.Streams {
streamLabelsSize += int64(len(s.Labels))
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
}
}
level.Debug(logger).Log(
"msg", "push request parsed",
"path", r.URL.Path,
"content-type", contentType,
"body-size", humanize.Bytes(uint64(body.Size())),
"streams", len(req.Streams),
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
)
}()

switch contentType {
case applicationJSON:
var err error

if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = unmarshal.DecodePushRequest(r.Body, &req)
err = unmarshal.DecodePushRequest(body, &req)
} else {
err = unmarshal_legacy.DecodePushRequest(r.Body, &req)
err = unmarshal_legacy.DecodePushRequest(body, &req)
}

if err != nil {
return nil, err
}

default:
if err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
return nil, err
}
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/util/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package util

import (
"io"
)

type sizeReader struct {
size int64
r io.Reader
}

type SizeReader interface {
io.Reader
Size() int64
}

// NewSizeReader returns an io.Reader that will have the number of bytes
// read from r available.
func NewSizeReader(r io.Reader) SizeReader {
return &sizeReader{r: r}
}

func (v *sizeReader) Read(p []byte) (int, error) {
n, err := v.r.Read(p)
v.size += int64(n)
return n, err
}

func (v *sizeReader) Size() int64 {
return v.size
}

0 comments on commit 8f87f7b

Please sign in to comment.