Skip to content

Commit

Permalink
Move to new k6 module API
Browse files Browse the repository at this point in the history
see grafana/k6#2344 for more info
  • Loading branch information
mstoykov committed Feb 10, 2022
1 parent 217a587 commit b594804
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 73 deletions.
6 changes: 3 additions & 3 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ func (b *Batch) createPushRequest() (*logproto.PushRequest, int) {
}

// newBatch creates a batch with randomly generated log streams
func newBatch(ctx context.Context, pool LabelPool, numStreams, minBatchSize, maxBatchSize int) *Batch {
func newBatch(
ctx context.Context, state *lib.State, pool LabelPool, numStreams, minBatchSize, maxBatchSize int,
) *Batch {
batch := &Batch{
Streams: make(map[string]*logproto.Stream, numStreams),
CreatedAt: time.Now(),
}
state := lib.GetState(ctx)

hostname, err := os.Hostname()
if err != nil {
hostname = "localhost"
Expand Down
6 changes: 2 additions & 4 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func BenchmarkNewBatch(b *testing.B) {
VUID: 15,
}
ctx, cancel := context.WithCancel(context.Background())
ctx = lib.WithState(ctx, state)

defer cancel()
defer close(samples)
Expand All @@ -36,7 +35,7 @@ func BenchmarkNewBatch(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_ = newBatch(ctx, labels, streams, minBatchSize, maxBatchSize)
_ = newBatch(ctx, state, labels, streams, minBatchSize, maxBatchSize)
}
}

Expand All @@ -47,7 +46,6 @@ func BenchmarkEncode(b *testing.B) {
VUID: 15,
}
ctx, cancel := context.WithCancel(context.Background())
ctx = lib.WithState(ctx, state)

defer cancel()
defer close(samples)
Expand All @@ -65,7 +63,7 @@ func BenchmarkEncode(b *testing.B) {
labels := newLabelPool(faker, cardinalities)

b.ReportAllocs()
batch := newBatch(ctx, labels, streams, minBatchSize, maxBatchSize)
batch := newBatch(ctx, state, labels, streams, minBatchSize, maxBatchSize)

b.Run("encode protobuf", func(b *testing.B) {
b.ResetTimer()
Expand Down
69 changes: 36 additions & 33 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package loki

import (
"bytes"
"context"
"encoding/json"
"fmt"
"math/rand"
Expand All @@ -13,7 +12,7 @@ import (

"github.com/grafana/loki/pkg/logql/stats"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/netext/httpext"
k6_stats "go.k6.io/k6/stats"
Expand All @@ -37,8 +36,8 @@ var (

type Client struct {
client *http.Client
logger logrus.FieldLogger
cfg *Config
vu modules.VU
}

type Config struct {
Expand All @@ -50,21 +49,21 @@ type Config struct {
ProtobufRatio float64
}

func (c *Client) InstantQuery(ctx context.Context, logQuery string, limit int) (httpext.Response, error) {
func (c *Client) InstantQuery(logQuery string, limit int) (httpext.Response, error) {
q := &Query{
Type: InstantQuery,
QueryString: logQuery,
Limit: limit,
}
q.SetInstant(time.Now())
response, err := c.sendQuery(ctx, q)
response, err := c.sendQuery(q)
if err == nil && IsSuccessfulResponse(response.Status) {
err = reportMetricsFromStats(ctx, response, InstantQuery)
err = c.reportMetricsFromStats(response, InstantQuery)
}
return response, err
}

func (c *Client) RangeQuery(ctx context.Context, logQuery string, duration string, limit int) (httpext.Response, error) {
func (c *Client) RangeQuery(logQuery string, duration string, limit int) (httpext.Response, error) {
now := time.Now()
dur, err := time.ParseDuration(duration)
if err != nil {
Expand All @@ -77,14 +76,14 @@ func (c *Client) RangeQuery(ctx context.Context, logQuery string, duration strin
End: now,
Limit: limit,
}
response, err := c.sendQuery(ctx, q)
response, err := c.sendQuery(q)
if err == nil && IsSuccessfulResponse(response.Status) {
err = reportMetricsFromStats(ctx, response, RangeQuery)
err = c.reportMetricsFromStats(response, RangeQuery)
}
return response, err
}

func (c *Client) LabelsQuery(ctx context.Context, duration string) (httpext.Response, error) {
func (c *Client) LabelsQuery(duration string) (httpext.Response, error) {
now := time.Now()
dur, err := time.ParseDuration(duration)
if err != nil {
Expand All @@ -95,10 +94,10 @@ func (c *Client) LabelsQuery(ctx context.Context, duration string) (httpext.Resp
Start: now.Add(-dur),
End: now,
}
return c.sendQuery(ctx, q)
return c.sendQuery(q)
}

func (c *Client) LabelValuesQuery(ctx context.Context, label string, duration string) (httpext.Response, error) {
func (c *Client) LabelValuesQuery(label string, duration string) (httpext.Response, error) {
now := time.Now()
dur, err := time.ParseDuration(duration)
if err != nil {
Expand All @@ -110,10 +109,10 @@ func (c *Client) LabelValuesQuery(ctx context.Context, label string, duration st
End: now,
PathParams: []interface{}{label},
}
return c.sendQuery(ctx, q)
return c.sendQuery(q)
}

func (c *Client) SeriesQuery(ctx context.Context, matchers string, duration string) (httpext.Response, error) {
func (c *Client) SeriesQuery(matchers string, duration string) (httpext.Response, error) {
now := time.Now()
dur, err := time.ParseDuration(duration)
if err != nil {
Expand All @@ -125,7 +124,7 @@ func (c *Client) SeriesQuery(ctx context.Context, matchers string, duration stri
Start: now.Add(-dur),
End: now,
}
return c.sendQuery(ctx, q)
return c.sendQuery(q)
}

// buildURL concatinates a URL `http://foo/bar` with a path `/buzz` and a query string `?query=...`.
Expand All @@ -139,8 +138,8 @@ func buildURL(u, p, qs string) (string, error) {
return url.String(), nil
}

func (c *Client) sendQuery(ctx context.Context, q *Query) (httpext.Response, error) {
state := lib.GetState(ctx)
func (c *Client) sendQuery(q *Query) (httpext.Response, error) {
state := c.vu.State()
if state == nil {
return *httpext.NewResponse(), errors.New("state is nil")
}
Expand All @@ -167,7 +166,7 @@ func (c *Client) sendQuery(ctx context.Context, q *Query) (httpext.Response, err
}

url, _ := httpext.NewURL(urlString, path)
response, err := httpext.MakeRequest(ctx, state, &httpext.ParsedHTTPRequest{
response, err := httpext.MakeRequest(c.vu.Context(), state, &httpext.ParsedHTTPRequest{
URL: &url,
Req: r,
Throw: state.Options.Throw.Bool,
Expand All @@ -182,25 +181,29 @@ func (c *Client) sendQuery(ctx context.Context, q *Query) (httpext.Response, err
return *response, err
}

func (c *Client) Push(ctx context.Context) (httpext.Response, error) {
func (c *Client) Push() (httpext.Response, error) {
// 5 streams per batch
// batch size between 800KB and 1MB
return c.PushParameterized(ctx, 5, 800*1024, 1024*1024)
return c.PushParameterized(5, 800*1024, 1024*1024)
}

// PushParametrized is deprecated in favor or PushParameterized
func (c *Client) PushParametrized(ctx context.Context, streams, minBatchSize, maxBatchSize int) (httpext.Response, error) {
c.logger.Warn("method pushParametrized() is deprecated and will be removed in future releases; please use pushParameterized() instead")
return c.PushParameterized(ctx, streams, minBatchSize, maxBatchSize)
func (c *Client) PushParametrized(streams, minBatchSize, maxBatchSize int) (httpext.Response, error) {
if state := c.vu.State(); state == nil {
return *httpext.NewResponse(), errors.New("state is nil")
} else {
state.Logger.Warn("method pushParametrized() is deprecated and will be removed in future releases; please use pushParameterized() instead")
}
return c.PushParameterized(streams, minBatchSize, maxBatchSize)
}

func (c *Client) PushParameterized(ctx context.Context, streams, minBatchSize, maxBatchSize int) (httpext.Response, error) {
batch := newBatch(ctx, c.cfg.Labels, streams, minBatchSize, maxBatchSize)
return c.pushBatch(ctx, batch)
func (c *Client) PushParameterized(streams, minBatchSize, maxBatchSize int) (httpext.Response, error) {
batch := newBatch(c.vu.Context(), c.vu.State(), c.cfg.Labels, streams, minBatchSize, maxBatchSize)
return c.pushBatch(batch)
}

func (c *Client) pushBatch(ctx context.Context, batch *Batch) (httpext.Response, error) {
state := lib.GetState(ctx)
func (c *Client) pushBatch(batch *Batch) (httpext.Response, error) {
state := c.vu.State()
if state == nil {
return *httpext.NewResponse(), errors.New("state is nil")
}
Expand All @@ -220,7 +223,7 @@ func (c *Client) pushBatch(ctx context.Context, batch *Batch) (httpext.Response,
return *httpext.NewResponse(), errors.Wrap(err, "failed to encode payload")
}

res, err := c.send(ctx, state, buf, encodeSnappy)
res, err := c.send(state, buf, encodeSnappy)
if err != nil {
return *httpext.NewResponse(), errors.Wrap(err, "push request failed")
}
Expand All @@ -229,7 +232,7 @@ func (c *Client) pushBatch(ctx context.Context, batch *Batch) (httpext.Response,
return res, nil
}

func (c *Client) send(ctx context.Context, state *lib.State, buf []byte, useProtobuf bool) (httpext.Response, error) {
func (c *Client) send(state *lib.State, buf []byte, useProtobuf bool) (httpext.Response, error) {
httpResp := httpext.NewResponse()
path := "/loki/api/v1/push"
r, err := http.NewRequest(http.MethodPost, c.cfg.URL.String()+path, nil)
Expand All @@ -252,7 +255,7 @@ func (c *Client) send(ctx context.Context, state *lib.State, buf []byte, useProt
}

url, _ := httpext.NewURL(c.cfg.URL.String()+path, path)
response, err := httpext.MakeRequest(ctx, state, &httpext.ParsedHTTPRequest{
response, err := httpext.MakeRequest(c.vu.Context(), state, &httpext.ParsedHTTPRequest{
URL: &url,
Req: r,
Body: bytes.NewBuffer(buf),
Expand All @@ -279,7 +282,7 @@ type responseWithStats struct {
}
}

func reportMetricsFromStats(ctx context.Context, response httpext.Response, queryType QueryType) error {
func (c *Client) reportMetricsFromStats(response httpext.Response, queryType QueryType) error {
responseBody, ok := response.Body.(string)
if !ok {
return errors.New("response body is not a string")
Expand All @@ -291,7 +294,7 @@ func reportMetricsFromStats(ctx context.Context, response httpext.Response, quer
}
now := time.Now()
tags := k6_stats.NewSampleTags(map[string]string{"endpoint": queryType.Endpoint()})
k6_stats.PushIfNotDone(ctx, lib.GetState(ctx).Samples, k6_stats.ConnectedSamples{
k6_stats.PushIfNotDone(c.vu.Context(), c.vu.State().Samples, k6_stats.ConnectedSamples{
Samples: []k6_stats.Sample{
{
Metric: BytesProcessedTotal,
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ go 1.16

require (
github.com/brianvoe/gofakeit/v6 v6.9.0
github.com/go-kit/log v0.2.0
github.com/dop251/goja v0.0.0-20220110113543-261677941f3c
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/gogo/protobuf v1.3.1
github.com/golang/snappy v0.0.3
github.com/grafana/loki v1.6.1
github.com/json-iterator/go v1.1.10
github.com/mingrammer/flog v0.4.3
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.10.0
github.com/sirupsen/logrus v1.8.1
go.k6.io/k6 v0.36.0
)

Expand Down
Loading

0 comments on commit b594804

Please sign in to comment.