Merge pull request redpanda-data#10793 from joejulian/reconciler_loop_log_improvements

operator: make controller logs traceable
RafalKorepta committed May 24, 2023
2 parents eecc874 + d62e27c commit 57e2d9e
Showing 17 changed files with 127 additions and 115 deletions.
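The change is mechanical but worth seeing in one place: instead of a package-level logger, every scope derives a named child logger via `WithName`/`WithValues`, so each record carries its call path and the object it concerns. A minimal sketch (illustration only, not from the diff), assuming controller-runtime's zap integration; the namespace and name values are invented:

```go
package main

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
	// Root logger, wired up once at operator startup.
	ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

	// Per-operation scope: name the operation and attach object identity.
	log := ctrl.Log.WithName("Cluster.Default").
		WithValues("namespace", "default", "name", "one-node-cluster")
	log.Info("defaulting")

	// Helpers derive a further-named child, so records nest as
	// "Cluster.Default.validateCommon" and stay traceable to the caller.
	vcLog := log.WithName("validateCommon")
	vcLog.Info("validating")
}
```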
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
@@ -49,7 +49,7 @@ jobs:
- name: Run gofumpt
run: |
- find src/go -type f -name '*.go' | xargs -n1 gofumpt -w -lang=1.20
+ find src/go -type f -not -name 'zz*' -name '*.go' | xargs -n1 "$HOME/.local/bin/gofumpt" -w -lang=1.20
git diff --exit-code
py:
31 changes: 17 additions & 14 deletions src/go/k8s/apis/redpanda/v1alpha1/cluster_webhook.go
@@ -20,6 +20,7 @@ import (
"strings"

cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
+ "github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -29,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
- logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
@@ -83,9 +83,6 @@ type redpandaResourceField struct {
path *field.Path
}

- // log is for logging in this package.
- var log = logf.Log.WithName("cluster-resource")

// kclient is controller-runtime client.
var kclient client.Client

@@ -125,7 +122,8 @@ func sidecarResourceFields(c *Cluster) []resourceField {
// Default implements defaulting webhook logic - all defaults that should be
// applied to cluster CRD after user submits it should be put in here
func (r *Cluster) Default() {
- log.Info("default", "name", r.Name)
+ log := ctrl.Log.WithName("Cluster.Default").WithValues("namespace", r.Namespace, "name", r.Name)
+ log.Info("defaulting")
if r.Spec.Configuration.SchemaRegistry != nil && r.Spec.Configuration.SchemaRegistry.Port == 0 {
r.Spec.Configuration.SchemaRegistry.Port = defaultSchemaRegistryPort
}
Expand Down Expand Up @@ -200,9 +198,10 @@ var _ webhook.Validator = &Cluster{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *Cluster) ValidateCreate() error {
- log.Info("validate create", "name", r.Name)
+ log := ctrl.Log.WithName("Cluster.ValidateCreate").WithValues("namespace", r.Namespace, "name", r.Name)
+ log.Info("validating create")

- allErrs := r.validateCommon()
+ allErrs := r.validateCommon(log)

if len(allErrs) == 0 {
return nil
@@ -215,7 +214,8 @@ func (r *Cluster) ValidateCreate() error {

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *Cluster) ValidateUpdate(old runtime.Object) error {
- log.Info("validate update", "name", r.Name)
+ log := ctrl.Log.WithName("Cluster.ValidateUpdate").WithValues("namespace", r.Namespace, "name", r.Name)
+ log.Info("validating update")

// Don't validate if the cluster is being deleted.
// After receiving delete, the controller will update the cluster CR to remove the finalizer.
@@ -227,7 +227,7 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error {
}

oldCluster := old.(*Cluster)
- allErrs := r.validateCommon()
+ allErrs := r.validateCommon(log)

allErrs = append(allErrs, r.validateDownscaling(oldCluster)...)

@@ -244,12 +244,13 @@ func (r *Cluster) ValidateUpdate(old runtime.Object) error {
r.Name, allErrs)
}

- func (r *Cluster) validateCommon() field.ErrorList {
+ func (r *Cluster) validateCommon(log logr.Logger) field.ErrorList {
+ vcLog := log.WithName("validateCommon")
var allErrs field.ErrorList
allErrs = append(allErrs, r.validateScaling()...)
- allErrs = append(allErrs, r.validateKafkaListeners()...)
+ allErrs = append(allErrs, r.validateKafkaListeners(vcLog)...)
allErrs = append(allErrs, r.validateAdminListeners()...)
- allErrs = append(allErrs, r.validatePandaproxyListeners()...)
+ allErrs = append(allErrs, r.validatePandaproxyListeners(vcLog)...)
allErrs = append(allErrs, r.validateSchemaRegistryListener()...)
allErrs = append(allErrs, r.checkCollidingPorts()...)
allErrs = append(allErrs, r.validateRedpandaMemory()...)
@@ -350,7 +351,8 @@ func (r *Cluster) validateAdminListeners() field.ErrorList {
return allErrs
}

- func (r *Cluster) validateKafkaListeners() field.ErrorList {
+ func (r *Cluster) validateKafkaListeners(l logr.Logger) field.ErrorList {
+ log := l.WithName("validateKafkaListeners")
var allErrs field.ErrorList
if len(r.Spec.Configuration.KafkaAPI) == 0 {
allErrs = append(allErrs,
@@ -456,9 +458,10 @@ func checkValidEndpointTemplate(tmpl string) error {
}

//nolint:funlen,gocyclo // it's a sequence of checks
- func (r *Cluster) validatePandaproxyListeners() field.ErrorList {
+ func (r *Cluster) validatePandaproxyListeners(l logr.Logger) field.ErrorList {
var allErrs field.ErrorList
var proxyExternal *PandaproxyAPI
+ log := l.WithName("validatePandaproxyListeners")
kafkaExternal := r.ExternalListener()
p := r.Spec.Configuration.PandaproxyAPI
for i := range r.Spec.Configuration.PandaproxyAPI {
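All of the webhook edits above follow one convention: a helper takes the caller's logger as parameter `l` and immediately derives `log := l.WithName(...)`, rather than reaching for package state. A sketch of that shape (hypothetical `validateThing` helper, with logr's funcr sink for output):

```go
package main

import (
	"fmt"

	"github.com/go-logr/logr"
	"github.com/go-logr/logr/funcr"
)

// validateThing mirrors helpers like validateKafkaListeners: it accepts
// the caller's logger and names its own scope before logging anything.
func validateThing(l logr.Logger) []string {
	log := l.WithName("validateThing")
	log.Info("checking listeners")
	return nil
}

func main() {
	root := funcr.New(func(prefix, args string) {
		fmt.Println(prefix, args)
	}, funcr.Options{})

	// The record is prefixed "Cluster.ValidateCreate/validateThing",
	// so the helper's output is traceable to the webhook that called it.
	_ = validateThing(root.WithName("Cluster.ValidateCreate"))
}
```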
3 changes: 1 addition & 2 deletions src/go/k8s/apis/redpanda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/go/k8s/config/samples/one_node_cluster.yaml
@@ -3,7 +3,7 @@ kind: Cluster
metadata:
name: one-node-cluster
spec:
- image: "vectorized/redpanda"
+ image: "redpandadata/redpanda"
version: "latest"
replicas: 1
resources:
58 changes: 35 additions & 23 deletions src/go/k8s/controllers/redpanda/cluster_controller.go
@@ -19,6 +19,7 @@ import (
"strings"
"time"

+ "github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -103,11 +104,13 @@ type ClusterReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.7.0/pkg/reconcile
//
//nolint:funlen,gocyclo // todo break down
- func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
- log := r.Log.WithValues("redpandacluster", req.NamespacedName)
+ func (r *ClusterReconciler) Reconcile(
+ ctx context.Context, req ctrl.Request,
+ ) (ctrl.Result, error) {
+ log := ctrl.LoggerFrom(ctx).WithName("ClusterReconciler.Reconcile")

- log.Info(fmt.Sprintf("Starting reconcile loop for %v", req.NamespacedName))
- defer log.Info(fmt.Sprintf("Finished reconcile loop for %v", req.NamespacedName))
+ log.Info("Starting reconcile loop")
+ defer log.Info("Finished reconcile loop")

var redpandaCluster redpandav1alpha1.Cluster
crb := resources.NewClusterRoleBinding(r.Client, &redpandaCluster, r.Scheme, log)
@@ -130,7 +133,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

// if the cluster isn't being deleted, add a finalizer
if !controllerutil.ContainsFinalizer(&redpandaCluster, FinalizerKey) {
- log.V(7).Info("adding finalizer")
+ log.V(logger.DebugLevel).Info("adding finalizer")
controllerutil.AddFinalizer(&redpandaCluster, FinalizerKey)
if err := r.Update(ctx, &redpandaCluster); err != nil {
return ctrl.Result{}, fmt.Errorf("unable to set Cluster finalizer: %w", err)
@@ -363,8 +366,9 @@ func validateImagePullPolicy(imagePullPolicy corev1.PullPolicy) error {

//nolint:funlen,gocyclo // refactor in the next iteration
func (r *ClusterReconciler) handlePodFinalizer(
- ctx context.Context, rp *redpandav1alpha1.Cluster, log logr.Logger,
+ ctx context.Context, rp *redpandav1alpha1.Cluster, l logr.Logger,
) error {
+ log := l.WithName("handlePodFinalizer")
pods, err := r.podList(ctx, rp)
if err != nil {
return fmt.Errorf("unable to fetch PodList: %w", err)
@@ -488,10 +492,11 @@ func (r *ClusterReconciler) handlePodFinalizer(
}

func (r *ClusterReconciler) removePodFinalizer(
- ctx context.Context, pod *corev1.Pod, log logr.Logger,
+ ctx context.Context, pod *corev1.Pod, l logr.Logger,
) error {
+ log := l.WithName("removePodFinalizer")
if controllerutil.ContainsFinalizer(pod, FinalizerKey) {
- log.V(7).WithValues("namespace", pod.Namespace, "name", pod.Name).Info("removing finalizer")
+ log.V(logger.DebugLevel).WithValues("namespace", pod.Namespace, "name", pod.Name).Info("removing finalizer")
controllerutil.RemoveFinalizer(pod, FinalizerKey)
if err := r.Update(ctx, pod); err != nil {
return err
@@ -501,10 +506,11 @@ func (r *ClusterReconciler) removePodFinalizer(
}

func (r *ClusterReconciler) setPodFinalizer(
- ctx context.Context, pod *corev1.Pod, log logr.Logger,
+ ctx context.Context, pod *corev1.Pod, l logr.Logger,
) error {
+ log := l.WithName("setPodFinalizer")
if !controllerutil.ContainsFinalizer(pod, FinalizerKey) {
- log.V(7).WithValues("namespace", pod.Namespace, "name", pod.Name).Info("adding finalizer")
+ log.V(logger.DebugLevel).WithValues("namespace", pod.Namespace, "name", pod.Name).Info("adding finalizer")
controllerutil.AddFinalizer(pod, FinalizerKey)
if err := r.Update(ctx, pod); err != nil {
return err
@@ -514,9 +520,10 @@ func (r *ClusterReconciler) setPodFinalizer(
}

func (r *ClusterReconciler) setPodNodeIDAnnotation(
- ctx context.Context, rp *redpandav1alpha1.Cluster, log logr.Logger,
+ ctx context.Context, rp *redpandav1alpha1.Cluster, l logr.Logger,
) error {
- log.V(6).Info("setting pod node-id annotation")
+ log := l.WithName("setPodNodeIDAnnotation")
+ log.V(logger.DebugLevel).Info("setting pod node-id annotation")
pods, err := r.podList(ctx, rp)
if err != nil {
return fmt.Errorf("unable to fetch PodList: %w", err)
@@ -562,9 +569,10 @@ func (r *ClusterReconciler) setPodNodeIDAnnotation(
}

func (r *ClusterReconciler) decommissionBroker(
- ctx context.Context, rp *redpandav1alpha1.Cluster, nodeID int, log logr.Logger,
+ ctx context.Context, rp *redpandav1alpha1.Cluster, nodeID int, l logr.Logger,
) error {
- log.V(6).WithValues("node-id", nodeID).Info("decommission broker")
+ log := l.WithName("decommissionBroker").WithValues("node-id", nodeID)
+ log.V(logger.DebugLevel).Info("decommission broker")

redpandaPorts := networking.NewRedpandaPorts(rp)
headlessPorts := collectHeadlessPorts(redpandaPorts)
@@ -589,7 +597,8 @@ func (r *ClusterReconciler) decommissionBroker(
return nil
}

- func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1alpha1.Cluster, pod *corev1.Pod, log logr.Logger) (int32, error) {
+ func (r *ClusterReconciler) fetchAdminNodeID(ctx context.Context, rp *redpandav1alpha1.Cluster, pod *corev1.Pod, l logr.Logger) (int32, error) {
+ log := l.WithName("fetchAdminNodeID")
redpandaPorts := networking.NewRedpandaPorts(rp)
headlessPorts := collectHeadlessPorts(redpandaPorts)
clusterPorts := collectClusterPorts(redpandaPorts, rp)
@@ -913,10 +922,12 @@ func (r *ClusterReconciler) createExternalNodesList(
}

func (r *ClusterReconciler) handleClusterDeletion(
- ctx context.Context, redpandaCluster *redpandav1alpha1.Cluster, log logr.Logger,
+ ctx context.Context, redpandaCluster *redpandav1alpha1.Cluster, l logr.Logger,
) (reconcile.Result, error) {
+ log := l.WithName("handleClusterDeletion")
+ log.V(logger.DebugLevel).Info("handling cluster deletion")
if controllerutil.ContainsFinalizer(redpandaCluster, FinalizerKey) {
- log.V(7).Info("removing finalizers")
+ log.V(logger.DebugLevel).Info("removing finalizers")
pods, err := r.podList(ctx, redpandaCluster)
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to list Pods: %w", err)
@@ -1135,25 +1146,26 @@ func collectClusterPorts(
}

func isRedpandaClusterManaged(
- log logr.Logger, redpandaCluster *redpandav1alpha1.Cluster,
+ l logr.Logger, redpandaCluster *redpandav1alpha1.Cluster,
) bool {
+ log := l.WithName("isRedpandaClusterManaged")
managedAnnotationKey := redpandav1alpha1.GroupVersion.Group + "/managed"
if managed, exists := redpandaCluster.Annotations[managedAnnotationKey]; exists && managed == "false" {
- log.Info(fmt.Sprintf("management of %s is disabled; to enable it, change the '%s' annotation to true or remove it",
- redpandaCluster.Name, managedAnnotationKey))
+ log.Info(fmt.Sprintf("management is disabled; to enable it, change the '%s' annotation to true or remove it",
+ managedAnnotationKey))
return false
}
return true
}

func isRedpandaClusterVersionManaged(
- log logr.Logger,
+ l logr.Logger,
redpandaCluster *redpandav1alpha1.Cluster,
restrictToRedpandaVersion string,
) bool {
+ log := l.WithName("isRedpandaClusterVersionManaged").WithValues("restrictToRedpandaVersion", restrictToRedpandaVersion, "cluster spec.version", redpandaCluster.Status.Version)
if restrictToRedpandaVersion != "" && restrictToRedpandaVersion != redpandaCluster.Spec.Version {
- log.Info(fmt.Sprintf("management of %s is restricted to cluster (spec) version %s; cluster has spec version %s and status version %s",
- redpandaCluster.Name, restrictToRedpandaVersion, redpandaCluster.Spec.Version, redpandaCluster.Status.Version))
+ log.Info("not managed due to version management restriction")
return false
}
return true
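Two things to note in the reconciler changes, sketched below: `ctrl.LoggerFrom(ctx)` returns the request-scoped logger that controller-runtime injects before each `Reconcile` call (it already carries the request's namespace and name, which is why the `fmt.Sprintf` prefixes could be dropped), and fluxcd's shared `logger.DebugLevel` constant replaces magic verbosity numbers like `V(6)` and `V(7)`. A compilable sketch with a stand-in reconciler type:

```go
package main

import (
	"context"

	"github.com/fluxcd/pkg/runtime/logger"
	ctrl "sigs.k8s.io/controller-runtime"
)

// SketchReconciler stands in for ClusterReconciler; only the logging
// scaffolding is shown.
type SketchReconciler struct{}

func (r *SketchReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Request-scoped logger, already tagged with the request's
	// namespace/name; WithName records which reconciler is speaking.
	log := ctrl.LoggerFrom(ctx).WithName("ClusterReconciler.Reconcile")

	log.Info("Starting reconcile loop")
	defer log.Info("Finished reconcile loop")

	// Shared verbosity constant instead of an ad-hoc V(7).
	log.V(logger.DebugLevel).Info("adding finalizer")
	return ctrl.Result{}, nil
}

func main() {
	_, _ = (&SketchReconciler{}).Reconcile(context.Background(), ctrl.Request{})
}
```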
(file header not rendered)

@@ -39,8 +39,9 @@ func (r *ClusterReconciler) reconcileConfiguration(
statefulSetResource *resources.StatefulSetResource,
pki *certmanager.PkiReconciler,
fqdn string,
- log logr.Logger,
+ l logr.Logger,
) error {
+ log := l.WithName("reconcileConfiguration")
errorWithContext := newErrorWithContext(redpandaCluster.Namespace, redpandaCluster.Name)
if !featuregates.CentralizedConfiguration(redpandaCluster.Spec.Version) {
log.Info("Cluster is not using centralized configuration, skipping...")
@@ -180,8 +181,9 @@ func (r *ClusterReconciler) applyPatchIfNeeded(
clusterConfig admin.Config,
status admin.ConfigStatusResponse,
lastAppliedConfiguration map[string]interface{},
- log logr.Logger,
+ l logr.Logger,
) (success bool, err error) {
+ log := l.WithName("applyPatchIfNeeded")
errorWithContext := newErrorWithContext(redpandaCluster.Namespace, redpandaCluster.Name)

var invalidProperties []string
@@ -255,8 +257,9 @@ func (r *ClusterReconciler) retrieveClusterState(
func (r *ClusterReconciler) ensureConditionPresent(
ctx context.Context,
redpandaCluster *redpandav1alpha1.Cluster,
- log logr.Logger,
+ l logr.Logger,
) (bool, error) {
+ log := l.WithName("ensureConditionPresent")
if condition := redpandaCluster.Status.GetCondition(redpandav1alpha1.ClusterConfiguredConditionType); condition == nil {
// nil condition means that no change has been detected earlier, but we can't assume that configuration is in sync
// because of multiple reasons, for example:
@@ -313,8 +316,9 @@ func (r *ClusterReconciler) synchronizeStatusWithCluster(
ctx context.Context,
redpandaCluster *redpandav1alpha1.Cluster,
adminAPI adminutils.AdminAPIClient,
- log logr.Logger,
+ l logr.Logger,
) (*redpandav1alpha1.ClusterCondition, error) {
+ log := l.WithName("synchronizeStatusWithCluster")
errorWithContext := newErrorWithContext(redpandaCluster.Namespace, redpandaCluster.Name)
// Check status again on the leader using admin API
status, err := adminAPI.ClusterConfigStatus(ctx, true)
@@ -399,11 +403,12 @@ func mapStatusToCondition(
}

func needsRestart(
- clusterStatus admin.ConfigStatusResponse, log logr.Logger,
+ clusterStatus admin.ConfigStatusResponse, l logr.Logger,
) bool {
+ log := l.WithName("needsRestart")
nodeNeedsRestart := false
for i := range clusterStatus {
- log.Info(fmt.Sprintf("Node %d restart status is %v", clusterStatus[i].NodeID, clusterStatus[i].Restart))
+ log.WithValues("broker id", clusterStatus[i].NodeID, "restart status", clusterStatus[i].Restart).Info("broker restart status")
if clusterStatus[i].Restart {
nodeNeedsRestart = true
}
@@ -412,8 +417,9 @@ func isSafeToRestart(
}

func isSafeToRestart(
- clusterStatus admin.ConfigStatusResponse, log logr.Logger,
+ clusterStatus admin.ConfigStatusResponse, l logr.Logger,
) bool {
+ log := l.WithName("isSafeToRestart")
configVersions := make(map[int64]bool)
for i := range clusterStatus {
log.Info(fmt.Sprintf("Node %d is using config version %d", clusterStatus[i].NodeID, clusterStatus[i].ConfigVersion))
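The `needsRestart` change above is the structured-logging half of the cleanup: values move out of `fmt.Sprintf` message strings and into key/value fields that log pipelines can filter on. A minimal before/after sketch using logr's funcr sink:

```go
package main

import (
	"fmt"

	"github.com/go-logr/logr/funcr"
)

func main() {
	log := funcr.New(func(prefix, args string) {
		fmt.Println(prefix, args)
	}, funcr.Options{})

	nodeID, restart := 0, true

	// Before: values interpolated into the message string.
	log.Info(fmt.Sprintf("Node %d restart status is %v", nodeID, restart))

	// After: values carried as machine-readable fields.
	log.WithValues("broker id", nodeID, "restart status", restart).
		Info("broker restart status")
}
```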
(file header not rendered)

@@ -14,6 +14,7 @@ import (
"fmt"
"time"

+ "github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -34,8 +35,6 @@ import (

const (
defaultDriftCheckPeriod = 1 * time.Minute

- debugLogLevel = 4
)

// ClusterConfigurationDriftReconciler detects drifts in the cluster configuration and triggers a reconciliation.
@@ -55,10 +54,10 @@ type ClusterConfigurationDriftReconciler struct {
func (r *ClusterConfigurationDriftReconciler) Reconcile(
ctx context.Context, req ctrl.Request,
) (ctrl.Result, error) {
- log := r.Log.WithValues("redpandacluster", req.NamespacedName)
+ log := ctrl.LoggerFrom(ctx).WithName("ClusterConfigurationDriftReconciler.Reconcile")

- log.V(debugLogLevel).Info(fmt.Sprintf("Starting configuration drift reconcile loop for %v", req.NamespacedName))
- defer log.V(debugLogLevel).Info(fmt.Sprintf("Finished configuration drift reconcile loop for %v", req.NamespacedName))
+ log.V(logger.DebugLevel).Info("Starting configuration drift reconcile loop")
+ defer log.V(logger.DebugLevel).Info("Finished configuration drift reconcile loop")

var redpandaCluster redpandav1alpha1.Cluster
if err := r.Get(ctx, req.NamespacedName, &redpandaCluster); err != nil {
(Diffs for the remaining changed files are not rendered.)
