Skip to content

Commit

Permalink
k8s: set Status.Version when all pods have that version
Browse files Browse the repository at this point in the history
  • Loading branch information
alenkacz committed May 22, 2023
1 parent bae6479 commit 8d7cc2c
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 23 deletions.
11 changes: 10 additions & 1 deletion src/go/k8s/controllers/redpanda/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ func (r *ClusterReconciler) reportStatus(
nodeList.Internal = observedNodesInternal
nodeList.SchemaRegistry.Internal = fmt.Sprintf("%s:%d", clusterFQDN, schemaRegistryPort)

//nolint:nestif // the code won't get clearer if it's splitted out in my opinion
if statusShouldBeUpdated(&redpandaCluster.Status, nodeList, sts) {
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
var cluster redpandav1alpha1.Cluster
Expand All @@ -669,7 +670,15 @@ func (r *ClusterReconciler) reportStatus(
cluster.Status.Nodes = *nodeList
cluster.Status.ReadyReplicas = sts.LastObservedState.Status.ReadyReplicas
cluster.Status.Replicas = sts.LastObservedState.Status.Replicas
cluster.Status.Version = sts.Version()
version, err := sts.CurrentVersion(ctx)
if err != nil {
// this is non-fatal error, it will return error even if e.g.
// the rollout is not finished because then the currentversion
// of the cluster cannot be determined
r.Log.Info(fmt.Sprintf("cannot set Status.Version, %s", err))
} else {
cluster.Status.Version = version
}

err = r.Status().Update(ctx, &cluster)
if err == nil {
Expand Down
35 changes: 34 additions & 1 deletion src/go/k8s/controllers/redpanda/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
Expand Down Expand Up @@ -852,7 +853,12 @@ var _ = Describe("RedPandaCluster controller", func() {
})
It("Should not throw error; redpanda version allowed", func() {
key, redpandaCluster := getVersionedRedpanda("restricted-redpanda-positive", allowedVersion)
fc := fake.NewClientBuilder().WithObjects(redpandaCluster).Build()
pods := readyPodsForCluster(redpandaCluster)
objects := []client.Object{redpandaCluster}
for i := range pods {
objects = append(objects, pods[i])
}
fc := fake.NewClientBuilder().WithObjects(objects...).Build()
r := &redpanda.ClusterReconciler{
Client: fc,
Log: ctrl.Log,
Expand Down Expand Up @@ -898,6 +904,33 @@ var _ = Describe("RedPandaCluster controller", func() {
Entry("Random image pull policy", "asdvasd", Not(Succeed())))
})

func readyPodsForCluster(cluster *v1alpha1.Cluster) []*corev1.Pod {
var result []*corev1.Pod
for i := 0; i < int(*cluster.Spec.Replicas); i++ {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", i),
Namespace: cluster.Namespace,
Labels: map[string]string{
"app.kubernetes.io/component": "redpanda",
"app.kubernetes.io/instance": cluster.Name,
"app.kubernetes.io/name": "redpanda",
},
},
Spec: corev1.PodSpec{
NodeName: "test-node",
Containers: []corev1.Container{{
Name: "redpanda",
Image: fmt.Sprintf("redpanda:%s", cluster.Spec.Version),
}},
},
Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}},
}
result = append(result, pod)
}
return result
}

func getVersionedRedpanda(
name string, version string,
) (key types.NamespacedName, cluster *v1alpha1.Cluster) {
Expand Down
66 changes: 51 additions & 15 deletions src/go/k8s/pkg/resources/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
)

var _ Resource = &StatefulSetResource{}
Expand Down Expand Up @@ -864,26 +865,61 @@ func (r *StatefulSetResource) fullConfiguratorImage() string {
return fmt.Sprintf("%s:%s", r.configuratorSettings.ConfiguratorBaseImage, r.configuratorSettings.ConfiguratorTag)
}

// Version returns the cluster version specified in the image tag.
// Version returns the cluster version specified in the image tag of the
// statefulset spec. Depending on the rollout status it might not be the same as
// version of the pods.
func (r *StatefulSetResource) Version() string {
lastObservedSts := r.LastObservedState
if lastObservedSts != nil {
cc := lastObservedSts.Spec.Template.Spec.Containers
for i := range cc {
c := cc[i]
if c.Name != redpandaContainerName {
continue
}
// Will always have tag even for latest because of pandaCluster.FullImageName().
if s := strings.Split(c.Image, ":"); len(s) > 1 {
version := s[len(s)-1]
// Image uses registry with port and no tag (e.g. localhost:5000/redpanda)
if strings.Contains(version, "/") {
version = ""
}
return version
return redpandaContainerVersion(lastObservedSts.Spec.Template.Spec.Containers)
}
return ""
}

func redpandaContainerVersion(containers []corev1.Container) string {
for i := range containers {
c := containers[i]
if c.Name != redpandaContainerName {
continue
}
// Will always have tag even for latest because of pandaCluster.FullImageName().
if s := strings.Split(c.Image, ":"); len(s) > 1 {
version := s[len(s)-1]
// Image uses registry with port and no tag (e.g. localhost:5000/redpanda)
if strings.Contains(version, "/") {
version = ""
}
return version
}
}
return ""
}

// CurrentVersion is the version that's rolled out to all nodes (pods) of the cluster
func (r *StatefulSetResource) CurrentVersion(ctx context.Context) (string, error) {
stsVersion := r.Version()
if stsVersion == "" {
return "", nil
}
replicas := *r.LastObservedState.Spec.Replicas
pods, err := r.getPodList(ctx)
if err != nil {
return "", err
}
if int32(len(pods.Items)) != replicas {
//nolint:goerr113 // not going to use wrapped static error here this time
return stsVersion, fmt.Errorf("rollout incomplete: pods count %d does not match expected replicas %d", len(pods.Items), replicas)
}
for i := range pods.Items {
if !utils.IsPodReady(&pods.Items[i]) {
//nolint:goerr113 // no need for static error
return stsVersion, fmt.Errorf("rollout incomplete: at least one pod (%s) is not READY", pods.Items[i].Name)
}
podVersion := redpandaContainerVersion(pods.Items[i].Spec.Containers)
if podVersion != stsVersion {
//nolint:goerr113 // no need for static error
return stsVersion, fmt.Errorf("rollout incomplete: at least one pod has version %s not %s", podVersion, stsVersion)
}
}
return stsVersion, nil
}
112 changes: 110 additions & 2 deletions src/go/k8s/pkg/resources/statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ import (

redpandav1alpha1 "github.com/redpanda-data/redpanda/src/go/k8s/apis/redpanda/v1alpha1"
adminutils "github.com/redpanda-data/redpanda/src/go/k8s/pkg/admin"
"github.com/redpanda-data/redpanda/src/go/k8s/pkg/labels"
res "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources"
resourcetypes "github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/types"
)

const (
redpandaContainerName = "redpanda"
)

//nolint:funlen // Test function can have more than 100 lines
func TestEnsure(t *testing.T) {
cluster := pandaCluster()
Expand Down Expand Up @@ -313,8 +318,6 @@ func pandaCluster() *redpandav1alpha1.Cluster {
}

func TestVersion(t *testing.T) {
redpandaContainerName := "redpanda"

tests := []struct {
Containers []corev1.Container
ExpectedVersion string
Expand Down Expand Up @@ -343,3 +346,108 @@ func TestVersion(t *testing.T) {
assert.Equal(t, tt.ExpectedVersion, sts.Version())
}
}

func TestCurrentVersion(t *testing.T) {
redpanda := pandaCluster()
tests := []struct {
name string
pods []corev1.Pod
expectedReplicas int32
expectedVersion string
shouldError bool
}{
{"one pod with matching version", []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: redpanda.Namespace,
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v21.11.11"}}},
Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}},
},
}, 1, "v21.11.11", false},
{"one pod with matching version, not in ready state", []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: redpanda.Namespace,
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v21.11.11"}}},
Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionFalse}}},
},
}, 1, "v21.11.11", true},
{"one pod with matching version but expecting two replicas", []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: redpanda.Namespace,
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v21.11.11"}}},
Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}},
},
}, 2, "v21.11.11", true},
{"one pod with matching and one pod with non-matching version", []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: redpanda.Namespace,
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v21.11.11"}}},
Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
Namespace: redpanda.Namespace,
},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v22.2.2"}}},
Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}},
},
}, 2, "v21.11.11", true},
}

for _, tt := range tests {
c := fake.NewClientBuilder().Build()
for i := range tt.pods {
pod := tt.pods[i]
pod.Labels = labels.ForCluster(redpanda)
assert.NoError(t, c.Create(context.TODO(), &pod))
}
sts := res.NewStatefulSet(c, redpanda, scheme.Scheme,
"cluster.local",
"servicename",
types.NamespacedName{Name: "test", Namespace: "test"},
TestStatefulsetTLSVolumeProvider{},
TestAdminTLSConfigProvider{},
"",
res.ConfiguratorSettings{
ConfiguratorBaseImage: "vectorized/configurator",
ConfiguratorTag: "latest",
ImagePullPolicy: "Always",
},
func(ctx context.Context) (string, error) { return hash, nil },
func(ctx context.Context, k8sClient client.Reader, redpandaCluster *redpandav1alpha1.Cluster, fqdn string, adminTLSProvider resourcetypes.AdminTLSConfigProvider, ordinals ...int32) (adminutils.AdminAPIClient, error) {
return nil, nil
},
time.Second,
ctrl.Log.WithName("test"),
0)
sts.LastObservedState = &v1.StatefulSet{
Spec: v1.StatefulSetSpec{
Replicas: &tt.expectedReplicas,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{Name: redpandaContainerName, Image: "vectorized/redpanda:v21.11.11"}},
},
},
},
}
v, err := sts.CurrentVersion(context.TODO())
assert.Equal(t, tt.expectedVersion, v)
if tt.shouldError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}
}
16 changes: 12 additions & 4 deletions src/go/k8s/pkg/resources/statefulset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,29 @@ func (r *StatefulSetResource) isClusterHealthy(ctx context.Context) error {
return nil
}

func (r *StatefulSetResource) rollingUpdate(
ctx context.Context, template *corev1.PodTemplateSpec,
) error {
func (r *StatefulSetResource) getPodList(ctx context.Context) (*corev1.PodList, error) {
var podList corev1.PodList
err := r.List(ctx, &podList, &k8sclient.ListOptions{
Namespace: r.pandaCluster.Namespace,
LabelSelector: labels.ForCluster(r.pandaCluster).AsClientSelector(),
})
if err != nil {
return fmt.Errorf("unable to list panda pods: %w", err)
return nil, fmt.Errorf("unable to list panda pods: %w", err)
}

sort.Slice(podList.Items, func(i, j int) bool {
return podList.Items[i].Name < podList.Items[j].Name
})
return &podList, nil
}

func (r *StatefulSetResource) rollingUpdate(
ctx context.Context, template *corev1.PodTemplateSpec,
) error {
podList, err := r.getPodList(ctx)
if err != nil {
return fmt.Errorf("error getting pods %w", err)
}

var artificialPod corev1.Pod
artificialPod.Annotations = template.Annotations
Expand Down

0 comments on commit 8d7cc2c

Please sign in to comment.