From 079b9cdfe977ac41722dd1ae5591e1ebc3846165 Mon Sep 17 00:00:00 2001 From: Alena Varkockova Date: Fri, 19 May 2023 13:24:27 +0200 Subject: [PATCH] k8s: set Status.Version when all pods have that version --- .../redpanda/cluster_controller.go | 18 ++- .../redpanda/cluster_controller_test.go | 35 +++++- src/go/k8s/pkg/resources/statefulset.go | 66 +++++++--- src/go/k8s/pkg/resources/statefulset_test.go | 113 +++++++++++++++++- .../k8s/pkg/resources/statefulset_update.go | 16 ++- 5 files changed, 223 insertions(+), 25 deletions(-) diff --git a/src/go/k8s/controllers/redpanda/cluster_controller.go b/src/go/k8s/controllers/redpanda/cluster_controller.go index 22c6ab679bb2..495c67028eb0 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller.go @@ -655,7 +655,15 @@ func (r *ClusterReconciler) reportStatus( nodeList.Internal = observedNodesInternal nodeList.SchemaRegistry.Internal = fmt.Sprintf("%s:%d", clusterFQDN, schemaRegistryPort) - if statusShouldBeUpdated(&redpandaCluster.Status, nodeList, sts) { + //nolint:nestif // the code won't get clearer if it's splitted out in my opinion + version, versionErr := sts.CurrentVersion(ctx) + if versionErr != nil { + // this is non-fatal error, it will return error even if e.g. + // the rollout is not finished because then the currentversion + // of the cluster cannot be determined + r.Log.Info(fmt.Sprintf("cannot get CurrentVersion of statefulset, %s", err)) + } + if statusShouldBeUpdated(&redpandaCluster.Status, nodeList, sts, version, versionErr) { err := retry.RetryOnConflict(retry.DefaultRetry, func() error { var cluster redpandav1alpha1.Cluster err := r.Get(ctx, types.NamespacedName{ @@ -669,7 +677,9 @@ func (r *ClusterReconciler) reportStatus( cluster.Status.Nodes = *nodeList cluster.Status.ReadyReplicas = sts.LastObservedState.Status.ReadyReplicas cluster.Status.Replicas = sts.LastObservedState.Status.Replicas - cluster.Status.Version = sts.Version() + if versionErr == nil { + cluster.Status.Version = version + } err = r.Status().Update(ctx, &cluster) if err == nil { @@ -689,6 +699,8 @@ func statusShouldBeUpdated( status *redpandav1alpha1.ClusterStatus, nodeList *redpandav1alpha1.NodesList, sts *resources.StatefulSetResource, + newVersion string, + versionErr error, ) bool { return nodeList != nil && (!reflect.DeepEqual(nodeList.Internal, status.Nodes.Internal) || @@ -699,7 +711,7 @@ func statusShouldBeUpdated( !reflect.DeepEqual(nodeList.ExternalBootstrap, status.Nodes.ExternalBootstrap)) || status.Replicas != sts.LastObservedState.Status.Replicas || status.ReadyReplicas != sts.LastObservedState.Status.ReadyReplicas || - status.Version != sts.Version() + (versionErr == nil && status.Version != newVersion) } func (r *ClusterReconciler) podList(ctx context.Context, redpandaCluster *redpandav1alpha1.Cluster) (corev1.PodList, error) { diff --git a/src/go/k8s/controllers/redpanda/cluster_controller_test.go b/src/go/k8s/controllers/redpanda/cluster_controller_test.go index 71e24c8e384e..68719b1e55fe 100644 --- a/src/go/k8s/controllers/redpanda/cluster_controller_test.go +++ b/src/go/k8s/controllers/redpanda/cluster_controller_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" @@ -852,7 +853,12 @@ var _ = Describe("RedPandaCluster controller", func() { }) It("Should not throw error; redpanda version allowed", func() { key, redpandaCluster := getVersionedRedpanda("restricted-redpanda-positive", allowedVersion) - fc := fake.NewClientBuilder().WithObjects(redpandaCluster).Build() + pods := readyPodsForCluster(redpandaCluster) + objects := []client.Object{redpandaCluster} + for i := range pods { + objects = append(objects, pods[i]) + } + fc := fake.NewClientBuilder().WithObjects(objects...).Build() r := &redpanda.ClusterReconciler{ Client: fc, Log: ctrl.Log, @@ -898,6 +904,33 @@ var _ = Describe("RedPandaCluster controller", func() { Entry("Random image pull policy", "asdvasd", Not(Succeed()))) }) +func readyPodsForCluster(cluster *v1alpha1.Cluster) []*corev1.Pod { + var result []*corev1.Pod + for i := 0; i < int(*cluster.Spec.Replicas); i++ { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", i), + Namespace: cluster.Namespace, + Labels: map[string]string{ + "app.kubernetes.io/component": "redpanda", + "app.kubernetes.io/instance": cluster.Name, + "app.kubernetes.io/name": "redpanda", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + Containers: []corev1.Container{{ + Name: "redpanda", + Image: fmt.Sprintf("redpanda:%s", cluster.Spec.Version), + }}, + }, + Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}, + } + result = append(result, pod) + } + return result +} + func getVersionedRedpanda( name string, version string, ) (key types.NamespacedName, cluster *v1alpha1.Cluster) { diff --git a/src/go/k8s/pkg/resources/statefulset.go b/src/go/k8s/pkg/resources/statefulset.go index 54cd70c3cc8f..9a6ea09d4077 100644 --- a/src/go/k8s/pkg/resources/statefulset.go +++ b/src/go/k8s/pkg/resources/statefulset.go @@ -37,6 +37,7 @@ import ( "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates" resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils" ) var _ Resource = &StatefulSetResource{} @@ -864,26 +865,61 @@ func (r *StatefulSetResource) fullConfiguratorImage() string { return fmt.Sprintf("%s:%s", r.configuratorSettings.ConfiguratorBaseImage, r.configuratorSettings.ConfiguratorTag) } -// Version returns the cluster version specified in the image tag. +// Version returns the cluster version specified in the image tag of the +// statefulset spec. Depending on the rollout status it might not be the same as +// version of the pods. func (r *StatefulSetResource) Version() string { lastObservedSts := r.LastObservedState if lastObservedSts != nil { - cc := lastObservedSts.Spec.Template.Spec.Containers - for i := range cc { - c := cc[i] - if c.Name != redpandaContainerName { - continue - } - // Will always have tag even for latest because of pandaCluster.FullImageName(). - if s := strings.Split(c.Image, ":"); len(s) > 1 { - version := s[len(s)-1] - // Image uses registry with port and no tag (e.g. localhost:5000/redpanda) - if strings.Contains(version, "/") { - version = "" - } - return version + return redpandaContainerVersion(lastObservedSts.Spec.Template.Spec.Containers) + } + return "" +} + +func redpandaContainerVersion(containers []corev1.Container) string { + for i := range containers { + c := containers[i] + if c.Name != redpandaContainerName { + continue + } + // Will always have tag even for latest because of pandaCluster.FullImageName(). + if s := strings.Split(c.Image, ":"); len(s) > 1 { + version := s[len(s)-1] + // Image uses registry with port and no tag (e.g. localhost:5000/redpanda) + if strings.Contains(version, "/") { + version = "" } + return version } } return "" } + +// CurrentVersion is the version that's rolled out to all nodes (pods) of the cluster +func (r *StatefulSetResource) CurrentVersion(ctx context.Context) (string, error) { + stsVersion := r.Version() + if stsVersion == "" { + return "", nil + } + replicas := *r.LastObservedState.Spec.Replicas + pods, err := r.getPodList(ctx) + if err != nil { + return "", err + } + if int32(len(pods.Items)) != replicas { + //nolint:goerr113 // not going to use wrapped static error here this time + return stsVersion, fmt.Errorf("rollout incomplete: pods count %d does not match expected replicas %d", len(pods.Items), replicas) + } + for i := range pods.Items { + if !utils.IsPodReady(&pods.Items[i]) { + //nolint:goerr113 // no need for static error + return stsVersion, fmt.Errorf("rollout incomplete: at least one pod (%s) is not READY", pods.Items[i].Name) + } + podVersion := redpandaContainerVersion(pods.Items[i].Spec.Containers) + if podVersion != stsVersion { + //nolint:goerr113 // no need for static error + return stsVersion, fmt.Errorf("rollout incomplete: at least one pod has version %s not %s", podVersion, stsVersion) + } + } + return stsVersion, nil +} diff --git a/src/go/k8s/pkg/resources/statefulset_test.go b/src/go/k8s/pkg/resources/statefulset_test.go index 4d9e283b1c97..d760def49c8a 100644 --- a/src/go/k8s/pkg/resources/statefulset_test.go +++ b/src/go/k8s/pkg/resources/statefulset_test.go @@ -29,10 +29,15 @@ import ( redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1" adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin" + "github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels" res "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources" resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types" ) +const ( + redpandaContainerName = "redpanda" +) + //nolint:funlen // Test function can have more than 100 lines func TestEnsure(t *testing.T) { cluster := pandaCluster() @@ -313,8 +318,6 @@ func pandaCluster() *redpandav1alpha1.Cluster { } func TestVersion(t *testing.T) { - redpandaContainerName := "redpanda" - tests := []struct { Containers []corev1.Container ExpectedVersion string @@ -343,3 +346,109 @@ func TestVersion(t *testing.T) { assert.Equal(t, tt.ExpectedVersion, sts.Version()) } } + +//nolint:funlen // this is ok for a test +func TestCurrentVersion(t *testing.T) { + redpanda := pandaCluster() + tests := []struct { + name string + pods []corev1.Pod + expectedReplicas int32 + expectedVersion string + shouldError bool + }{ + {"one pod with matching version", []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: redpanda.Namespace, + }, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v21.11.11"}}}, + Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}, + }, + }, 1, "v21.11.11", false}, + {"one pod with matching version, not in ready state", []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: redpanda.Namespace, + }, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v21.11.11"}}}, + Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionFalse}}}, + }, + }, 1, "v21.11.11", true}, + {"one pod with matching version but expecting two replicas", []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: redpanda.Namespace, + }, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v21.11.11"}}}, + Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}, + }, + }, 2, "v21.11.11", true}, + {"one pod with matching and one pod with non-matching version", []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: redpanda.Namespace, + }, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v21.11.11"}}}, + Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test2", + Namespace: redpanda.Namespace, + }, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v22.2.2"}}}, + Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}}, + }, + }, 2, "v21.11.11", true}, + } + + for _, tt := range tests { + c := fake.NewClientBuilder().Build() + for i := range tt.pods { + pod := tt.pods[i] + pod.Labels = labels.ForCluster(redpanda) + assert.NoError(t, c.Create(context.TODO(), &pod)) + } + sts := res.NewStatefulSet(c, redpanda, scheme.Scheme, + "cluster.local", + "servicename", + types.NamespacedName{Name: "test", Namespace: "test"}, + TestStatefulsetTLSVolumeProvider{}, + TestAdminTLSConfigProvider{}, + "", + res.ConfiguratorSettings{ + ConfiguratorBaseImage: "vectorized/configurator", + ConfiguratorTag: "latest", + ImagePullPolicy: "Always", + }, + func(ctx context.Context) (string, error) { return hash, nil }, + func(ctx context.Context, k8sClient client.Reader, redpandaCluster *redpandav1alpha1.Cluster, fqdn string, adminTLSProvider resourcetypes.AdminTLSConfigProvider, ordinals ...int32) (adminutils.AdminAPIClient, error) { + return nil, nil + }, + time.Second, + ctrl.Log.WithName("test"), + 0) + sts.LastObservedState = &v1.StatefulSet{ + Spec: v1.StatefulSetSpec{ + Replicas: &tt.expectedReplicas, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v21.11.11"}}, + }, + }, + }, + } + v, err := sts.CurrentVersion(context.TODO()) + assert.Equal(t, tt.expectedVersion, v) + if tt.shouldError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + } +} diff --git a/src/go/k8s/pkg/resources/statefulset_update.go b/src/go/k8s/pkg/resources/statefulset_update.go index d99f4de272a0..0ea4e9db8c3f 100644 --- a/src/go/k8s/pkg/resources/statefulset_update.go +++ b/src/go/k8s/pkg/resources/statefulset_update.go @@ -136,21 +136,29 @@ func (r *StatefulSetResource) isClusterHealthy(ctx context.Context) error { return nil } -func (r *StatefulSetResource) rollingUpdate( - ctx context.Context, template *corev1.PodTemplateSpec, -) error { +func (r *StatefulSetResource) getPodList(ctx context.Context) (*corev1.PodList, error) { var podList corev1.PodList err := r.List(ctx, &podList, &k8sclient.ListOptions{ Namespace: r.pandaCluster.Namespace, LabelSelector: labels.ForCluster(r.pandaCluster).AsClientSelector(), }) if err != nil { - return fmt.Errorf("unable to list panda pods: %w", err) + return nil, fmt.Errorf("unable to list panda pods: %w", err) } sort.Slice(podList.Items, func(i, j int) bool { return podList.Items[i].Name < podList.Items[j].Name }) + return &podList, nil +} + +func (r *StatefulSetResource) rollingUpdate( + ctx context.Context, template *corev1.PodTemplateSpec, +) error { + podList, err := r.getPodList(ctx) + if err != nil { + return fmt.Errorf("error getting pods %w", err) + } var artificialPod corev1.Pod artificialPod.Annotations = template.Annotations