Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cleanup policy to RollingUpdater #6996

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/kubectl/cmd/rollingupdate.go
Expand Up @@ -134,7 +134,15 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
if newRc.Spec.Replicas == 0 {
newRc.Spec.Replicas = oldRc.Spec.Replicas
}
err = updater.Update(out, oldRc, newRc, period, interval, timeout)
err = updater.Update(&kubectl.RollingUpdaterConfig{
Out: out,
OldRc: oldRc,
NewRc: newRc,
UpdatePeriod: period,
Interval: interval,
Timeout: timeout,
CleanupPolicy: kubectl.DeleteRollingUpdateCleanupPolicy,
})
if err != nil {
return err
}
Expand Down
77 changes: 61 additions & 16 deletions pkg/kubectl/rolling_updater.go
Expand Up @@ -36,6 +36,38 @@ type RollingUpdater struct {
ns string
}

// RollingUpdaterConfig is the configuration for a rolling deployment process.
type RollingUpdaterConfig struct {
// Out is a writer for progress output.
Out io.Writer
// OldRC is an existing controller to be replaced.
OldRc *api.ReplicationController
// NewRc is a controller that will take ownership of updated pods (will be
// created if needed).
NewRc *api.ReplicationController
// UpdatePeriod is the time to wait between individual pod updates.
UpdatePeriod time.Duration
// Interval is the time to wait between polling controller status after
// update.
Interval time.Duration
// Timeout is the time to wait for controller updates before giving up.
Timeout time.Duration
// CleanupPolicy defines the cleanup action to take after the deployment is
// complete.
CleanupPolicy RollingUpdaterCleanupPolicy
}

// RollingUpdaterCleanupPolicy is a cleanup action to take after the
// deployment is complete.
type RollingUpdaterCleanupPolicy string

const (
// DeleteRollingUpdateCleanupPolicy means delete the old controller.
DeleteRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Delete"
// PreserveRollingUpdateCleanupPolicy means keep the old controller.
PreserveRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Preserve"
)

// NewRollingUpdater creates a RollingUpdater from a client
func NewRollingUpdater(namespace string, c RollingUpdaterClient) *RollingUpdater {
return &RollingUpdater{
Expand All @@ -49,20 +81,25 @@ const (
desiredReplicasAnnotation = kubectlAnnotationPrefix + "desired-replicas"
)

// Update all pods for a ReplicationController (oldRc) by creating a new controller (newRc)
// with 0 replicas, and synchronously resizing oldRc,newRc by 1 until oldRc has 0 replicas
// and newRc has the original # of desired replicas. oldRc is then deleted.
// If an update from newRc to oldRc is already in progress, we attempt to drive it to completion.
// If an error occurs at any step of the update, the error will be returned.
// 'out' writer for progress output
// 'oldRc' existing controller to be replaced
// 'newRc' controller that will take ownership of updated pods (will be created if needed)
// 'updatePeriod' time to wait between individual pod updates
// 'interval' time to wait between polling controller status after update
// 'timeout' time to wait for controller updates before giving up
// Update all pods for a ReplicationController (oldRc) by creating a new
// controller (newRc) with 0 replicas, and synchronously resizing oldRc,newRc
// by 1 until oldRc has 0 replicas and newRc has the original # of desired
// replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy.
//
// If an update from newRc to oldRc is already in progress, we attempt to
// drive it to completion. If an error occurs at any step of the update, the
// error will be returned.
//
// TODO: make this handle performing a rollback of a partially completed rollout.
func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error {
// TODO: make this handle performing a rollback of a partially completed
// rollout.
func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
out := config.Out
oldRc := config.OldRc
newRc := config.NewRc
updatePeriod := config.UpdatePeriod
interval := config.Interval
timeout := config.Timeout

oldName := oldRc.ObjectMeta.Name
newName := newRc.ObjectMeta.Name
retry := &RetryParams{interval, timeout}
Expand Down Expand Up @@ -156,9 +193,17 @@ func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationCont
if err != nil {
return err
}
// delete old rc
fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName)
return r.c.DeleteReplicationController(r.ns, oldName)

switch config.CleanupPolicy {
case DeleteRollingUpdateCleanupPolicy:
// delete old rc
fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName)
return r.c.DeleteReplicationController(r.ns, oldName)
case PreserveRollingUpdateCleanupPolicy:
return nil
default:
return nil
}
}

func (r *RollingUpdater) getExistingNewRc(sourceId, name string) (rc *api.ReplicationController, existing bool, err error) {
Expand Down
106 changes: 104 additions & 2 deletions pkg/kubectl/rolling_updater_test.go
Expand Up @@ -19,12 +19,14 @@ package kubectl
import (
"bytes"
"fmt"
"io/ioutil"
"testing"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
)

type updaterFake struct {
Expand Down Expand Up @@ -257,7 +259,16 @@ Update succeeded. Deleting foo-v1
"default",
}
var buffer bytes.Buffer
if err := updater.Update(&buffer, test.oldRc, test.newRc, 0, time.Millisecond, time.Millisecond); err != nil {
config := &RollingUpdaterConfig{
Out: &buffer,
OldRc: test.oldRc,
NewRc: test.newRc,
UpdatePeriod: 0,
Interval: time.Millisecond,
Timeout: time.Millisecond,
CleanupPolicy: DeleteRollingUpdateCleanupPolicy,
}
if err := updater.Update(config); err != nil {
t.Errorf("Update failed: %v", err)
}
if buffer.String() != test.output {
Expand Down Expand Up @@ -299,10 +310,101 @@ Update succeeded. Deleting foo-v1
updater := RollingUpdater{NewRollingUpdaterClient(fakeClientFor("default", responses)), "default"}

var buffer bytes.Buffer
if err := updater.Update(&buffer, rc, rcExisting, 0, time.Millisecond, time.Millisecond); err != nil {
config := &RollingUpdaterConfig{
Out: &buffer,
OldRc: rc,
NewRc: rcExisting,
UpdatePeriod: 0,
Interval: time.Millisecond,
Timeout: time.Millisecond,
CleanupPolicy: DeleteRollingUpdateCleanupPolicy,
}
if err := updater.Update(config); err != nil {
t.Errorf("Update failed: %v", err)
}
if buffer.String() != output {
t.Errorf("Output was not as expected. Expected:\n%s\nGot:\n%s", output, buffer.String())
}
}

// TestRollingUpdater_preserveCleanup ensures that the old controller isn't
// deleted following a successful deployment.
func TestRollingUpdater_preserveCleanup(t *testing.T) {
rc := oldRc(2)
rcExisting := newRc(1, 3)

updater := &RollingUpdater{
ns: "default",
c: &rollingUpdaterClientImpl{
GetReplicationControllerFn: func(namespace, name string) (*api.ReplicationController, error) {
switch name {
case rc.Name:
return rc, nil
case rcExisting.Name:
return rcExisting, nil
default:
return nil, fmt.Errorf("unexpected get call for %s/%s", namespace, name)
}
},
UpdateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
return rc, nil
},
CreateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
t.Fatalf("unexpected call to create %s/rc:%#v", namespace, rc)
return nil, nil
},
DeleteReplicationControllerFn: func(namespace, name string) error {
t.Fatalf("unexpected call to delete %s/%s", namespace, name)
return nil
},
ControllerHasDesiredReplicasFn: func(rc *api.ReplicationController) wait.ConditionFunc {
return func() (done bool, err error) {
return true, nil
}
},
},
}

config := &RollingUpdaterConfig{
Out: ioutil.Discard,
OldRc: rc,
NewRc: rcExisting,
UpdatePeriod: 0,
Interval: time.Millisecond,
Timeout: time.Millisecond,
CleanupPolicy: PreserveRollingUpdateCleanupPolicy,
}
err := updater.Update(config)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}

// rollingUpdaterClientImpl is a dynamic RollingUpdaterClient.
type rollingUpdaterClientImpl struct {
GetReplicationControllerFn func(namespace, name string) (*api.ReplicationController, error)
UpdateReplicationControllerFn func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
CreateReplicationControllerFn func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
DeleteReplicationControllerFn func(namespace, name string) error
ControllerHasDesiredReplicasFn func(rc *api.ReplicationController) wait.ConditionFunc
}

func (c *rollingUpdaterClientImpl) GetReplicationController(namespace, name string) (*api.ReplicationController, error) {
return c.GetReplicationControllerFn(namespace, name)
}

func (c *rollingUpdaterClientImpl) UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
return c.UpdateReplicationControllerFn(namespace, rc)
}

func (c *rollingUpdaterClientImpl) CreateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
return c.CreateReplicationControllerFn(namespace, rc)
}

func (c *rollingUpdaterClientImpl) DeleteReplicationController(namespace, name string) error {
return c.DeleteReplicationControllerFn(namespace, name)
}

func (c *rollingUpdaterClientImpl) ControllerHasDesiredReplicas(rc *api.ReplicationController) wait.ConditionFunc {
return c.ControllerHasDesiredReplicasFn(rc)
}