Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

Commit

Permalink
Add a metric batcher which preallocates batch sizes. (#194)
Browse files Browse the repository at this point in the history
* Add a metric batcher which preallocates batch sizes.

* Fix received/dropped logic.

* Fix comments and add a new method that returns num dropped timeseries

* Use recordDroppedTimeseries when possible

* Rename ExportMetricsProtoAndReturnDropped to PushMetricsProto

* Fix tests.
  • Loading branch information
bogdandrutu committed Sep 1, 2019
1 parent 3a3e471 commit 3b40441
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 172 deletions.
12 changes: 8 additions & 4 deletions equivalence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,22 @@ func TestStatsAndMetricsEquivalence(t *testing.T) {
Name: fmt.Sprintf("projects/%s", se.o.ProjectID),
MetricDescriptor: sMD,
}
pMDR, err := se.protoMetricDescriptorToCreateMetricDescriptorRequest(ctx, metricPbs[i], nil)
inMD, err := se.protoToMonitoringMetricDescriptor(metricPbs[i], nil)
if err != nil {
t.Errorf("#%d: Stats.protoMetricDescriptorToMetricDescriptor: %v", i, err)
}
pMDR := &monitoringpb.CreateMetricDescriptorRequest{
Name: fmt.Sprintf("projects/%s", se.o.ProjectID),
MetricDescriptor: inMD,
}
if diff := cmpMDReq(pMDR, sMDR); diff != "" {
t.Fatalf("MetricDescriptor Mismatch -FromMetricsPb +FromMetrics: %s", diff)
}

stss, _ := se.metricToMpbTs(ctx, metric)
sctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(stss)
tsl, _ := se.protoMetricToTimeSeries(ctx, se.getResource(nil, metricPbs[i], seenResources), metricPbs[i], nil)
pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(tsl)
allTss, _ := protoMetricToTimeSeries(ctx, se, se.getResource(nil, metricPbs[i], seenResources), metricPbs[i])
pctreql := se.combineTimeSeriesToCreateTimeSeriesRequest(allTss)
if diff := cmpTSReqs(pctreql, sctreql); diff != "" {
t.Fatalf("TimeSeries Mismatch -FromMetricsPb +FromMetrics: %s", diff)
}
Expand Down Expand Up @@ -336,7 +340,7 @@ func TestEquivalenceStatsVsMetricsUploads(t *testing.T) {
})

// Export the proto Metrics to the Stackdriver backend.
se.ExportMetricsProto(context.Background(), nil, nil, metricPbs)
se.PushMetricsProto(context.Background(), nil, nil, metricPbs)
se.Flush()

var stackdriverTimeSeriesFromMetricsPb []*monitoringpb.CreateTimeSeriesRequest
Expand Down
91 changes: 91 additions & 0 deletions metrics_batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stackdriver

import (
"context"
"fmt"
"strings"

monitoring "cloud.google.com/go/monitoring/apiv3"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

type metricsBatcher struct {
projectID string
allReqs []*monitoringpb.CreateTimeSeriesRequest
allTss []*monitoringpb.TimeSeries
allErrs []error
// Counts all dropped TimeSeries by this exporter.
droppedTimeSeries int
}

func newMetricsBatcher(projectID string) *metricsBatcher {
return &metricsBatcher{
projectID: projectID,
allTss: make([]*monitoringpb.TimeSeries, 0, maxTimeSeriesPerUpload),
droppedTimeSeries: 0,
}
}

func (mb *metricsBatcher) recordDroppedTimeseries(numTimeSeries int, err error) {
mb.droppedTimeSeries += numTimeSeries
mb.allErrs = append(mb.allErrs, err)
}

func (mb *metricsBatcher) addTimeSeries(ts *monitoringpb.TimeSeries) {
mb.allTss = append(mb.allTss, ts)
if len(mb.allTss) == maxTimeSeriesPerUpload {
mb.allReqs = append(mb.allReqs, &monitoringpb.CreateTimeSeriesRequest{
Name: monitoring.MetricProjectPath(mb.projectID),
TimeSeries: mb.allTss,
})
mb.allTss = make([]*monitoringpb.TimeSeries, maxTimeSeriesPerUpload)
}
}

func (mb *metricsBatcher) export(ctx context.Context, mc *monitoring.MetricClient) {
// Last batch, if any.
if len(mb.allTss) > 0 {
mb.allReqs = append(mb.allReqs, &monitoringpb.CreateTimeSeriesRequest{
Name: monitoring.MetricProjectPath(mb.projectID),
TimeSeries: mb.allTss,
})
}

// Send create time series requests to Stackdriver.
for _, req := range mb.allReqs {
if err := createTimeSeries(ctx, mc, req); err != nil {
mb.recordDroppedTimeseries(len(req.TimeSeries), err)
}
}
}

func (mb *metricsBatcher) finalError() error {
numErrors := len(mb.allErrs)
if numErrors == 0 {
return nil
}

if numErrors == 1 {
return mb.allErrs[0]
}

errMsgs := make([]string, 0, numErrors)
for _, err := range mb.allErrs {
errMsgs = append(errMsgs, err.Error())
}
return fmt.Errorf("[%s]", strings.Join(errMsgs, "; "))
}

0 comments on commit 3b40441

Please sign in to comment.