Skip to content

Commit

Permalink
Refactor request metadata output to a separate package (#3201)
Browse files Browse the repository at this point in the history
  • Loading branch information
Blinkuu committed Jul 19, 2023
1 parent 33e7406 commit ac9c6bc
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 427 deletions.
103 changes: 0 additions & 103 deletions output/cloud/expv2/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,12 @@ package expv2

import (
"errors"
"strconv"
"sync"
"time"

"go.k6.io/k6/cloudapi/insights"
"go.k6.io/k6/lib/netext/httpext"
"go.k6.io/k6/metrics"
)

const (
metadataTraceIDKey = "trace_id"
scenarioTag = "scenario"
groupTag = "group"
nameTag = "name"
methodTag = "method"
statusTag = "status"
)

type timeBucket struct {
Time int64
Sinks map[metrics.TimeSeries]metricValue
Expand Down Expand Up @@ -175,94 +163,3 @@ func (c *collector) timeFromBucketID(id int64) int64 {
func (c *collector) bucketCutoffID() int64 {
return c.nowFunc().Add(-c.waitPeriod).UnixNano() / int64(c.aggregationPeriod)
}

type rmCollector struct {
testRunID int64
buffer insights.RequestMetadatas
bufferMu *sync.Mutex
}

func newRequestMetadatasCollector(testRunID int64) *rmCollector {
return &rmCollector{
testRunID: testRunID,
buffer: nil,
bufferMu: &sync.Mutex{},
}
}

func (c *rmCollector) CollectRequestMetadatas(sampleContainers []metrics.SampleContainer) {
if len(sampleContainers) < 1 {
return
}

// TODO(lukasz, other-proto-support): Support grpc/websocket trails.
var newBuffer insights.RequestMetadatas
for _, sampleContainer := range sampleContainers {
trail, ok := sampleContainer.(*httpext.Trail)
if !ok {
continue
}

traceID, found := trail.Metadata[metadataTraceIDKey]
if !found {
continue
}

m := insights.RequestMetadata{
TraceID: traceID,
Start: trail.EndTime.Add(-trail.Duration),
End: trail.EndTime,
TestRunLabels: insights.TestRunLabels{
ID: c.testRunID,
Scenario: c.getStringTagFromTrail(trail, scenarioTag),
Group: c.getStringTagFromTrail(trail, groupTag),
},
ProtocolLabels: insights.ProtocolHTTPLabels{
URL: c.getStringTagFromTrail(trail, nameTag),
Method: c.getStringTagFromTrail(trail, methodTag),
StatusCode: c.getIntTagFromTrail(trail, statusTag),
},
}

newBuffer = append(newBuffer, m)
}

if len(newBuffer) < 1 {
return
}

c.bufferMu.Lock()
defer c.bufferMu.Unlock()

c.buffer = append(c.buffer, newBuffer...)
}

func (c *rmCollector) PopAll() insights.RequestMetadatas {
c.bufferMu.Lock()
defer c.bufferMu.Unlock()

b := c.buffer
c.buffer = nil
return b
}

func (c *rmCollector) getStringTagFromTrail(trail *httpext.Trail, key string) string {
if tag, found := trail.Tags.Get(key); found {
return tag
}

return ""
}

func (c *rmCollector) getIntTagFromTrail(trail *httpext.Trail, key string) int64 {
if tag, found := trail.Tags.Get(key); found {
tagInt, err := strconv.ParseInt(tag, 10, 64)
if err != nil {
return 0
}

return tagInt
}

return 0
}
138 changes: 0 additions & 138 deletions output/cloud/expv2/collect_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package expv2

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.k6.io/k6/cloudapi/insights"
"go.k6.io/k6/lib/netext"
"go.k6.io/k6/lib/netext/httpext"
"go.k6.io/k6/metrics"
)

Expand Down Expand Up @@ -312,137 +308,3 @@ func TestBucketQPushPopConcurrency(t *testing.T) {
}
}
}

func Test_requestMetadatasCollector_CollectRequestMetadatas_DoesNothingWithEmptyData(t *testing.T) {
t.Parallel()

// Given
testRunID := int64(1337)
col := newRequestMetadatasCollector(testRunID)
var data []metrics.SampleContainer

// When
col.CollectRequestMetadatas(data)

// Then
require.Empty(t, col.buffer)
}

func Test_requestMetadatasCollector_CollectRequestMetadatas_FiltersAndStoresHTTPTrailsAsRequestMetadatas(t *testing.T) {
t.Parallel()

// Given
testRunID := int64(1337)
col := newRequestMetadatasCollector(testRunID)
data := []metrics.SampleContainer{
&httpext.Trail{
EndTime: time.Unix(10, 0),
Duration: time.Second,
Tags: metrics.NewRegistry().RootTagSet().
With(scenarioTag, "test-scenario-1").
With(groupTag, "test-group-1").
With(nameTag, "test-url-1").
With(methodTag, "test-method-1").
With(statusTag, "200"),
Metadata: map[string]string{
metadataTraceIDKey: "test-trace-id-1",
},
},
&httpext.Trail{
// HTTP trail without trace ID should be ignored
},
&netext.NetTrail{
// Net trail should be ignored
},
&httpext.Trail{
EndTime: time.Unix(20, 0),
Duration: time.Second,
Tags: metrics.NewRegistry().RootTagSet().
With(scenarioTag, "test-scenario-2").
With(groupTag, "test-group-2").
With(nameTag, "test-url-2").
With(methodTag, "test-method-2").
With(statusTag, "401"),
Metadata: map[string]string{
metadataTraceIDKey: "test-trace-id-2",
},
},
&httpext.Trail{
EndTime: time.Unix(20, 0),
Duration: time.Second,
Tags: metrics.NewRegistry().RootTagSet(),
// HTTP trail without `trace_id` metadata key should be ignored
Metadata: map[string]string{},
},
&httpext.Trail{
EndTime: time.Unix(20, 0),
Duration: time.Second,
// If no tags are present, output should be set to `unknown`
Tags: metrics.NewRegistry().RootTagSet(),
Metadata: map[string]string{
metadataTraceIDKey: "test-trace-id-3",
},
},
}

// When
col.CollectRequestMetadatas(data)

// Then
require.Len(t, col.buffer, 3)
require.Contains(t, col.buffer, insights.RequestMetadata{
TraceID: "test-trace-id-1",
Start: time.Unix(9, 0),
End: time.Unix(10, 0),
TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "test-scenario-1", Group: "test-group-1"},
ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-1", Method: "test-method-1", StatusCode: 200},
})
require.Contains(t, col.buffer, insights.RequestMetadata{
TraceID: "test-trace-id-2",
Start: time.Unix(19, 0),
End: time.Unix(20, 0),
TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "test-scenario-2", Group: "test-group-2"},
ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-2", Method: "test-method-2", StatusCode: 401},
})
require.Contains(t, col.buffer, insights.RequestMetadata{
TraceID: "test-trace-id-3",
Start: time.Unix(19, 0),
End: time.Unix(20, 0),
TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "", Group: ""},
ProtocolLabels: insights.ProtocolHTTPLabels{URL: "", Method: "", StatusCode: 0},
})
}

func Test_requestMetadatasCollector_PopAll_DoesNothingWithEmptyData(t *testing.T) {
t.Parallel()

// Given
data := insights.RequestMetadatas{
{
TraceID: "test-trace-id-1",
Start: time.Unix(9, 0),
End: time.Unix(10, 0),
TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "test-scenario-1", Group: "test-group-1"},
ProtocolLabels: insights.ProtocolHTTPLabels{URL: "test-url-1", Method: "test-method-1", StatusCode: 200},
},
{
TraceID: "test-trace-id-2",
Start: time.Unix(19, 0),
End: time.Unix(20, 0),
TestRunLabels: insights.TestRunLabels{ID: 1337, Scenario: "unknown", Group: "unknown"},
ProtocolLabels: insights.ProtocolHTTPLabels{URL: "unknown", Method: "unknown", StatusCode: 0},
},
}
col := &rmCollector{
buffer: data,
bufferMu: &sync.Mutex{},
}

// When
got := col.PopAll()

// Then
require.Nil(t, col.buffer)
require.Empty(t, col.buffer)
require.Equal(t, data, got)
}
30 changes: 1 addition & 29 deletions output/cloud/expv2/flush.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package expv2

import (
"context"
"time"

"github.com/sirupsen/logrus"
"go.k6.io/k6/cloudapi/insights"

"go.k6.io/k6/metrics"
"go.k6.io/k6/output/cloud/expv2/pbcloud"
)
Expand Down Expand Up @@ -189,30 +188,3 @@ func (msb *metricSetBuilder) recordDiscardedLabels(labels []string) {
msb.discardedLabels[key] = struct{}{}
}
}

// insightsClient is an interface for sending request metadatas to the Insights API.
type insightsClient interface {
IngestRequestMetadatasBatch(context.Context, insights.RequestMetadatas) error
Close() error
}

type requestMetadatasFlusher struct {
client insightsClient
collector requestMetadatasCollector
}

func newTracesFlusher(client insightsClient, collector requestMetadatasCollector) *requestMetadatasFlusher {
return &requestMetadatasFlusher{
client: client,
collector: collector,
}
}

func (f *requestMetadatasFlusher) flush() error {
requestMetadatas := f.collector.PopAll()
if len(requestMetadatas) < 1 {
return nil
}

return f.client.IngestRequestMetadatasBatch(context.Background(), requestMetadatas)
}
Loading

0 comments on commit ac9c6bc

Please sign in to comment.