Skip to content

Commit

Permalink
Return error if there are CheckErrors or ReportErrors
Browse files Browse the repository at this point in the history
  • Loading branch information
gibbleyg committed Apr 29, 2024
1 parent 61e9026 commit b7f82bc
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 18 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ test: .GOPATH/.ok
# $Q go test $(if $V,-v) -i -race $(allpackages) # install -race libs to speed up next run
ifndef CI
$Q go vet $(allpackages)
$Q GODEBUG=cgocheck=2 go test -race $(allpackages)
$Q GOEXPERIMENT=cgocheck2 go test -race $(allpackages)
else
$Q ( go vet $(allpackages); echo $$? ) | \
tee .GOPATH/test/vet.txt | sed '$$ d'; exit $$(tail -1 .GOPATH/test/vet.txt)
$Q ( GODEBUG=cgocheck=2 go test -v -race $(allpackages); echo $$? ) | \
$Q ( GOEXPERIMENT=cgocheck2 go test -v -race $(allpackages); echo $$? ) | \
tee .GOPATH/test/output.txt | sed '$$ d'; exit $$(tail -1 .GOPATH/test/output.txt)
endif

Expand Down
61 changes: 55 additions & 6 deletions pipeline/endpoints/servicecontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ package endpoints

import (
"context"
"errors"
"fmt"
"net"
"time"

"github.com/GoogleCloudPlatform/ubbagent/metrics"

"github.com/GoogleCloudPlatform/ubbagent/clock"
"github.com/GoogleCloudPlatform/ubbagent/metrics"
"github.com/GoogleCloudPlatform/ubbagent/pipeline"
"github.com/GoogleCloudPlatform/ubbagent/util"
"github.com/golang/glog"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
"google.golang.org/api/servicecontrol/v1"
"github.com/GoogleCloudPlatform/ubbagent/util"
)

const (
Expand All @@ -49,6 +49,11 @@ type ServiceControlEndpoint struct {
clock clock.Clock
}

// checkError is the error type built by checkErrorsToError from the
// CheckErrors of a Check response. It pairs the combined error with a flag
// that IsTransient returns when it is handed a *checkError.
type checkError struct {
// err is the wrapped error; Error() returns its message.
err error
// transient is the retry classification reported by IsTransient.
transient bool
}

// NewServiceControlEndpoint creates a new ServiceControlEndpoint.
func NewServiceControlEndpoint(name, serviceName, agentId string, consumerId string, jsonKey []byte) (*ServiceControlEndpoint, error) {
config, err := google.JWTConfigFromJSON(jsonKey, servicecontrol.ServicecontrolScope)
Expand Down Expand Up @@ -98,19 +103,34 @@ func (ep *ServiceControlEndpoint) Send(report pipeline.EndpointReport) error {
checkReq := &servicecontrol.CheckRequest{
Operation: &opNoLabels,
}
_, err := ep.service.Services.Check(ep.serviceName, checkReq).Do()
checkResp, err := ep.service.Services.Check(ep.serviceName, checkReq).Do()
if err != nil && !googleapi.IsNotModified(err) {
return err
}

if len(checkResp.CheckErrors) > 0 {
return checkErrorsToError(checkResp.CheckErrors)
}

ep.nextCheck = ep.clock.Now().Add(checkCacheTimeout)
}

_, err := ep.service.Services.Report(ep.serviceName, req).Do()
resp, err := ep.service.Services.Report(ep.serviceName, req).Do()
if err != nil && !googleapi.IsNotModified(err) {
return err
}

// This will retry reporting all operations.
// However, identical operations are de-duped for billing
if len(resp.ReportErrors) > 0 {
var errs []error
for _, reportErr := range resp.ReportErrors {
errs = append(errs, reportErrorToError(reportErr))
}
return errors.Join(errs...)
}

glog.V(2).Infoln("ServiceControlEndpoint:Send(): success")
// TODO(volkman): Handle potential per-operation errors in response body
return nil
}

Expand Down Expand Up @@ -175,9 +195,38 @@ func (ep *ServiceControlEndpoint) IsTransient(err error) bool {
case net.Error:
// Return true if this error is considered temporary or a timeout.
return v.Temporary() || v.Timeout()
case *checkError:
return v.transient
default:
// Some non-http error (perhaps a connection refused or timeout?)
// We'll retry.
return true
}
}

// checkErrorsToError folds the CheckErrors of a Check response into a single
// *checkError.
//
// Every entry is rendered as its JSON encoding and the results are combined
// with errors.Join. The returned error is marked transient unless at least one
// entry carries a code signalling that the customer has disabled billing
// (BILLING_DISABLED, SERVICE_NOT_ACTIVATED or PROJECT_DELETED), in which case
// it is marked non-transient.
//
// It returns nil when checkErrors is empty, so the result never wraps a nil
// inner error.
func checkErrorsToError(checkErrors []*servicecontrol.CheckError) error {
	if len(checkErrors) == 0 {
		return nil
	}

	errs := make([]error, 0, len(checkErrors))
	transient := true
	// The loop variable is deliberately not called checkError: that would
	// shadow the checkError type inside the loop body.
	for _, ce := range checkErrors {
		if ce == nil {
			// Keep a placeholder so a malformed entry is still reported
			// rather than causing a nil dereference.
			errs = append(errs, errors.New("nil check error"))
			continue
		}

		switch ce.Code {
		// These codes indicate that the customer has disabled billing, so
		// the request is not retriable. See:
		// https://cloud.google.com/marketplace/docs/partners/integrated-saas/backend-integration#for_usage-based_pricing_reporting_usage_to_google
		case "BILLING_DISABLED", "SERVICE_NOT_ACTIVATED", "PROJECT_DELETED":
			transient = false
		}

		encoded, err := ce.MarshalJSON()
		if err != nil {
			// Fall back to the code alone so the entry is never reduced to
			// an empty message.
			errs = append(errs, fmt.Errorf("check error %q: marshaling to JSON: %w", ce.Code, err))
			continue
		}
		errs = append(errs, errors.New(string(encoded)))
	}
	return &checkError{err: errors.Join(errs...), transient: transient}
}

// Error implements the error interface by returning the message of the wrapped
// error. A checkError that holds no wrapped error yields a fixed placeholder
// message instead of panicking on a nil dereference.
func (ce checkError) Error() string {
	if ce.err == nil {
		return "unknown check error"
	}
	return ce.err.Error()
}

// reportErrorToError converts a single ReportError from a Report response into
// a Go error whose message is the JSON encoding of that ReportError.
//
// A nil entry, or an entry that cannot be encoded, still produces a non-nil
// error with a descriptive message, so a failed operation is never reported
// with an empty message or via a panic.
func reportErrorToError(reportError *servicecontrol.ReportError) error {
	if reportError == nil {
		return errors.New("nil report error")
	}

	encoded, err := reportError.MarshalJSON()
	if err != nil {
		return fmt.Errorf("report error for operation %q: marshaling to JSON: %w", reportError.OperationId, err)
	}
	return errors.New(string(encoded))
}
157 changes: 147 additions & 10 deletions pipeline/endpoints/servicecontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,69 @@ func (e mockNetError) Timeout() bool {

func (h *recordingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.req = r

var err error
h.body, err = ioutil.ReadAll(r.Body)
if err != nil {
panic(err)
}

var respJson []byte
if strings.Contains(r.RequestURI, ":check") {
h.checkCount++
req := &servicecontrol.CheckRequest{}
err := json.Unmarshal(h.body, req)
if err != nil {
h.t.Fatalf("Unable to parse check request %+v", err)
}

resp := &servicecontrol.CheckResponse{}

if req.Operation.OperationId == "billing-disabled" {
resp.CheckErrors = []*servicecontrol.CheckError{{
Code: "BILLING_DISABLED",
}}
}

if req.Operation.OperationId == "check-unknown-error" {
resp.CheckErrors = []*servicecontrol.CheckError{{
Code: "UNKNOWN",
}}
}

respJson, err = resp.MarshalJSON()
if err != nil {
panic(err)
}
}

if strings.Contains(r.RequestURI, ":report") {
h.reportCount++
if h.checkCount == 0 {
h.t.Fatalf("Check should be called before Report")
}
}

var err error
h.body, err = ioutil.ReadAll(r.Body)
if err != nil {
panic(err)
}
resp := &servicecontrol.ReportResponse{}
respJson, err := resp.MarshalJSON()
if err != nil {
panic(err)
req := &servicecontrol.ReportRequest{}
err := json.Unmarshal(h.body, req)
if err != nil {
h.t.Fatalf("Unable to parse report request %+v", err)
}

resp := &servicecontrol.ReportResponse{}
if req.Operations[0].OperationId == "report-error" {
resp.ReportErrors = []*servicecontrol.ReportError{{
Status: &servicecontrol.Status{
Message: "Unknown report error",
},
}}
}

respJson, err = resp.MarshalJSON()
if err != nil {
panic(err)
}
}

w.Write(respJson)
}

Expand Down Expand Up @@ -194,6 +236,68 @@ func TestServiceControlEndpoint(t *testing.T) {
}
})

t.Run("Check error BILLING_DISABLED returns non-retriable error", func(t *testing.T) {
ep.nextCheck = time.Now().Add(time.Minute * -1)

// Test a single report write
report, err := ep.BuildReport(metrics.StampedMetricReport{
Id: "billing-disabled",
MetricReport: metrics.MetricReport{
Name: "int-metric1",
StartTime: time.Unix(0, 0),
EndTime: time.Unix(1, 0),
Value: metrics.MetricValue{
Int64Value: util.NewInt64(10),
},
},
})
if err != nil {
t.Fatalf("error building report: %+v", err)
}

err = ep.Send(report)
if err == nil {
t.Fatalf("expected error sending report")
}

checkErr := err.(*checkError)
if checkErr.transient {
t.Fatalf("expected billing disabled to not be a transient error")
}

})

t.Run("Unknown check error returns retriable error", func(t *testing.T) {
ep.nextCheck = time.Now().Add(time.Minute * -1)

// Test a single report write
report, err := ep.BuildReport(metrics.StampedMetricReport{
Id: "check-unknown-error",
MetricReport: metrics.MetricReport{
Name: "int-metric1",
StartTime: time.Unix(0, 0),
EndTime: time.Unix(1, 0),
Value: metrics.MetricValue{
Int64Value: util.NewInt64(10),
},
},
})
if err != nil {
t.Fatalf("error building report: %+v", err)
}

err = ep.Send(report)
if err == nil {
t.Fatalf("expected error sending report")
}

checkErr := err.(*checkError)
if !checkErr.transient {
t.Fatalf("expected transient error")
}

})

t.Run("Sent contents are correct", func(t *testing.T) {
// Test a single report write
report1, err := ep.BuildReport(metrics.StampedMetricReport{
Expand Down Expand Up @@ -268,6 +372,37 @@ func TestServiceControlEndpoint(t *testing.T) {
}
})

t.Run("ReportError returns transient error", func(t *testing.T) {
// Test a single report write
report, err := ep.BuildReport(metrics.StampedMetricReport{
Id: "report-error",
MetricReport: metrics.MetricReport{
Name: "int-metric1",
StartTime: time.Unix(0, 0),
EndTime: time.Unix(1, 0),
Value: metrics.MetricValue{
Int64Value: util.NewInt64(10),
},
},
})
if err != nil {
t.Fatalf("error building report: %+v", err)
}

err = ep.Send(report)
if err == nil {
t.Fatalf("expected error sending report")
}

if !ep.IsTransient(err) {
t.Fatalf("expected transient error")
}

if !strings.Contains(err.Error(), "Unknown report error") {
t.Fatalf("expected unknown report error")
}
})

t.Run("IsTransient tests", func(t *testing.T) {
cases := []struct {
err error
Expand All @@ -285,6 +420,8 @@ func TestServiceControlEndpoint(t *testing.T) {
{mockNetError{temporary: true, timeout: false}, true},
{mockNetError{temporary: false, timeout: true}, true},
{mockNetError{temporary: true, timeout: true}, true},
{&checkError{err: errors.New("foo"), transient: true}, true},
{&checkError{err: errors.New("foo"), transient: false}, false},
}
for _, c := range cases {
if want, got := c.transient, ep.IsTransient(c.err); want != got {
Expand Down

0 comments on commit b7f82bc

Please sign in to comment.