Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,8 @@

## main / unreleased

* [ENHANCEMENT] Check zone-aware PodDisruptionBudget compliance before deleting Pods during rolling updates. #324

## v0.31.1

* [ENHANCEMENT] Updated dependencies, including: #314, #315
11 changes: 6 additions & 5 deletions cmd/rollout-operator/main.go
@@ -215,13 +215,17 @@ func main() {
// watches for validating webhooks being added - this is only started if the TLS server is started
webhookObserver := tlscert.NewWebhookObserver(kubeClient, cfg.kubeNamespace, logger)

// controller for pod eviction - this is only started if the TLS server is started
// Controller for pod eviction.
// If the TLS server is started below (webhooks registered), then this controller will handle the validating webhook requests
// for pod evictions and zpdb configuration changes. If the webhooks are not enabled, this controller is still started
// and will be used by the main controller to assist in validating pod deletion requests.
evictionController := zpdb.NewEvictionController(kubeClient, dynamicClient, cfg.kubeNamespace, logger)
check(evictionController.Start())

maybeStartTLSServer(cfg, httpRT, logger, kubeClient, restart, metrics, evictionController, webhookObserver)

// Init the controller
c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeClusterDomain, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeClusterDomain, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger, evictionController)
check(errors.Wrap(c.Init(), "failed to init controller"))

// Listen to sigterm, as well as for restart (like for certificate renewal).
@@ -294,9 +298,6 @@ func maybeStartTLSServer(cfg config, rt http.RoundTripper, logger log.Logger, ku
check(webhookObserver.Init(webHookListener))
}

// Start monitoring for zpdb configurations and pods
check(evictionController.Start())

prepDownscaleAdmitFunc := func(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api *kubernetes.Clientset) *v1.AdmissionResponse {
return admission.PrepareDownscale(ctx, rt, logger, ar, api, cfg.useZoneTracker, cfg.zoneTrackerConfigMapName, cfg.kubeClusterDomain)
}
68 changes: 68 additions & 0 deletions integration/e2e_test.go
@@ -73,6 +73,74 @@ func TestRolloutHappyCase(t *testing.T) {
requireEventuallyPod(t, api, ctx, "mock-zone-c-0", expectReady(), expectVersion("2"))
}

// TestRolloutHappyCaseWithZpdb performs pod updates via the rolling update controller and uses the zpdb to determine whether each pod deletion is safe
func TestRolloutHappyCaseWithZpdb(t *testing.T) {
ctx := context.Background()

cluster := k3t.NewCluster(ctx, t, k3t.WithImages("rollout-operator:latest", "mock-service:latest"))
api := cluster.API()

path := initManifestFiles(t, "max-unavailable-1")

{
t.Log("Create rollout operator and check it's running and ready.")
createRolloutOperator(t, ctx, api, cluster.ExtAPI(), path, true)

_ = createValidatingWebhookConfiguration(t, api, ctx, path+yamlWebhookZpdbValidation)
_ = createValidatingWebhookConfiguration(t, api, ctx, path+yamlWebhookPodEviction)

rolloutOperatorPod := eventuallyGetFirstPod(ctx, t, api, "name=rollout-operator")
requireEventuallyPod(t, api, ctx, rolloutOperatorPod, expectPodPhase(corev1.PodRunning), expectReady())

t.Log("Await CABundle assignment")
require.Eventually(t, awaitCABundleAssignment(2, ctx, api), time.Second*30, time.Millisecond*10, "New webhooks have CABundle added")
}

{
t.Log("Create a valid zpdb configuration.")
awaitZoneAwarePodDisruptionBudgetCreation(t, ctx, cluster, path+yamlZpdbConfig)
}

// Create mock service, and check that it is in the desired state.
createMockServiceZone(t, ctx, api, corev1.NamespaceDefault, "mock-zone-a", 1)
createMockServiceZone(t, ctx, api, corev1.NamespaceDefault, "mock-zone-b", 1)
createMockServiceZone(t, ctx, api, corev1.NamespaceDefault, "mock-zone-c", 1)
requireEventuallyPod(t, api, ctx, "mock-zone-a-0", expectPodPhase(corev1.PodRunning), expectReady(), expectVersion("1"))
requireEventuallyPod(t, api, ctx, "mock-zone-b-0", expectPodPhase(corev1.PodRunning), expectReady(), expectVersion("1"))
requireEventuallyPod(t, api, ctx, "mock-zone-c-0", expectPodPhase(corev1.PodRunning), expectReady(), expectVersion("1"))

// Update all mock service statefulsets.
_, err := api.AppsV1().StatefulSets(corev1.NamespaceDefault).Update(ctx, mockServiceStatefulSet("mock-zone-a", "2", false, 1), metav1.UpdateOptions{})
require.NoError(t, err, "Can't update StatefulSet")
_, err = api.AppsV1().StatefulSets(corev1.NamespaceDefault).Update(ctx, mockServiceStatefulSet("mock-zone-b", "2", false, 1), metav1.UpdateOptions{})
require.NoError(t, err, "Can't update StatefulSet")
_, err = api.AppsV1().StatefulSets(corev1.NamespaceDefault).Update(ctx, mockServiceStatefulSet("mock-zone-c", "2", false, 1), metav1.UpdateOptions{})
require.NoError(t, err, "Can't update StatefulSet")

// First pod should now be version 2 and not ready; the rest should still be ready on version 1.
requireEventuallyPod(t, api, ctx, "mock-zone-a-0", expectNotReady(), expectVersion("2"))
requireEventuallyPod(t, api, ctx, "mock-zone-b-0", expectReady(), expectVersion("1"))
requireEventuallyPod(t, api, ctx, "mock-zone-c-0", expectReady(), expectVersion("1"))

// zone-a becomes ready, zone-b should become not ready and be version 2.
makeMockReady(t, cluster, "mock-zone-a")
requireEventuallyPod(t, api, ctx, "mock-zone-a-0", expectReady(), expectVersion("2"))
requireEventuallyPod(t, api, ctx, "mock-zone-b-0", expectNotReady(), expectVersion("2"))
requireEventuallyPod(t, api, ctx, "mock-zone-c-0", expectReady(), expectVersion("1"))

// zone-b becomes ready, zone-c should become not ready and be version 2.
makeMockReady(t, cluster, "mock-zone-b")
requireEventuallyPod(t, api, ctx, "mock-zone-a-0", expectReady(), expectVersion("2"))
requireEventuallyPod(t, api, ctx, "mock-zone-b-0", expectReady(), expectVersion("2"))
requireEventuallyPod(t, api, ctx, "mock-zone-c-0", expectNotReady(), expectVersion("2"))

// zone-c becomes ready, final state.
makeMockReady(t, cluster, "mock-zone-c")
requireEventuallyPod(t, api, ctx, "mock-zone-a-0", expectReady(), expectVersion("2"))
requireEventuallyPod(t, api, ctx, "mock-zone-b-0", expectReady(), expectVersion("2"))
requireEventuallyPod(t, api, ctx, "mock-zone-c-0", expectReady(), expectVersion("2"))
}

func awaitCABundleAssignment(webhookCnt int, ctx context.Context, api *kubernetes.Clientset) func() bool {
return func() bool {

3 changes: 2 additions & 1 deletion integration/manifests_rollout_operator_test.go
@@ -85,8 +85,9 @@ func createRolloutOperatorDependencies(t *testing.T, ctx context.Context, api *k
_, err = api.RbacV1().RoleBindings(corev1.NamespaceDefault).Create(ctx, roleBinding, metav1.CreateOptions{})
require.NoError(t, err)

_ = createZoneAwarePodDistruptionBudgetCustomResourceDefinition(t, extApi)

if webhook {
_ = createZoneAwarePodDistruptionBudgetCustomResourceDefinition(t, extApi)
_ = createReplicaTemplateCustomResourceDefinition(t, extApi)

operatorRole := loadFromDisk[rbacv1.Role](t, directory+yamlRoleSecret, &rbacv1.Role{})
@@ -42,6 +42,14 @@ rules:
- get
- update
- create
- apiGroups:
- rollout-operator.grafana.com
resources:
- zoneawarepoddisruptionbudgets
verbs:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
15 changes: 7 additions & 8 deletions operations/rollout-operator/rollout-operator.libsonnet
@@ -61,7 +61,6 @@
assert !$._config.ignore_rollout_operator_prepare_downscale_webhook_failures || $._config.rollout_operator_webhooks_enabled : 'ignore_rollout_operator_prepare_downscale_webhook_failures requires rollout_operator_webhooks_enabled=true',
assert !$._config.ignore_rollout_operator_zpdb_eviction_webhook_failures || $._config.rollout_operator_webhooks_enabled : 'ignore_rollout_operator_zpdb_eviction_webhook_failures requires rollout_operator_webhooks_enabled=true',
assert !$._config.ignore_rollout_operator_zpdb_validation_webhook_failures || $._config.rollout_operator_webhooks_enabled : 'ignore_rollout_operator_zpdb_validation_webhook_failures requires rollout_operator_webhooks_enabled=true',
assert !$._config.zpdb_custom_resource_definition_enabled || $._config.rollout_operator_webhooks_enabled : 'zpdb_custom_resource_definition_enabled requires rollout_operator_webhooks_enabled=true',
assert !$._config.replica_template_custom_resource_definition_enabled || $._config.rollout_operator_webhooks_enabled : 'replica_template_custom_resource_definition_enabled requires rollout_operator_webhooks_enabled=true',

local enableWebhooks = $._config.rollout_operator_replica_template_access_enabled || $._config.rollout_operator_webhooks_enabled,
@@ -141,13 +140,13 @@
policyRule.withResources(['%s/scale' % $.replica_template.spec.names.plural, '%s/status' % $.replica_template.spec.names.plural]) +
policyRule.withVerbs(['get', 'patch']),
] else []
) + (
if enableWebhooks then [
policyRule.withApiGroups($.zpdb_template.spec.group) +
policyRule.withResources([$.zpdb_template.spec.names.plural]) +
policyRule.withVerbs(['get', 'list', 'watch']),
] else []
)
) +
[
policyRule.withApiGroups($.zpdb_template.spec.group) +
policyRule.withResources([$.zpdb_template.spec.names.plural]) +
policyRule.withVerbs(['get', 'list', 'watch']),
]

),

rollout_operator_rolebinding: if !$._config.rollout_operator_enabled then null else
28 changes: 26 additions & 2 deletions pkg/controller/controller.go
@@ -46,6 +46,10 @@ type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}

type ZPDBEvictionController interface {
MarkPodAsDeleted(ctx context.Context, namespace string, podName string, source string) error
}

type RolloutController struct {
kubeClient kubernetes.Interface
clusterDomain string
@@ -63,6 +67,8 @@ type RolloutController struct {
httpClient httpClient
logger log.Logger

zpdbController ZPDBEvictionController

// This bool is true if we should trigger a reconcile.
shouldReconcile atomic.Bool

@@ -80,7 +86,7 @@ type RolloutController struct {
discoveredGroups map[string]struct{}
}

func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, clusterDomain string, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, clusterDomain string, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger, zpdbController ZPDBEvictionController) *RolloutController {
namespaceOpt := informers.WithNamespace(namespace)

// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -110,6 +116,7 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
scaleClient: scaleClient,
dynamicClient: dynamic,
httpClient: client,
zpdbController: zpdbController,
logger: logger,
stopCh: make(chan struct{}),
discoveredGroups: map[string]struct{}{},
@@ -575,7 +582,24 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
continue
}

level.Info(c.logger).Log("msg", fmt.Sprintf("terminating pod %s", pod.Name))
// Use the ZPDB to determine whether this pod deletion is allowed.
// The ZPDB serializes requests from this controller and from any incoming voluntary evictions.
// For each request, a full set of checks is performed to confirm the state of all pods in the ZPDB scope.
// This ensures that, if any voluntary evictions have been allowed since this reconcile loop began,
// the latest information is used to determine the zone/partition disruption level.
// If this request returns without error, the pod is placed into the ZPDB pod eviction cache and is
// considered not ready until it either restarts or expires from the cache.
err := c.zpdbController.MarkPodAsDeleted(ctx, pod.Namespace, pod.Name, "rollout-controller")
if err != nil {
// Skip this pod. The reconcile loop runs regularly, so this pod will be retried later.
// Rather than returning false here and abandoning the rest of the updates until the next reconcile,
// we allow this update loop to continue. For configurations with a partition-aware ZPDB it is valid
// to have multiple disruptions as long as there is at least one healthy pod per partition.
level.Debug(c.logger).Log("msg", "zpdb denied pod deletion", "pod", pod.Name, "reason", err)
continue
}

level.Info(c.logger).Log("msg", "terminating pod (does not violate any relevant ZPDBs)", "pod", pod.Name)
if err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
return false, errors.Wrapf(err, "failed to delete pod %s", pod.Name)
}
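
For illustration, a minimal sketch of how an eviction controller might serialize MarkPodAsDeleted requests and cache recently deleted pods. The names (simpleEvictionController, recentlyDeleted, disruptionAllowed) and the mutex/TTL approach are assumptions for this sketch, not the rollout-operator's actual implementation:

```go
package zpdbsketch

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// simpleEvictionController serializes eviction decisions so that concurrent
// requests (webhook evictions and rolling-update deletions) see a consistent view.
type simpleEvictionController struct {
	mu sync.Mutex

	// recentlyDeleted records pods approved for deletion; such pods are treated
	// as not ready until the entry expires or the pod restarts.
	recentlyDeleted map[string]time.Time
	ttl             time.Duration

	// disruptionAllowed checks the current zone/partition disruption level.
	disruptionAllowed func(ctx context.Context, namespace, podName string) (bool, error)
}

func (c *simpleEvictionController) MarkPodAsDeleted(ctx context.Context, namespace, podName, source string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Expire stale cache entries so they no longer count as disruptions.
	now := time.Now()
	for key, t := range c.recentlyDeleted {
		if now.Sub(t) > c.ttl {
			delete(c.recentlyDeleted, key)
		}
	}

	ok, err := c.disruptionAllowed(ctx, namespace, podName)
	if err != nil {
		return err
	}
	if !ok {
		return fmt.Errorf("zpdb: deleting pod %s/%s (requested by %s) would exceed the allowed disruption", namespace, podName, source)
	}

	// Record the pod so subsequent requests treat it as already disrupted.
	c.recentlyDeleted[namespace+"/"+podName] = now
	return nil
}
```

Serializing every request through a single lock is what lets each full set of checks account for any evictions admitted after the reconcile loop started.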
57 changes: 53 additions & 4 deletions pkg/controller/controller_test.go
@@ -60,6 +60,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
expectedPatchedSets map[string][]string
expectedPatchedResources map[string][]string
expectedErr string
zpdbErrors []error
}{
"should return error if some StatefulSet don't have OnDelete update strategy": {
statefulSets: []runtime.Object{
@@ -147,6 +148,38 @@
},
expectedDeletedPods: []string{"ingester-zone-b-0", "ingester-zone-b-1"},
},
"should delete pods that needs to be updated - zpdb allows first delete but denies the second": {
statefulSets: []runtime.Object{
mockStatefulSet("ingester-zone-a"),
mockStatefulSet("ingester-zone-b", withPrevRevision()),
},
pods: []runtime.Object{
mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-a-2", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-b-0", testPrevRevisionHash),
mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
mockStatefulSetPod("ingester-zone-b-2", testPrevRevisionHash),
},
expectedDeletedPods: []string{"ingester-zone-b-0"},
zpdbErrors: []error{nil, errors.New("zpdb denies eviction request")},
},
"should delete pods that needs to be updated - zpdb denies first delete but allows the second": {
statefulSets: []runtime.Object{
mockStatefulSet("ingester-zone-a"),
mockStatefulSet("ingester-zone-b", withPrevRevision()),
},
pods: []runtime.Object{
mockStatefulSetPod("ingester-zone-a-0", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-a-1", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-a-2", testLastRevisionHash),
mockStatefulSetPod("ingester-zone-b-0", testPrevRevisionHash),
mockStatefulSetPod("ingester-zone-b-1", testPrevRevisionHash),
mockStatefulSetPod("ingester-zone-b-2", testPrevRevisionHash),
},
expectedDeletedPods: []string{"ingester-zone-b-1"},
zpdbErrors: []error{errors.New("zpdb denies eviction request")},
},
"should default max unavailable to 1 if set to an invalid value": {
statefulSets: []runtime.Object{
mockStatefulSet("ingester-zone-a", withPrevRevision(), func(sts *v1.StatefulSet) {
@@ -617,7 +650,11 @@ func TestRolloutController_Reconcile(t *testing.T) {

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testClusterDomain, testNamespace, nil, 5*time.Second, reg, log.NewNopLogger())

// Pass in a slice of errors. Each eviction request pops the error at the head of the slice and uses it as its response. Once the slice is exhausted, no eviction returns an error.
zpdb := &mockEvictionController{nextErrorsIfAny: testData.zpdbErrors}

c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testClusterDomain, testNamespace, nil, 5*time.Second, reg, log.NewNopLogger(), zpdb)
require.NoError(t, c.Init())
defer c.Stop()

@@ -926,7 +963,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testClusterDomain, testNamespace, httpClient, 5*time.Second, reg, log.NewNopLogger())
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testClusterDomain, testNamespace, httpClient, 5*time.Second, reg, log.NewNopLogger(), &mockEvictionController{})
require.NoError(t, c.Init())
defer c.Stop()

Expand All @@ -942,7 +979,6 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)

func createFakeDynamicClient() (*fakedynamic.FakeDynamicClient, map[string][]string) {
dynamicClient := &fakedynamic.FakeDynamicClient{}

patchedStatuses := map[string][]string{}
dynamicClient.AddReactor("patch", "*", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
patchAction := action.(ktesting.PatchAction)
@@ -1029,7 +1065,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
c := NewRolloutController(kubeClient, nil, nil, nil, testClusterDomain, testNamespace, nil, 5*time.Second, reg, log.NewNopLogger())
c := NewRolloutController(kubeClient, nil, nil, nil, testClusterDomain, testNamespace, nil, 5*time.Second, reg, log.NewNopLogger(), &mockEvictionController{})
require.NoError(t, c.Init())
defer c.Stop()

@@ -1254,3 +1290,16 @@ func (f *fakeHttpClient) requests() []string {

return f.recordedRequests
}

type mockEvictionController struct {
nextErrorsIfAny []error
}

func (m *mockEvictionController) MarkPodAsDeleted(ctx context.Context, namespace string, podName string, source string) error {
var response error
if len(m.nextErrorsIfAny) > 0 {
response = m.nextErrorsIfAny[0]
m.nextErrorsIfAny = m.nextErrorsIfAny[1:]
}
return response
}
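
As a usage note, a hypothetical standalone test (not part of this diff; the test name is illustrative) showing the FIFO behaviour of the mock's error queue:

```go
func TestMockEvictionControllerQueueSketch(t *testing.T) {
	zpdb := &mockEvictionController{nextErrorsIfAny: []error{nil, errors.New("zpdb denies eviction request")}}

	// First call pops the leading nil, so the deletion is allowed.
	require.NoError(t, zpdb.MarkPodAsDeleted(context.Background(), "default", "ingester-zone-b-0", "rollout-controller"))

	// Second call pops the queued error, so the deletion is denied.
	require.Error(t, zpdb.MarkPodAsDeleted(context.Background(), "default", "ingester-zone-b-1", "rollout-controller"))

	// The queue is now exhausted; further calls succeed without error.
	require.NoError(t, zpdb.MarkPodAsDeleted(context.Background(), "default", "ingester-zone-b-2", "rollout-controller"))
}
```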