From 03d5cedd55294185a20f6e5c175a908ba0a395e3 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Fri, 28 Feb 2020 11:28:00 +0100 Subject: [PATCH 1/2] Boskos ranch: Retry on conflict --- boskos/ranch/BUILD.bazel | 9 +- boskos/ranch/ranch.go | 368 +++++++++++++++++++++---------------- boskos/ranch/ranch_test.go | 106 +++++++++-- boskos/ranch/storage.go | 227 +++++++++++------------ 4 files changed, 426 insertions(+), 284 deletions(-) diff --git a/boskos/ranch/BUILD.bazel b/boskos/ranch/BUILD.bazel index 626f8cdd5c02..6d5e92c24d04 100644 --- a/boskos/ranch/BUILD.bazel +++ b/boskos/ranch/BUILD.bazel @@ -19,8 +19,12 @@ go_test( "@com_github_go_test_deep//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@io_k8s_apimachinery//pkg/api/equality:go_default_library", + "@io_k8s_apimachinery//pkg/api/errors:go_default_library", "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", "@io_k8s_apimachinery//pkg/runtime:go_default_library", + "@io_k8s_apimachinery//pkg/runtime/schema:go_default_library", + "@io_k8s_apimachinery//pkg/util/errors:go_default_library", + "@io_k8s_sigs_controller_runtime//pkg/client:go_default_library", "@io_k8s_sigs_controller_runtime//pkg/client/fake:go_default_library", ], ) @@ -36,11 +40,14 @@ go_library( deps = [ "//boskos/common:go_default_library", "//boskos/crds:go_default_library", - "@com_github_hashicorp_go_multierror//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", + "@io_k8s_apimachinery//pkg/api/errors:go_default_library", "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", "@io_k8s_apimachinery//pkg/types:go_default_library", + "@io_k8s_apimachinery//pkg/util/errors:go_default_library", "@io_k8s_apimachinery//pkg/util/sets:go_default_library", + "@io_k8s_apimachinery//pkg/util/wait:go_default_library", + "@io_k8s_client_go//util/retry:go_default_library", "@io_k8s_sigs_controller_runtime//pkg/client:go_default_library", ], ) diff --git a/boskos/ranch/ranch.go b/boskos/ranch/ranch.go index ae0fd50edd68..e739f163aa95 100644 --- a/boskos/ranch/ranch.go +++ b/boskos/ranch/ranch.go @@ -24,7 +24,11 @@ import ( "time" "github.com/sirupsen/logrus" + kerrors "k8s.io/apimachinery/pkg/api/errors" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" "k8s.io/test-infra/boskos/common" "k8s.io/test-infra/boskos/crds" @@ -124,76 +128,80 @@ func (r *Ranch) Acquire(rType, state, dest, owner, requestID string) (*crds.Reso r.resourcesLock.Lock() defer r.resourcesLock.Unlock() - logger.Debug("Determining request priority...") - ts := acquireRequestPriorityKey{rType: rType, state: state} - rank, new := r.requestMgr.GetRank(ts, requestID) - logger.WithFields(logrus.Fields{"rank": rank, "new": new}).Debug("Determined request priority.") + var returnRes *crds.ResourceObject + return returnRes, retryOnConflict(retry.DefaultBackoff, func() error { + logger.Debug("Determining request priority...") + ts := acquireRequestPriorityKey{rType: rType, state: state} + rank, new := r.requestMgr.GetRank(ts, requestID) + logger.WithFields(logrus.Fields{"rank": rank, "new": new}).Debug("Determined request priority.") - resources, err := r.Storage.GetResources() - if err != nil { - logger.WithError(err).Errorf("could not get resources") - return nil, &ResourceNotFound{rType} - } - logger.Debugf("Considering %d resources.", len(resources.Items)) - - // For request priority we need to go over all the list until a matching rank - matchingResoucesCount := 0 - typeCount := 0 - for idx := range resources.Items { - res := resources.Items[idx] - if rType != res.Spec.Type { - continue + resources, err := r.Storage.GetResources() + if err != nil { + logger.WithError(err).Errorf("could not get resources") + return &ResourceNotFound{rType} } - typeCount++ + logger.Debugf("Considering %d resources.", len(resources.Items)) + + // For request priority we need to go over all the list until a matching rank + matchingResoucesCount := 0 + typeCount := 0 + for idx := range resources.Items { + res := resources.Items[idx] + if rType != res.Spec.Type { + continue + } + typeCount++ - if state != res.Status.State || res.Status.Owner != "" { - continue - } - matchingResoucesCount++ + if state != res.Status.State || res.Status.Owner != "" { + continue + } + matchingResoucesCount++ - if matchingResoucesCount < rank { - continue - } - logger = logger.WithField("resource", res.Name) - res.Status.Owner = owner - res.Status.State = dest - logger.Debug("Updating resource.") - updatedRes, err := r.Storage.UpdateResource(&res) - if err != nil { - logger.WithError(err).Errorf("could not update resource %s", res.Name) - return nil, err - } - // Deleting this request since it has been fulfilled - if requestID != "" { - logger.Debug("Cleaning up requests.") - r.requestMgr.Delete(ts, requestID) + if matchingResoucesCount < rank { + continue + } + logger = logger.WithField("resource", res.Name) + res.Status.Owner = owner + res.Status.State = dest + logger.Debug("Updating resource.") + updatedRes, err := r.Storage.UpdateResource(&res) + if err != nil { + logger.WithError(err).Errorf("could not update resource %s", res.Name) + return err + } + // Deleting this request since it has been fulfilled + if requestID != "" { + logger.Debug("Cleaning up requests.") + r.requestMgr.Delete(ts, requestID) + } + logger.Debug("Successfully acquired resource.") + returnRes = updatedRes + return nil } - logger.Debug("Successfully acquired resource.") - return updatedRes, nil - } - if new { - logger.Debug("Checking for associated dynamic resource type...") - lifeCycle, err := r.Storage.GetDynamicResourceLifeCycle(rType) - // Assuming error means no associated dynamic resource. - if err == nil { - if typeCount < lifeCycle.Spec.MaxCount { - logger.Debug("Adding new dynamic resources...") - res := newResourceFromNewDynamicResourceLifeCycle(r.Storage.generateName(), lifeCycle, r.now()) - if err := r.Storage.AddResource(res); err != nil { - logger.WithError(err).Warningf("unable to add a new resource of type %s", rType) + if new { + logger.Debug("Checking for associated dynamic resource type...") + lifeCycle, err := r.Storage.GetDynamicResourceLifeCycle(rType) + // Assuming error means no associated dynamic resource. + if err == nil { + if typeCount < lifeCycle.Spec.MaxCount { + logger.Debug("Adding new dynamic resources...") + res := newResourceFromNewDynamicResourceLifeCycle(r.Storage.generateName(), lifeCycle, r.now()) + if err := r.Storage.AddResource(res); err != nil { + logger.WithError(err).Warningf("unable to add a new resource of type %s", rType) + } + logger.Infof("Added dynamic resource %s of type %s", res.Name, res.Spec.Type) } - logger.Infof("Added dynamic resource %s of type %s", res.Name, res.Spec.Type) + } else { + logrus.WithError(err).Debug("Failed listing DRLC") } - } else { - logrus.WithError(err).Debug("Failed listing DRLC") } - } - if typeCount > 0 { - return nil, &ResourceNotFound{rType} - } - return nil, &ResourceTypeNotFound{rType} + if typeCount > 0 { + return &ResourceNotFound{rType} + } + return &ResourceTypeNotFound{rType} + }) } // AcquireByState checks out resources of a given type without an owner, @@ -212,40 +220,45 @@ func (r *Ranch) AcquireByState(state, dest, owner string, names []string) ([]*cr return nil, fmt.Errorf("must provide names of expected resources") } - rNames := sets.NewString(names...) + var returnRes []*crds.ResourceObject + return returnRes, retryOnConflict(retry.DefaultBackoff, func() error { + rNames := sets.NewString(names...) - allResources, err := r.Storage.GetResources() - if err != nil { - logrus.WithError(err).Errorf("could not get resources") - return nil, &ResourceNotFound{state} - } + allResources, err := r.Storage.GetResources() + if err != nil { + logrus.WithError(err).Errorf("could not get resources") + return &ResourceNotFound{state} + } - var resources []*crds.ResourceObject + var resources []*crds.ResourceObject - for idx := range allResources.Items { - res := allResources.Items[idx] - if state != res.Status.State || res.Status.Owner != "" || !rNames.Has(res.Name) { - continue - } + for idx := range allResources.Items { + res := allResources.Items[idx] + if state != res.Status.State || res.Status.Owner != "" || !rNames.Has(res.Name) { + continue + } - res.Status.Owner = owner - res.Status.State = dest - updatedRes, err := r.Storage.UpdateResource(&res) - if err != nil { - logrus.WithError(err).Errorf("could not update resource %s", res.Name) - return nil, err + res.Status.Owner = owner + res.Status.State = dest + updatedRes, err := r.Storage.UpdateResource(&res) + if err != nil { + logrus.WithError(err).Errorf("could not update resource %s", res.Name) + return err + } + resources = append(resources, updatedRes) + rNames.Delete(res.Name) } - resources = append(resources, updatedRes) - rNames.Delete(res.Name) - } - if rNames.Len() != 0 { - missingResources := rNames.List() - err := &ResourceNotFound{state} - logrus.WithError(err).Errorf("could not find required resources %s", strings.Join(missingResources, ", ")) - return resources, err - } - return resources, nil + if rNames.Len() != 0 { + missingResources := rNames.List() + err := &ResourceNotFound{state} + logrus.WithError(err).Errorf("could not find required resources %s", strings.Join(missingResources, ", ")) + returnRes = resources + return err + } + returnRes = resources + return nil + }) } // Release unsets owner for target resource and move it to a new state. @@ -259,34 +272,36 @@ func (r *Ranch) Release(name, dest, owner string) error { r.resourcesLock.Lock() defer r.resourcesLock.Unlock() - res, err := r.Storage.GetResource(name) - if err != nil { - logrus.WithError(err).Errorf("unable to release resource %s", name) - return &ResourceNotFound{name} - } - if owner != res.Status.Owner { - return &OwnerNotMatch{request: owner, owner: res.Status.Owner} - } + return retryOnConflict(retry.DefaultBackoff, func() error { + res, err := r.Storage.GetResource(name) + if err != nil { + logrus.WithError(err).Errorf("unable to release resource %s", name) + return &ResourceNotFound{name} + } + if owner != res.Status.Owner { + return &OwnerNotMatch{request: owner, owner: res.Status.Owner} + } - res.Status.Owner = "" - res.Status.State = dest + res.Status.Owner = "" + res.Status.State = dest - if lf, err := r.Storage.GetDynamicResourceLifeCycle(res.Spec.Type); err == nil { - // Assuming error means not existing as the only way to differentiate would be to list - // all resources and find the right one which is more costly. - if lf.Spec.LifeSpan != nil { - expirationTime := r.now().Add(*lf.Spec.LifeSpan) - res.Status.ExpirationDate = &expirationTime + if lf, err := r.Storage.GetDynamicResourceLifeCycle(res.Spec.Type); err == nil { + // Assuming error means not existing as the only way to differentiate would be to list + // all resources and find the right one which is more costly. + if lf.Spec.LifeSpan != nil { + expirationTime := r.now().Add(*lf.Spec.LifeSpan) + res.Status.ExpirationDate = &expirationTime + } + } else { + res.Status.ExpirationDate = nil } - } else { - res.Status.ExpirationDate = nil - } - if _, err := r.Storage.UpdateResource(res); err != nil { - logrus.WithError(err).Errorf("could not update resource %s", res.Name) - return err - } - return nil + if _, err := r.Storage.UpdateResource(res); err != nil { + logrus.WithError(err).Errorf("could not update resource %s", res.Name) + return err + } + return nil + }) } // Update updates the timestamp of a target resource. @@ -302,26 +317,28 @@ func (r *Ranch) Update(name, owner, state string, ud *common.UserData) error { r.resourcesLock.Lock() defer r.resourcesLock.Unlock() - res, err := r.Storage.GetResource(name) - if err != nil { - logrus.WithError(err).Errorf("could not find resource %s for update", name) - return &ResourceNotFound{name} - } - if owner != res.Status.Owner { - return &OwnerNotMatch{request: owner, owner: res.Status.Owner} - } - if state != res.Status.State { - return &StateNotMatch{res.Status.State, state} - } - if res.Status.UserData == nil { - res.Status.UserData = &common.UserData{} - } - res.Status.UserData.Update(ud) - if _, err := r.Storage.UpdateResource(res); err != nil { - logrus.WithError(err).Errorf("could not update resource %s", res.Name) - return err - } - return nil + return retryOnConflict(retry.DefaultBackoff, func() error { + res, err := r.Storage.GetResource(name) + if err != nil { + logrus.WithError(err).Errorf("could not find resource %s for update", name) + return &ResourceNotFound{name} + } + if owner != res.Status.Owner { + return &OwnerNotMatch{request: owner, owner: res.Status.Owner} + } + if state != res.Status.State { + return &StateNotMatch{res.Status.State, state} + } + if res.Status.UserData == nil { + res.Status.UserData = &common.UserData{} + } + res.Status.UserData.Update(ud) + if _, err := r.Storage.UpdateResource(res); err != nil { + logrus.WithError(err).Errorf("could not update resource %s", res.Name) + return err + } + return nil + }) } // Reset unstucks a type of stale resource to a new state. @@ -334,29 +351,31 @@ func (r *Ranch) Reset(rtype, state string, expire time.Duration, dest string) (m r.resourcesLock.Lock() defer r.resourcesLock.Unlock() - ret := make(map[string]string) - - resources, err := r.Storage.GetResources() - if err != nil { - logrus.WithError(err).Errorf("cannot find resources") - return nil, err - } - - for idx := range resources.Items { - res := resources.Items[idx] - if rtype != res.Spec.Type || state != res.Status.State || res.Status.Owner == "" || r.now().Sub(res.Status.LastUpdate) < expire { - continue + var ret map[string]string + return ret, retryOnConflict(retry.DefaultBackoff, func() error { + ret = make(map[string]string) + resources, err := r.Storage.GetResources() + if err != nil { + logrus.WithError(err).Errorf("cannot find resources") + return err } - ret[res.Name] = res.Status.Owner - res.Status.Owner = "" - res.Status.State = dest - if _, err := r.Storage.UpdateResource(&res); err != nil { - logrus.WithError(err).Errorf("could not update resource %s", res.Name) - return ret, err + for idx := range resources.Items { + res := resources.Items[idx] + if rtype != res.Spec.Type || state != res.Status.State || res.Status.Owner == "" || r.now().Sub(res.Status.LastUpdate) < expire { + continue + } + + ret[res.Name] = res.Status.Owner + res.Status.Owner = "" + res.Status.State = dest + if _, err := r.Storage.UpdateResource(&res); err != nil { + logrus.WithError(err).Errorf("could not update resource %s", res.Name) + return err + } } - } - return ret, nil + return nil + }) } // SyncConfig updates resource list from a file @@ -370,10 +389,7 @@ func (r *Ranch) SyncConfig(configPath string) error { } r.resourcesLock.Lock() defer r.resourcesLock.Unlock() - if err := r.Storage.SyncResources(config); err != nil { - return err - } - return nil + return r.Storage.SyncResources(config) } // StartDynamicResourceUpdater starts a goroutine which periodically @@ -463,3 +479,43 @@ func (r *Ranch) AllMetrics() ([]common.Metric, error) { func newResourceFromNewDynamicResourceLifeCycle(name string, dlrc *crds.DRLCObject, now time.Time) *crds.ResourceObject { return crds.NewResource(name, dlrc.Name, dlrc.Spec.InitialState, "", now) } + +// retryOnConflict is a copy of https://godoc.org/k8s.io/client-go/util/retry#RetryOnConflict +// that only differs in the isConflict check, we use a variant that supports wrapped errors. +// TODO: Simplify this by using retry.OnError once we have a sufficiently new client-go dependency +func retryOnConflict(backoff wait.Backoff, fn func() error) error { + var lastConflictErr error + err := wait.ExponentialBackoff(backoff, func() (bool, error) { + err := fn() + switch { + case err == nil: + return true, nil + case isConflict(err): + lastConflictErr = err + return false, nil + default: + return false, err + } + }) + if err == wait.ErrWaitTimeout { + err = lastConflictErr + } + return err +} + +func isConflict(err error) bool { + if kerrors.IsConflict(err) { + return true + } + if x, ok := err.(interface{ Unwrap() error }); ok { + return isConflict(x.Unwrap()) + } + if aggregate, ok := err.(utilerrors.Aggregate); ok { + for _, err := range aggregate.Errors() { + if isConflict(err) { + return true + } + } + } + return false +} diff --git a/boskos/ranch/ranch_test.go b/boskos/ranch/ranch_test.go index deb48d3bc947..6e4bb2722004 100644 --- a/boskos/ranch/ranch_test.go +++ b/boskos/ranch/ranch_test.go @@ -25,11 +25,16 @@ import ( "testing" "time" + "errors" "github.com/go-test/deep" "github.com/sirupsen/logrus" apiequality "k8s.io/apimachinery/pkg/api/equality" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" fakectrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" "k8s.io/test-infra/boskos/common" @@ -67,11 +72,12 @@ func fakeTime(t time.Time) time.Time { const testNS = "test" -func MakeTestRanch(objects []runtime.Object) *Ranch { +func makeTestRanch(objects []runtime.Object) *Ranch { for _, obj := range objects { obj.(metav1.Object).SetNamespace(testNS) } - s, _ := NewStorage(context.Background(), fakectrlruntimeclient.NewFakeClient(objects...), testNS, "") + client := &onceConflictingClient{Client: fakectrlruntimeclient.NewFakeClient(objects...)} + s, _ := NewStorage(context.Background(), client, testNS, "") s.now = func() time.Time { return fakeNow } @@ -192,7 +198,7 @@ func TestAcquire(t *testing.T) { } for _, tc := range testcases { - c := MakeTestRanch(tc.resources) + c := makeTestRanch(tc.resources) res, err := c.Acquire(tc.rtype, tc.state, tc.dest, tc.owner, "") if !AreErrorsEqual(err, tc.expectErr) { t.Errorf("%s - Got error %v, expected error %v", tc.name, err, tc.expectErr) @@ -229,7 +235,7 @@ func TestAcquirePriority(t *testing.T) { expiredFuture := now.Add(2 * testTTL) owner := "tester" res := crds.NewResource("res", "type", common.Free, "", now) - r := MakeTestRanch(nil) + r := makeTestRanch(nil) r.requestMgr.now = func() time.Time { return now } // Setting Priority, this request will fail @@ -275,7 +281,7 @@ func TestAcquireRoundRobin(t *testing.T) { results := map[string]int{} - c := MakeTestRanch(resources) + c := makeTestRanch(resources) for i := 0; i < 4; i++ { res, err := c.Acquire("t", "s", "d", "foo", "") if err != nil { @@ -309,7 +315,7 @@ func TestAcquireOnDemand(t *testing.T) { }, } // Not adding any resources to start with - c := MakeTestRanch(dRLCs) + c := makeTestRanch(dRLCs) c.now = func() time.Time { return now } // First acquire should trigger a creation if _, err := c.Acquire(rType, common.Free, common.Busy, owner, requestID1); err == nil { @@ -448,7 +454,7 @@ func TestRelease(t *testing.T) { if tc.expectedRes != nil { tc.expectedRes.Namespace = testNS } - c := MakeTestRanch(objs) + c := makeTestRanch(objs) releaseErr := c.Release(tc.resName, tc.dest, tc.owner) if !AreErrorsEqual(releaseErr, tc.expectErr) { t.Fatalf("Got error %v, expected error %v", releaseErr, tc.expectErr) @@ -526,7 +532,7 @@ func TestReset(t *testing.T) { } for _, tc := range testcases { - c := MakeTestRanch(tc.resources) + c := makeTestRanch(tc.resources) rmap, err := c.Reset(tc.rtype, tc.state, tc.expire, tc.dest) if err != nil { t.Errorf("failed to reset %v", err) @@ -611,7 +617,7 @@ func TestUpdate(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - c := MakeTestRanch(tc.resources) + c := makeTestRanch(tc.resources) err := c.Update(tc.resName, tc.owner, tc.state, nil) if !AreErrorsEqual(err, tc.expectErr) { t.Fatalf("Got error %v, expected error %v", err, tc.expectErr) @@ -704,7 +710,7 @@ func TestMetric(t *testing.T) { } for _, tc := range testcases { - c := MakeTestRanch(tc.resources) + c := makeTestRanch(tc.resources) metric, err := c.Metric(tc.metricType) if !AreErrorsEqual(err, tc.expectErr) { t.Errorf("%s - Got error %v, expected error %v", tc.name, err, tc.expectErr) @@ -795,7 +801,7 @@ func TestAllMetrics(t *testing.T) { } for _, tc := range testcases { - c := MakeTestRanch(tc.resources) + c := makeTestRanch(tc.resources) metrics, err := c.AllMetrics() if err != nil { t.Errorf("%s - Got error %v", tc.name, err) @@ -1356,8 +1362,10 @@ func TestSyncResources(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - c := MakeTestRanch(tc.currentRes) - c.Storage.SyncResources(tc.config) + c := makeTestRanch(tc.currentRes) + if err := c.Storage.SyncResources(tc.config); err != nil { + t.Fatalf("syncResources failed: %v, type: %T", err, err) + } resources, err := c.Storage.GetResources() if err != nil { t.Fatalf("failed to get resources: %v", err) @@ -1670,7 +1678,7 @@ func TestUpdateAllDynamicResources(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - c := MakeTestRanch(tc.currentRes) + c := makeTestRanch(tc.currentRes) err := c.Storage.UpdateAllDynamicResources() if err != nil { t.Fatalf("error updating dynamic resources: %v", err) @@ -1758,3 +1766,73 @@ func sortDRLCList(drlcs ...*crds.DRLCObjectList) { } } } + +// onceConflictingClient returns an IsConflict error on the first Update request it receives. It +// is used to verify that there is retrying for conflicts in place. +type onceConflictingClient struct { + didConflict bool + ctrlruntimeclient.Client +} + +func (occ *onceConflictingClient) Update(ctx context.Context, obj runtime.Object, opts ...ctrlruntimeclient.UpdateOption) error { + if !occ.didConflict { + occ.didConflict = true + return kerrors.NewConflict(schema.GroupResource{}, "obj", errors.New("conflicting as requested")) + } + return occ.Client.Update(ctx, obj, opts...) +} + +func TestIsConflict(t *testing.T) { + gr := schema.GroupResource{} + testCases := []struct { + name string + err error + shouldMatch bool + }{ + { + name: "direct match", + err: kerrors.NewConflict(gr, "test", errors.New("invalid")), + shouldMatch: true, + }, + { + name: "no match", + err: errors.New("something else"), + }, + { + name: "nested match", + err: fmt.Errorf("we found an error: %w", fmt.Errorf("here: %w", kerrors.NewConflict(gr, "test", errors.New("invalid")))), + shouldMatch: true, + }, + { + name: "nested, no match", + err: fmt.Errorf("We also found this: %w", fmt.Errorf("there: %w", errors.New("nope"))), + }, + { + name: "aggregate, match", + err: utilerrors.NewAggregate([]error{errors.New("some err"), kerrors.NewConflict(gr, "test", errors.New("invalid"))}), + shouldMatch: true, + }, + { + name: "aggregate, no match", + err: utilerrors.NewAggregate([]error{errors.New("some err"), errors.New("other err")}), + }, + { + name: "wrapped aggregate, match", + err: fmt.Errorf("err: %w", fmt.Errorf("didn't work: %w", utilerrors.NewAggregate([]error{errors.New("some err"), kerrors.NewConflict(gr, "test", errors.New("invalid"))}))), + shouldMatch: true, + }, + { + name: "wrapped aggregate, no match", + err: fmt.Errorf("err: %w", fmt.Errorf("didn't work: %w", utilerrors.NewAggregate([]error{errors.New("some err"), errors.New("nope")}))), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + if result := isConflict(tc.err); result != tc.shouldMatch { + t.Errorf("expected match: %t, got match: %t", tc.shouldMatch, result) + } + }) + } +} diff --git a/boskos/ranch/storage.go b/boskos/ranch/storage.go index 1807e83d65ce..0af80378a9d9 100644 --- a/boskos/ranch/storage.go +++ b/boskos/ranch/storage.go @@ -27,11 +27,12 @@ import ( "sync" "time" - "github.com/hashicorp/go-multierror" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/retry" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" "k8s.io/test-infra/boskos/common" @@ -118,7 +119,7 @@ func (s *Storage) UpdateResource(resource *crds.ResourceObject) (*crds.ResourceO resource.Status.LastUpdate = s.now() if err := s.client.Update(s.ctx, resource); err != nil { - return nil, fmt.Errorf("failed to update resources %s: %v", resource.Name, err) + return nil, fmt.Errorf("failed to update resources %s: %w", resource.Name, err) } return resource, nil @@ -173,7 +174,7 @@ func (s *Storage) DeleteDynamicResourceLifeCycle(name string) error { func (s *Storage) UpdateDynamicResourceLifeCycle(resource *crds.DRLCObject) (*crds.DRLCObject, error) { resource.Namespace = s.namespace if err := s.client.Update(s.ctx, resource); err != nil { - return nil, fmt.Errorf("failed to update dlrc %s: %v", resource.Name, err) + return nil, fmt.Errorf("failed to update dlrc %s: %w", resource.Name, err) } return resource, nil @@ -210,72 +211,74 @@ func (s *Storage) SyncResources(config *common.BoskosConfig) error { return nil } - newSRByName := map[string]crds.ResourceObject{} - existingSRByName := map[string]crds.ResourceObject{} - newDRLCByType := map[string]crds.DRLCObject{} - existingDRLCByType := map[string]crds.DRLCObject{} + if err := retryOnConflict(retry.DefaultBackoff, func() error { + newSRByName := map[string]crds.ResourceObject{} + existingSRByName := map[string]crds.ResourceObject{} + newDRLCByType := map[string]crds.DRLCObject{} + existingDRLCByType := map[string]crds.DRLCObject{} - for _, entry := range config.Resources { - if entry.IsDRLC() { - newDRLCByType[entry.Type] = *crds.FromDynamicResourceLifecycle(common.NewDynamicResourceLifeCycleFromConfig(entry)) - } else { - for _, res := range common.NewResourcesFromConfig(entry) { - newSRByName[res.Name] = *crds.FromResource(res) + for _, entry := range config.Resources { + if entry.IsDRLC() { + newDRLCByType[entry.Type] = *crds.FromDynamicResourceLifecycle(common.NewDynamicResourceLifeCycleFromConfig(entry)) + } else { + for _, res := range common.NewResourcesFromConfig(entry) { + newSRByName[res.Name] = *crds.FromResource(res) + } } } - } - if err := func() error { - s.resourcesLock.Lock() - defer s.resourcesLock.Unlock() + if err := func() error { + s.resourcesLock.Lock() + defer s.resourcesLock.Unlock() - resources, err := s.GetResources() - if err != nil { - logrus.WithError(err).Error("cannot list resources") - return err - } - existingDRLC, err := s.GetDynamicResourceLifeCycles() - if err != nil { - logrus.WithError(err).Error("cannot list dynamicResourceLifeCycles") - return err - } - for _, dRLC := range existingDRLC.Items { - existingDRLCByType[dRLC.Name] = dRLC - } + resources, err := s.GetResources() + if err != nil { + logrus.WithError(err).Error("cannot list resources") + return err + } + existingDRLC, err := s.GetDynamicResourceLifeCycles() + if err != nil { + logrus.WithError(err).Error("cannot list dynamicResourceLifeCycles") + return err + } + for _, dRLC := range existingDRLC.Items { + existingDRLCByType[dRLC.Name] = dRLC + } - // Split resources between static and dynamic resources - lifeCycleTypes := sets.String{} + // Split resources between static and dynamic resources + lifeCycleTypes := sets.String{} - for _, lc := range existingDRLC.Items { - lifeCycleTypes.Insert(lc.Name) - } - // Considering the migration case from mason resources to dynamic resources. - // Dynamic resources already exist but they don't have an associated DRLC - for _, lc := range newDRLCByType { - lifeCycleTypes.Insert(lc.Name) - } + for _, lc := range existingDRLC.Items { + lifeCycleTypes.Insert(lc.Name) + } + // Considering the migration case from mason resources to dynamic resources. + // Dynamic resources already exist but they don't have an associated DRLC + for _, lc := range newDRLCByType { + lifeCycleTypes.Insert(lc.Name) + } - for _, res := range resources.Items { - if !lifeCycleTypes.Has(res.Spec.Type) { - existingSRByName[res.Name] = res + for _, res := range resources.Items { + if !lifeCycleTypes.Has(res.Spec.Type) { + existingSRByName[res.Name] = res + } } - } - if err := s.syncStaticResources(newSRByName, existingSRByName); err != nil { - return err - } - if err := s.syncDynamicResourceLifeCycles(newDRLCByType, existingDRLCByType); err != nil { + if err := s.syncStaticResources(newSRByName, existingSRByName); err != nil { + return err + } + if err := s.syncDynamicResourceLifeCycles(newDRLCByType, existingDRLCByType); err != nil { + return err + } + return nil + }(); err != nil { return err } return nil - }(); err != nil { + }); err != nil { return err } - if err := s.UpdateAllDynamicResources(); err != nil { - return err - } - return nil + return s.UpdateAllDynamicResources() } // updateDynamicResources updates dynamic resource based on an existing dynamic resource life cycle. @@ -344,63 +347,65 @@ func (s *Storage) updateDynamicResources(lifecycle *crds.DRLCObject, resources [ // any expired resources are deleted, and that any Tombstoned resources are // completely removed. func (s *Storage) UpdateAllDynamicResources() error { - var resToAdd, resToDelete []crds.ResourceObject - var dRLCToDelete []crds.DRLCObject - existingDRLCByType := map[string]crds.DRLCObject{} - existingDRsByType := map[string][]crds.ResourceObject{} - s.resourcesLock.Lock() defer s.resourcesLock.Unlock() - resources, err := s.GetResources() - if err != nil { - logrus.WithError(err).Error("cannot find resources") - return err - } - existingDRLC, err := s.GetDynamicResourceLifeCycles() - if err != nil { - logrus.WithError(err).Error("cannot find DynamicResourceLifeCycles") - return err - } - for _, dRLC := range existingDRLC.Items { - existingDRLCByType[dRLC.Name] = dRLC - } + return retryOnConflict(retry.DefaultBackoff, func() error { + var resToAdd, resToDelete []crds.ResourceObject + var dRLCToDelete []crds.DRLCObject + existingDRLCByType := map[string]crds.DRLCObject{} + existingDRsByType := map[string][]crds.ResourceObject{} - // Filter to only look at dynamic resources - for _, res := range resources.Items { - if _, ok := existingDRLCByType[res.Spec.Type]; ok { - existingDRsByType[res.Spec.Type] = append(existingDRsByType[res.Spec.Type], res) + resources, err := s.GetResources() + if err != nil { + logrus.WithError(err).Error("cannot find resources") + return err + } + existingDRLC, err := s.GetDynamicResourceLifeCycles() + if err != nil { + logrus.WithError(err).Error("cannot find DynamicResourceLifeCycles") + return err + } + for _, dRLC := range existingDRLC.Items { + existingDRLCByType[dRLC.Name] = dRLC } - } - for resType, dRLC := range existingDRLCByType { - existingDRs := existingDRsByType[resType] - toAdd, toDelete := s.updateDynamicResources(&dRLC, existingDRs) - resToAdd = append(resToAdd, toAdd...) - resToDelete = append(resToDelete, toDelete...) - - if dRLC.Spec.MinCount == 0 && dRLC.Spec.MaxCount == 0 { - currentCount := len(existingDRs) - addCount := len(resToAdd) - delCount := len(resToDelete) - if addCount == 0 && (currentCount == 0 || currentCount == delCount) { - dRLCToDelete = append(dRLCToDelete, dRLC) + // Filter to only look at dynamic resources + for _, res := range resources.Items { + if _, ok := existingDRLCByType[res.Spec.Type]; ok { + existingDRsByType[res.Spec.Type] = append(existingDRsByType[res.Spec.Type], res) } } - } - if err := s.persistResources(resToAdd, resToDelete, true); err != nil { - logrus.WithError(err).Error("failed to persist resources") - return err - } + for resType, dRLC := range existingDRLCByType { + existingDRs := existingDRsByType[resType] + toAdd, toDelete := s.updateDynamicResources(&dRLC, existingDRs) + resToAdd = append(resToAdd, toAdd...) + resToDelete = append(resToDelete, toDelete...) + + if dRLC.Spec.MinCount == 0 && dRLC.Spec.MaxCount == 0 { + currentCount := len(existingDRs) + addCount := len(resToAdd) + delCount := len(resToDelete) + if addCount == 0 && (currentCount == 0 || currentCount == delCount) { + dRLCToDelete = append(dRLCToDelete, dRLC) + } + } + } - if len(dRLCToDelete) > 0 { - if err := s.persistDynamicResourceLifeCycles(nil, nil, dRLCToDelete); err != nil { + if err := s.persistResources(resToAdd, resToDelete, true); err != nil { + logrus.WithError(err).Error("failed to persist resources") return err } - } - return nil + if len(dRLCToDelete) > 0 { + if err := s.persistDynamicResourceLifeCycles(nil, nil, dRLCToDelete); err != nil { + return err + } + } + + return nil + }) } // syncDynamicResourceLifeCycles compares the new DRLC configuration against @@ -409,7 +414,6 @@ func (s *Storage) UpdateAllDynamicResources() error { // be removed. // No dynamic resources are created, deleted, or modified by this function. func (s *Storage) syncDynamicResourceLifeCycles(newDRLCByType, existingDRLCByType map[string]crds.DRLCObject) error { - var finalError error var dRLCToUpdate, dRLCToAdd []crds.DRLCObject for _, existingDRLC := range existingDRLCByType { @@ -436,14 +440,11 @@ func (s *Storage) syncDynamicResourceLifeCycles(newDRLCByType, existingDRLCByTyp } } - if err := s.persistDynamicResourceLifeCycles(dRLCToUpdate, dRLCToAdd, nil); err != nil { - finalError = multierror.Append(finalError, err) - } - return finalError + return s.persistDynamicResourceLifeCycles(dRLCToUpdate, dRLCToAdd, nil) } func (s *Storage) persistResources(resToAdd, resToDelete []crds.ResourceObject, dynamic bool) error { - var finalError error + var errs []error for _, r := range resToDelete { // If currently busy, yield deletion to later cycles. if r.Status.Owner != "" { @@ -456,14 +457,14 @@ func (s *Storage) persistResources(resToAdd, resToDelete []crds.ResourceObject, if r.Status.State == common.Tombstone { logrus.Infof("Deleting resource %s", r.Name) if err := s.DeleteResource(r.Name); err != nil { - finalError = multierror.Append(finalError, err) + errs = append(errs, err) logrus.WithError(err).Errorf("unable to delete resource %s", r.Name) } } else if r.Status.State != common.ToBeDeleted { r.Status.State = common.ToBeDeleted logrus.Infof("Marking resource to be deleted %s", r.Name) if _, err := s.UpdateResource(&r); err != nil { - finalError = multierror.Append(finalError, err) + errs = append(errs, err) logrus.WithError(err).Errorf("unable to update resource %s", r.Name) } } @@ -471,7 +472,7 @@ func (s *Storage) persistResources(resToAdd, resToDelete []crds.ResourceObject, // Static resources can be deleted right away. logrus.Infof("Deleting resource %s", r.Name) if err := s.DeleteResource(r.Name); err != nil { - finalError = multierror.Append(finalError, err) + errs = append(errs, err) logrus.WithError(err).Errorf("unable to delete resource %s", r.Name) } } @@ -481,16 +482,16 @@ func (s *Storage) persistResources(resToAdd, resToDelete []crds.ResourceObject, logrus.Infof("Adding resource %s", r.Name) r.Status.LastUpdate = s.now() if err := s.AddResource(&r); err != nil { - finalError = multierror.Append(finalError, err) + errs = append(errs, err) logrus.WithError(err).Errorf("unable to delete resource %s", r.Name) } } - return finalError + return utilerrors.NewAggregate(errs) } func (s *Storage) persistDynamicResourceLifeCycles(dRLCToUpdate, dRLCToAdd, dRLCToDelelete []crds.DRLCObject) error { - var finalError error + var errs []error remainingTypes := map[string]bool{} updatedResources, err := s.GetResources() if err != nil { @@ -505,7 +506,7 @@ func (s *Storage) persistDynamicResourceLifeCycles(dRLCToUpdate, dRLCToAdd, dRLC if !remainingTypes[dRLC.Name] { logrus.Infof("Deleting resource type life cycle %s", dRLC.Name) if err := s.DeleteDynamicResourceLifeCycle(dRLC.Name); err != nil { - finalError = multierror.Append(finalError, err) + errs = append(errs, err) logrus.WithError(err).Errorf("unable to delete resource type life cycle %s", dRLC.Name) } } else { @@ -519,7 +520,7 @@ func (s *Storage) persistDynamicResourceLifeCycles(dRLCToUpdate, dRLCToAdd, dRLC for _, DRLC := range dRLCToAdd { logrus.Infof("Adding resource type life cycle %s", DRLC.Name) if err := s.AddDynamicResourceLifeCycle(&DRLC); err != nil { - finalError = multierror.Append(finalError, err) + errs = append(errs, err) logrus.WithError(err).Errorf("unable to add resource type life cycle %s", DRLC.Name) } } @@ -527,12 +528,12 @@ func (s *Storage) persistDynamicResourceLifeCycles(dRLCToUpdate, dRLCToAdd, dRLC for _, dRLC := range dRLCToUpdate { logrus.Infof("Updating resource type life cycle %s", dRLC.Name) if _, err := s.UpdateDynamicResourceLifeCycle(&dRLC); err != nil { - finalError = multierror.Append(finalError, err) + errs = append(errs, err) logrus.WithError(err).Errorf("unable to update resource type life cycle %s", dRLC.Name) } } - return finalError + return utilerrors.NewAggregate(errs) } func (s *Storage) syncStaticResources(newResourcesByName, existingResourcesByName map[string]crds.ResourceObject) error { From 6b0ac1ee05263317b8a1dfccb8df330b51f18c16 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Fri, 28 Feb 2020 11:51:18 +0100 Subject: [PATCH 2/2] Boskos handlers: Use onceConflictingClient for tests --- boskos/handlers/BUILD.bazel | 3 +++ boskos/handlers/handlers_test.go | 23 ++++++++++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/boskos/handlers/BUILD.bazel b/boskos/handlers/BUILD.bazel index dbd63f87226b..2e8d4b0feb2c 100644 --- a/boskos/handlers/BUILD.bazel +++ b/boskos/handlers/BUILD.bazel @@ -27,8 +27,11 @@ go_test( "//boskos/crds:go_default_library", "//boskos/ranch:go_default_library", "@com_github_go_test_deep//:go_default_library", + "@io_k8s_apimachinery//pkg/api/errors:go_default_library", "@io_k8s_apimachinery//pkg/apis/meta/v1:go_default_library", "@io_k8s_apimachinery//pkg/runtime:go_default_library", + "@io_k8s_apimachinery//pkg/runtime/schema:go_default_library", + "@io_k8s_sigs_controller_runtime//pkg/client:go_default_library", "@io_k8s_sigs_controller_runtime//pkg/client/fake:go_default_library", ], ) diff --git a/boskos/handlers/handlers_test.go b/boskos/handlers/handlers_test.go index 5bac47a843d8..e58f561e065d 100644 --- a/boskos/handlers/handlers_test.go +++ b/boskos/handlers/handlers_test.go @@ -18,7 +18,9 @@ package handlers import ( "bytes" + "context" "encoding/json" + "errors" "fmt" "io/ioutil" "net/http" @@ -29,8 +31,11 @@ import ( "testing" "time" + kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" fakectrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" "k8s.io/test-infra/boskos/client" @@ -61,7 +66,8 @@ func MakeTestRanch(resources []runtime.Object) *ranch.Ranch { for _, obj := range resources { obj.(metav1.Object).SetNamespace(ns) } - s := ranch.NewTestingStorage(fakectrlruntimeclient.NewFakeClient(resources...), ns, func() time.Time { return fakeNow }) + client := &onceConflictingClient{Client: fakectrlruntimeclient.NewFakeClient(resources...)} + s := ranch.NewTestingStorage(client, ns, func() time.Time { return fakeNow }) r, _ := ranch.NewRanch("", s, testTTL) return r } @@ -856,3 +862,18 @@ func compareWithFixture(testName string, actualData []byte) error { return nil } + +// onceConflictingClient returns an IsConflict error on the first Update request it receives. It +// is used to verify that there is retrying for conflicts in place. +type onceConflictingClient struct { + didConflict bool + ctrlruntimeclient.Client +} + +func (occ *onceConflictingClient) Update(ctx context.Context, obj runtime.Object, opts ...ctrlruntimeclient.UpdateOption) error { + if !occ.didConflict { + occ.didConflict = true + return kerrors.NewConflict(schema.GroupResource{}, "obj", errors.New("conflicting as requested")) + } + return occ.Client.Update(ctx, obj, opts...) +}