Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

output/cloudv2: Use a static remote service url #3125

Merged
merged 1 commit into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions output/cloud/expv2/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type pusher interface {
push(referenceID string, samples *pbcloud.MetricSet) error
push(samples *pbcloud.MetricSet) error
}

type metricsFlusher struct {
Expand Down Expand Up @@ -46,7 +46,7 @@ func (f *metricsFlusher) flush(_ context.Context) error {
}

// we hit the chunk size, let's flush
err := f.client.push(f.referenceID, msb.MetricSet)
err := f.client.push(msb.MetricSet)
if err != nil {
return err
}
Expand All @@ -58,7 +58,7 @@ func (f *metricsFlusher) flush(_ context.Context) error {
}

// send the last (or the unique) MetricSet chunk to the remote service
return f.client.push(f.referenceID, msb.MetricSet)
return f.client.push(msb.MetricSet)
}

type metricSetBuilder struct {
Expand Down
2 changes: 1 addition & 1 deletion output/cloud/expv2/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type pusherMock struct {
pushCalled int
}

func (pm *pusherMock) push(_ string, _ *pbcloud.MetricSet) error {
func (pm *pusherMock) push(_ *pbcloud.MetricSet) error {
pm.pushCalled++
return nil
}
29 changes: 14 additions & 15 deletions output/cloud/expv2/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,40 @@ import (
// to the remote service.
type metricsClient struct {
httpClient *cloudapi.Client
baseURL string
url string
}

// newMetricsClient creates and initializes a new MetricsClient.
func newMetricsClient(c *cloudapi.Client) (*metricsClient, error) {
// Unfortunately, the cloudapi.Client works across different versions
// of the API, but it has the v1 harcoded so we need to trim the wrong path
func newMetricsClient(c *cloudapi.Client, testRunID string) (*metricsClient, error) {
// The cloudapi.Client works across different versions of the API, the test
// lifecycle management is under /v1 instead the metrics ingestion is /v2.
// Unfortunately, the current client has v1 hard-coded so we need to trim the wrong path
// to be able to replace it with the correct one.
// A versioned client would be better but it would require a breaking change
// and considering that other services (e.g. k6-operator) depend on it,
// we want to stabilize the API before.
u := c.BaseURL()
if !strings.HasSuffix(u, "/v1") {
return nil, errors.New("a /v1 suffix is expected in the Cloud service's BaseURL path")
}
if testRunID == "" {
return nil, errors.New("TestRunID of the test is required")
}
return &metricsClient{
httpClient: c,
baseURL: strings.TrimSuffix(u, "/v1") + "/v2/metrics/",
url: strings.TrimSuffix(u, "/v1") + "/v2/metrics/" + testRunID,
}, nil
}

// Push the provided metrics for the given test run ID.
func (mc *metricsClient) push(referenceID string, samples *pbcloud.MetricSet) error {
if referenceID == "" {
return errors.New("TestRunID of the test is required")
}

func (mc *metricsClient) push(samples *pbcloud.MetricSet) error {
b, err := newRequestBody(samples)
if err != nil {
return err
}

// TODO: it is always the same
// we don't expect to share this client across different refID
// with a bit of effort we can find a way to just allocate once
url := mc.baseURL + referenceID
req, err := http.NewRequestWithContext(
context.Background(), http.MethodPost, url, io.NopCloser(bytes.NewReader(b)))
context.Background(), http.MethodPost, mc.url, io.NopCloser(bytes.NewReader(b)))
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions output/cloud/expv2/metrics_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func TestMetricsClientPush(t *testing.T) {
defer ts.Close()

c := cloudapi.NewClient(nil, "fake-token", ts.URL, "k6cloud/v0.4", 1*time.Second)
mc, err := newMetricsClient(c)
mc, err := newMetricsClient(c, "test-ref-id")
require.NoError(t, err)

mset := pbcloud.MetricSet{}
err = mc.push("test-ref-id", &mset)
err = mc.push(&mset)
require.NoError(t, err)
assert.Equal(t, 1, reqs)
}
Expand All @@ -55,9 +55,9 @@ func TestMetricsClientPushUnexpectedStatus(t *testing.T) {
defer ts.Close()

c := cloudapi.NewClient(nil, "fake-token", ts.URL, "k6cloud/v0.4", 1*time.Second)
mc, err := newMetricsClient(c)
mc, err := newMetricsClient(c, "test-ref-id")
require.NoError(t, err)

err = mc.push("test-ref-id", nil)
err = mc.push(nil)
assert.ErrorContains(t, err, "500 Internal Server Error")
}
2 changes: 1 addition & 1 deletion output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (o *Output) Start() error {
return fmt.Errorf("failed to initialize the samples collector: %w", err)
}

mc, err := newMetricsClient(o.cloudClient)
mc, err := newMetricsClient(o.cloudClient, o.referenceID)
if err != nil {
return fmt.Errorf("failed to initialize the http metrics flush client: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions output/cloud/expv2/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func TestOutputCollectSamples(t *testing.T) {
logger, conf.Token.String, conf.Host.String, "v/test", conf.Timeout.TimeDuration())
o, err := New(logger, conf, cc)
require.NoError(t, err)

o.SetReferenceID("ref-id-123")
require.NoError(t, o.Start())
require.Empty(t, o.collector.bq.PopAll())

Expand Down Expand Up @@ -286,6 +288,7 @@ func TestOutputStopWithTestError(t *testing.T) {
o, err := New(logger, config, cc)
require.NoError(t, err)

o.SetReferenceID("ref-id-123")
require.NoError(t, o.Start())
require.NoError(t, o.StopWithTestError(errors.New("an error")))
}
Expand Down