Fixes some of the known issues with disabled resyncs (#88)
* clusterclientstore: take resync period as a parameter

We are currently investigating whether Shipper works correctly when resyncs
are disabled (#77). Before we can carry on with that investigation, though,
we need to ensure that Shipper does in fact disable all resyncs. As far as
the logs show, the cluster client store is the only piece that currently
doesn't honor cmd/shipper's -resync flag.
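For context, a minimal sketch of the pattern this change moves towards; the names below are illustrative, not the exact production code:

```go
package sketch

import (
	"flag"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
)

const defaultResync = 30 * time.Second

// resync mirrors cmd/shipper's -resync flag. Using flag.Duration means the
// value is already a time.Duration, so there is no string-parsing step that
// could silently fall back to a hard-coded default.
var resync = flag.Duration("resync", defaultResync,
	"Informer's cache re-sync in Go's duration format; 0 disables resyncs.")

// newInformerFactory passes the flag value through unchanged, which is the
// whole point: every informer factory should honor the same period, and a
// value of 0 genuinely turns periodic resyncs off.
func newInformerFactory(client kubernetes.Interface) informers.SharedInformerFactory {
	return informers.NewSharedInformerFactory(client, *resync)
}
```

In the real change the same *time.Duration is also threaded into the cluster client store, which previously hard-coded time.Second*30 for its per-cluster informer factories.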

* e2e: improve log output for tests

While investigating some test breakage, I noticed that several functions
in e2e tests were either not outputting enough information to be useful,
or just flat out lying about what they were doing:

- waitForRelease, waitForComplete, and other related code did not say which
release they were operating on, which makes it difficult to relate the test
output to what's running on Kubernetes. Now the release name is output as
well.

- waitForReleaseStrategyState always said it was waiting for command,
regardless of which waitingFor argument was passed. Now it correctly reports
which state it is waiting for (see the sketch below).
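A tiny sketch of the message shape the helpers move to (the helper below is made up; only the format string matters):

```go
package sketch

import (
	"testing"
	"time"
)

// logWaitResult is illustrative, not the real e2e helper: the point is simply
// that both the release name and the awaited strategy state appear in the
// output, so a timeout in CI can be tied to a concrete object and state.
func logWaitResult(t *testing.T, waitingFor, releaseName string, took time.Duration) {
	t.Logf("waiting for release %q to be waiting for %s took %s", releaseName, waitingFor, took)
}
```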

* release controller: listen to deletion events

When the e2e tests run with `cmd/shipper` started with `-resync=0`, they fail
with the following error:

```
--- FAIL: TestRolloutAbort (73.78s)
    e2e_test.go:751: waiting for release "my-test-app-77cf3473-0" took 1.597339649s
    e2e_test.go:840: waiting for completion of "my-test-app-77cf3473-0" took 18.399953573s
    e2e_test.go:617: waiting for contender release to appear after editing app "my-test-app"
    e2e_test.go:751: waiting for release "my-test-app-3037a9fd-0" took 1.598389769s
    e2e_test.go:624: setting contender release "my-test-app-3037a9fd-0" targetStep to 0
    e2e_test.go:627: waiting for contender release "my-test-app-3037a9fd-0" to achieve waitingForCommand for targetStep 0
    e2e_test.go:761: release strategy state transition: "" -> "release \"my-test-app-3037a9fd-0\" has no achievedStep reported yet"
    e2e_test.go:761: release strategy state transition: "release \"my-test-app-3037a9fd-0\" has no achievedStep reported yet" -> "{installation: False, capacity: False, traffic: False, command: True}"
    e2e_test.go:819: waiting for command took 10.398270118s
    e2e_test.go:633: checking that incumbent "my-test-app-77cf3473-0" has 4 pods and contender "my-test-app-3037a9fd-0" has 1 pods (strategy step 0 -- 100/1)
    e2e_test.go:624: setting contender release "my-test-app-3037a9fd-0" targetStep to 1
    e2e_test.go:627: waiting for contender release "my-test-app-3037a9fd-0" to achieve waitingForCommand for targetStep 1
    e2e_test.go:761: release strategy state transition: "" -> "{installation: False, capacity: True, traffic: False, command: False}"
    e2e_test.go:761: release strategy state transition: "{installation: False, capacity: True, traffic: False, command: False}" -> "{installation: False, capacity: False, traffic: True, command: False}"
    e2e_test.go:761: release strategy state transition: "{installation: False, capacity: False, traffic: True, command: False}" -> "{installation: False, capacity: False, traffic: False, command: True}"
    e2e_test.go:819: waiting for command took 6.397543921s
    e2e_test.go:633: checking that incumbent "my-test-app-77cf3473-0" has 2 pods and contender "my-test-app-3037a9fd-0" has 2 pods (strategy step 1 -- 50/50)
    e2e_test.go:761: release strategy state transition: "" -> "{installation: False, capacity: False, traffic: False, command: False}"
    e2e_test.go:814: timed out waiting for release to be waiting for capacity: waited 30s. final state: {installation: False, capacity: False, traffic: False, command: False}
FAIL
```

This happens when we delete the contender release mid-rollout to perform a
rollback. No event handler gets called in that situation, so the incumbent's
capacity is never adjusted and the rollback gets "stuck". Triggering a sync
on the Application solves the problem: the strategy is re-evaluated, Shipper
notices that the incumbent has lower capacity than intended, and scales it
back up.

Although this didn't show up in the tests, the release controller suffers
from the same issue for the target objects it listens to (installation,
capacity, and traffic targets). This commit addresses those cases as well.

This is a small step towards getting #77 done.
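In client-go terms, listening to deletion events just means registering a DeleteFunc alongside the existing Add/Update handlers. A minimal sketch, with the enqueue helper assumed rather than copied from the release controller:

```go
package sketch

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// addHandlers illustrates the pattern: in addition to Add/Update, register a
// DeleteFunc so that deleting an object mid-rollout still triggers a sync
// instead of waiting for a resync that, with -resync=0, never comes.
func addHandlers(informer cache.SharedIndexInformer, queue workqueue.RateLimitingInterface) {
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { enqueue(queue, obj) },
		UpdateFunc: func(oldObj, newObj interface{}) {
			enqueue(queue, newObj)
		},
		// The new piece: deletions are no longer ignored.
		DeleteFunc: func(obj interface{}) { enqueue(queue, obj) },
	})
}

func enqueue(queue workqueue.RateLimitingInterface, obj interface{}) {
	// DeletionHandlingMetaNamespaceKeyFunc also copes with the tombstones
	// informers hand out when the delete watch event was missed.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		return
	}
	queue.Add(key)
}
```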

* capacity controller: retry partially synced capacity targets

We're trying to make Shipper work without resyncs (see #77). After our
Continuous Integration pipeline was configured to actually run without
resyncs in b62a574, we started to see a lot more flakiness in the E2E tests,
and I could not reproduce it in my local environment.

After adding some instrumentation to the CI environment, I found this curious
log line:

```
Event(v1.ObjectReference{Kind:"CapacityTarget",
Namespace:"test-new-application-moving-strategy-backwards",
Name:"my-test-app-ce62d141-0",
UID:"5165927d-5abf-11e9-8071-42010a1e0283",
APIVersion:"shipper.booking.com", ResourceVersion:"471", FieldPath:""}):
type: 'Warning' reason: 'FailedCapacityChange' cluster not ready yet for
use; cluster client is being initialized
```

In itself, that's not a problem, since the sync will be retried.

Except it wasn't being retried :) While iterating over the list of clusters,
any failures were just logged and then ignored, presumably in the expectation
that a resync would eventually fix the problem. Now, if the capacity
controller has issues with any cluster, it explicitly retries the work on
that capacity target.
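Sketched in simplified form (the function below only illustrates the control flow; the real handler lives in pkg/controller/capacity): the per-cluster loop keeps going on errors, but it now remembers that something failed and reports that to the caller.

```go
package sketch

// syncClusters is a simplified stand-in for the per-cluster loop in the
// capacity target sync handler: errors are still recorded and the loop keeps
// going, but the caller now learns whether anything failed so it can requeue
// the capacity target instead of waiting for a resync.
func syncClusters(clusters []string, syncOne func(cluster string) error, record func(error)) (shouldRetry bool) {
	for _, cluster := range clusters {
		if err := syncOne(cluster); err != nil {
			record(err)
			shouldRetry = true // previously this failure was simply dropped
			continue
		}
	}
	return shouldRetry
}
```

In the actual controller the boolean is returned from capacityTargetSyncHandler, and the work item is requeued when it is true.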

* installation controller: fix log message

This was probably just a bad copy-paste, but it took me for a ride because
the message was lying.

* installation controller: check contender only once

For reasons I couldn't exactly ascertain, we were checking whether the
InstallationTarget's Release is the current contender release twice: once by
asking the lister, and once by checking the Application's history.

The first check works fine, but the second one suffers from a race condition:
the application controller creates a release, hands off work to the release
controller to create the installation target, and only then updates the
Application resource's history. The second check in the installation
controller therefore fails if it executes before the Application is updated,
since the history is still out of date. This only ever worked because of
resyncs, and in the context of testing Shipper with resyncs disabled (see
#77) we eventually hit this race condition.

Since we had this check twice, the fix was easy: just remove the broken
one :)
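To make the ordering concrete, here is the race in schematic form; the helper below mirrors the shape of the removed history check (visible in the diff further down) and is illustrative only:

```go
package sketch

// The sequence that makes the history-based check racy:
//
//   1. the application controller creates Release R
//   2. the release controller creates R's InstallationTarget
//   3. the installation controller syncs that InstallationTarget
//   4. the application controller appends R to app.Status.History
//
// If step 3 runs before step 4, a check shaped like the one below wrongly
// concludes that R is not the contender and skips the installation; with
// resyncs disabled, nothing ever comes back to correct that.
func isContenderByHistory(history []string, releaseName string) bool {
	return len(history) > 0 && history[len(history)-1] == releaseName
}
```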
juliogreff authored and Oleg Sidorov committed Apr 24, 2019
1 parent 91bc21e commit ab15361
Showing 8 changed files with 52 additions and 35 deletions.
15 changes: 6 additions & 9 deletions cmd/shipper/main.go
@@ -55,19 +55,20 @@ var controllers = []string{
}

const defaultRESTTimeout time.Duration = 10 * time.Second
+const defaultResync time.Duration = 30 * time.Second

var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
certPath = flag.String("cert", "", "Path to the TLS certificate for target clusters.")
keyPath = flag.String("key", "", "Path to the TLS private key for target clusters.")
ns = flag.String("namespace", shipper.ShipperNamespace, "Namespace for Shipper resources.")
-resyncPeriod = flag.String("resync", "30s", "Informer's cache re-sync in Go's duration format.")
enabledControllers = flag.String("enable", strings.Join(controllers, ","), "comma-seperated list of controllers to run (if not all)")
disabledControllers = flag.String("disable", "", "comma-seperated list of controllers to disable")
workers = flag.Int("workers", 2, "Number of workers to start for each controller.")
metricsAddr = flag.String("metrics-addr", ":8889", "Addr to expose /metrics on.")
chartCacheDir = flag.String("cachedir", filepath.Join(os.TempDir(), "chart-cache"), "location for the local cache of downloaded charts")
+resync = flag.Duration("resync", defaultResync, "Informer's cache re-sync in Go's duration format.")
restTimeout = flag.Duration("rest-timeout", defaultRESTTimeout, "Timeout value for management and target REST clients. Does not affect informer watches.")
)

@@ -87,7 +88,7 @@ type cfg struct {

kubeInformerFactory informers.SharedInformerFactory
shipperInformerFactory shipperinformers.SharedInformerFactory
-resync time.Duration
+resync *time.Duration

recorder func(string) record.EventRecorder

@@ -107,11 +108,6 @@ type cfg struct {
func main() {
flag.Parse()

-resync, err := time.ParseDuration(*resyncPeriod)
-if err != nil {
-glog.Fatal(err)
-}
-
baseRestCfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig)
if err != nil {
glog.Fatal(err)
@@ -126,8 +122,8 @@ func main() {
stopCh := setupSignalHandler()
metricsReadyCh := make(chan struct{})

-kubeInformerFactory := informers.NewSharedInformerFactory(informerKubeClient, resync)
-shipperInformerFactory := shipperinformers.NewSharedInformerFactory(informerShipperClient, resync)
+kubeInformerFactory := informers.NewSharedInformerFactory(informerKubeClient, *resync)
+shipperInformerFactory := shipperinformers.NewSharedInformerFactory(informerShipperClient, *resync)

shipperscheme.AddToScheme(scheme.Scheme)

@@ -173,6 +169,7 @@ func main() {
shipperInformerFactory.Shipper().V1alpha1().Clusters(),
*ns,
restTimeout,
+resync,
)

wg.Add(1)
13 changes: 10 additions & 3 deletions pkg/clusterclientstore/store.go
@@ -30,6 +30,7 @@ type Store struct {
ns string
buildClient ClientBuilderFunc
restTimeout *time.Duration
+resync *time.Duration
cache cache.CacheServer

secretInformer corev1informer.SecretInformer
@@ -55,12 +56,14 @@ func NewStore(
secretInformer corev1informer.SecretInformer,
clusterInformer shipperinformer.ClusterInformer,
ns string,
-restTimeout *time.Duration,
+restTimeout,
+resync *time.Duration,
) *Store {
s := &Store{
ns: ns,
buildClient: buildClient,
restTimeout: restTimeout,
+resync: resync,
cache: cache.NewServer(),

secretInformer: secretInformer,
@@ -277,8 +280,12 @@ func (s *Store) create(cluster *shipper.Cluster, secret *corev1.Secret) error {
return fmt.Errorf("create informer client for Cluster %q: %s", cluster.Name, err)
}

-// TODO(asurikov): propagate -resync from cmd/shipper here.
-informerFactory := kubeinformers.NewSharedInformerFactory(informerClient, time.Second*30)
+var resyncPeriod time.Duration
+if s.resync != nil {
+resyncPeriod = *s.resync
+}
+
+informerFactory := kubeinformers.NewSharedInformerFactory(informerClient, resyncPeriod)
// Register all the resources that the controllers are interested in, e.g.
// informerFactory.Core().V1().Pods().Informer().
for _, cb := range s.subscriptionRegisterFuncs {
3 changes: 2 additions & 1 deletion pkg/clusterclientstore/store_test.go
@@ -263,7 +263,7 @@ func (f *fixture) newStore() (*Store, kubeinformers.SharedInformerFactory, shipp
f.kubeClient = kubefake.NewSimpleClientset(f.kubeObjects...)
f.shipperClient = shipperfake.NewSimpleClientset(f.shipperObjects...)

-const noResyncPeriod time.Duration = 0
+noResyncPeriod := time.Duration(0)
shipperInformerFactory := shipperinformers.NewSharedInformerFactory(f.shipperClient, noResyncPeriod)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(f.kubeClient, noResyncPeriod)

@@ -275,6 +275,7 @@ func (f *fixture) newStore() (*Store, kubeinformers.SharedInformerFactory, shipp
shipperInformerFactory.Shipper().V1alpha1().Clusters(),
shipper.ShipperNamespace,
f.restTimeout,
+&noResyncPeriod,
)

return store, kubeInformerFactory, shipperInformerFactory
5 changes: 4 additions & 1 deletion pkg/controller/capacity/capacity_controller.go
@@ -182,6 +182,7 @@ func (c *Controller) capacityTargetSyncHandler(key string) bool {

ct = ct.DeepCopy()

+shouldRetry := false
targetNamespace := ct.Namespace
selector := labels.Set(ct.Labels).AsSelector()

@@ -216,6 +217,7 @@ func (c *Controller) capacityTargetSyncHandler(key string) bool {
targetDeployment, err := c.findTargetDeploymentForClusterSpec(clusterSpec, targetNamespace, selector, clusterStatus)
if err != nil {
c.recordErrorEvent(ct, err)
+shouldRetry = true
continue
}

@@ -228,6 +230,7 @@ func (c *Controller) capacityTargetSyncHandler(key string) bool {
_, err = c.patchDeploymentWithReplicaCount(targetDeployment, clusterSpec.Name, replicaCount, clusterStatus)
if err != nil {
c.recordErrorEvent(ct, err)
+shouldRetry = true
continue
}
}
@@ -274,7 +277,7 @@ func (c *Controller) capacityTargetSyncHandler(key string) bool {
return true
}

-return false
+return shouldRetry
}

func (c *Controller) enqueueCapacityTarget(obj interface{}) {
14 changes: 1 addition & 13 deletions pkg/controller/installation/installation_controller.go
@@ -149,7 +149,7 @@ func (c *Controller) processNextWorkItem() bool {
if c.workqueue.NumRequeues(key) >= maxRetries {
// Drop the InstallationTarget's key out of the workqueue and thus reset its
// backoff. This limits the time a "broken" object can hog a worker.
glog.Warningf("CapacityTarget %q has been retried too many times, dropping from the queue", key)
glog.Warningf("InstallationTarget %q has been retried too many times, dropping from the queue", key)
c.workqueue.Forget(key)

return true
@@ -236,18 +236,6 @@ func (c *Controller) processInstallation(it *shipper.InstallationTarget) error {
return nil
}

-if appLabelValue, ok := release.GetLabels()[shipper.AppLabel]; !ok {
-// TODO(isutton): Transform this into a real error
-return fmt.Errorf("couldn't find label %q in release %q", shipper.AppLabel, release.GetName())
-} else if app, appListerErr := c.appLister.Applications(release.GetNamespace()).Get(appLabelValue); appListerErr != nil {
-// TODO(isutton): wrap this error.
-return appListerErr
-} else if len(app.Status.History) > 0 && app.Status.History[len(app.Status.History)-1] != release.GetName() {
-// Current release isn't the contender, so we do not attempt to
-// create or modify objects at all.
-return nil
-}
-
installer := NewInstaller(c.chartFetchFunc, release, it)

// Build .status over based on the current .spec.clusters.
20 changes: 20 additions & 0 deletions pkg/controller/release/release_controller.go
@@ -138,6 +138,7 @@ func NewController(
UpdateFunc: func(oldObj, newObj interface{}) {
controller.enqueueRelease(newObj)
},
+DeleteFunc: controller.enqueueAppFromRelease,
})

installationTargetInformer.Informer().AddEventHandler(
@@ -146,6 +147,7 @@ func NewController(
UpdateFunc: func(oldObj, newObj interface{}) {
controller.enqueueInstallationTarget(newObj)
},
+DeleteFunc: controller.enqueueInstallationTarget,
})

capacityTargetInformer.Informer().AddEventHandler(
@@ -154,6 +156,7 @@ func NewController(
UpdateFunc: func(oldObj, newObj interface{}) {
controller.enqueueCapacityTarget(newObj)
},
+DeleteFunc: controller.enqueueCapacityTarget,
})

trafficTargetInformer.Informer().AddEventHandler(
@@ -162,6 +165,7 @@ func NewController(
UpdateFunc: func(oldObj, newObj interface{}) {
controller.enqueueTrafficTarget(newObj)
},
+DeleteFunc: controller.enqueueTrafficTarget,
})

return controller
@@ -444,6 +448,22 @@ func (c *Controller) enqueueRelease(obj interface{}) {
c.releaseWorkqueue.Add(key)
}

+func (c *Controller) enqueueAppFromRelease(obj interface{}) {
+rel, ok := obj.(*shipper.Release)
+if !ok {
+runtime.HandleError(fmt.Errorf("not a shipper.Release: %#v", obj))
+return
+}
+
+appName, err := c.getAssociatedApplicationKey(rel)
+if err != nil {
+runtime.HandleError(fmt.Errorf("error fetching Application key for release %v: %s", rel, err))
+return
+}
+
+c.applicationWorkqueue.Add(appName)
+}
+
func (c *Controller) enqueueInstallationTarget(obj interface{}) {
it, ok := obj.(*shipper.InstallationTarget)
if !ok {
1 change: 1 addition & 0 deletions pkg/testing/clusterclientstore.go
@@ -49,6 +49,7 @@ func ClusterClientStore(
shipperInformerFactory.Shipper().V1alpha1().Clusters(),
TestNamespace,
nil,
+nil,
)

kubeInformerFactory.Start(stopCh)
16 changes: 8 additions & 8 deletions test/e2e/e2e_test.go
@@ -621,10 +621,10 @@ func TestRolloutAbort(t *testing.T) {
// The strategy emulates deployment all way down to 50/50 (steps 0 and 1)
for _, i := range []int{0, 1} {
step := vanguard.Steps[i]
t.Logf("setting release %q targetStep to %d", contenderName, i)
t.Logf("setting contender release %q targetStep to %d", contenderName, i)
f.targetStep(i, contenderName)

t.Logf("waiting for release %q to achieve waitingForCommand for targetStep %d", contenderName, i)
t.Logf("waiting for contender release %q to achieve waitingForCommand for targetStep %d", contenderName, i)
f.waitForReleaseStrategyState("command", contenderName, i)

expectedContenderCapacity := replicas.CalculateDesiredReplicaCount(uint(step.Capacity.Contender), float64(targetReplicas))
@@ -645,7 +645,7 @@ func TestRolloutAbort(t *testing.T) {
}

// The test emulates an interruption in the middle of the rollout, which
-// means the incumbant becomes a new contender and it will stay in 50%
+// means the incumbent becomes a new contender and it will stay in 50%
// capacity state (step 1 according to the vanguard definition) for a bit
// until shipper detects the need for capacity and spins up the missing
// pods
@@ -748,7 +748,7 @@ func (f *fixture) waitForRelease(appName string, historyIndex int) *shipper.Rele
f.t.Fatalf("error waiting for release to be scheduled: %q", err)
}

f.t.Logf("waiting for release took %s", time.Since(start))
f.t.Logf("waiting for release %q took %s", newRelease.Name, time.Since(start))
return newRelease
}

@@ -811,12 +811,12 @@ func (f *fixture) waitForReleaseStrategyState(waitingFor string, releaseName str

if err != nil {
if err == wait.ErrWaitTimeout {
f.t.Fatalf("timed out waiting for release to be 'waitingForCommand': waited %s. final state: %s", globalTimeout, state)
f.t.Fatalf("timed out waiting for release to be waiting for %s: waited %s. final state: %s", waitingFor, globalTimeout, state)
}
f.t.Fatalf("error waiting for release to be 'waitingForCommand': %q", err)
f.t.Fatalf("error waiting for release to be waiting for %s: %q", waitingFor, err)
}

f.t.Logf("waiting for command took %s", time.Since(start))
f.t.Logf("waiting for %s took %s", waitingFor, time.Since(start))
}

func (f *fixture) waitForComplete(releaseName string) {
@@ -837,7 +837,7 @@ func (f *fixture) waitForComplete(releaseName string) {
if err != nil {
f.t.Fatalf("error waiting for release to complete: %q", err)
}
f.t.Logf("waiting for completion took %s", time.Since(start))
f.t.Logf("waiting for completion of %q took %s", releaseName, time.Since(start))
}

func poll(timeout time.Duration, waitCondition func() (bool, error)) error {
