Skip to content

Commit

Permalink
Augment error behaviour (#920)
Browse files Browse the repository at this point in the history
* add Unwrap method to package errorfactory and a unit test for it

* temporary: add webhooks err check functions and unit tests

* rename removingStorageMsg var and add comment about the use of errorDuringValidationMsg

* sanitize error unit tests and reorder public error check functions

* refactor to use errors.As instead of errors.Cause which no longer fits

* replace pkg alias apiErrors with apierrors in pkg webhooks

* camelCase testName and testCases in errors_test.go from pkg webhooks

* add comment for Unwrap method and shorten public error comparison functions in pkg webhooks
  • Loading branch information
mihaialexandrescu authored and bartam1 committed Feb 17, 2023
1 parent 4b8b963 commit 4876e79
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 38 deletions.
8 changes: 4 additions & 4 deletions controllers/controller_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,18 @@ func clusterLabelString(cluster *v1beta1.KafkaCluster) string {
// checkBrokerConnectionError is a convenience wrapper for returning from common
// broker connection errors
func checkBrokerConnectionError(logger logr.Logger, err error) (ctrl.Result, error) {
switch errors.Cause(err).(type) {
case errorfactory.BrokersUnreachable:
switch {
case errors.As(err, &errorfactory.BrokersUnreachable{}):
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.BrokersNotReady:
case errors.As(err, &errorfactory.BrokersNotReady{}):
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.ResourceNotReady:
case errors.As(err, &errorfactory.ResourceNotReady{}):
logger.Info("Needed resource for broker connection not found, may not be ready")
return ctrl.Result{
Requeue: true,
Expand Down
24 changes: 14 additions & 10 deletions controllers/kafkacluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,44 +124,48 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req
for _, rec := range reconcilers {
err = rec.Reconcile(log)
if err != nil {
switch errors.Cause(err).(type) {
case errorfactory.BrokersUnreachable:
switch {
case errors.As(err, &errorfactory.BrokersUnreachable{}):
log.Info("Brokers unreachable, may still be starting up", "error", err.Error())
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.BrokersNotReady:
case errors.As(err, &errorfactory.BrokersNotReady{}):
log.Info("Brokers not ready, may still be starting up", "error", err.Error())
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.ResourceNotReady:
case errors.As(err, &errorfactory.ResourceNotReady{}):
log.Info("A new resource was not found or may not be ready", "error", err.Error())
return ctrl.Result{
RequeueAfter: time.Duration(7) * time.Second,
}, nil
case errorfactory.ReconcileRollingUpgrade:
case errors.As(err, &errorfactory.ReconcileRollingUpgrade{}):
log.Info("Rolling Upgrade in Progress")
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.CruiseControlNotReady:
case errors.As(err, &errorfactory.CruiseControlNotReady{}):
return ctrl.Result{
RequeueAfter: time.Duration(15) * time.Second,
}, nil
case errorfactory.CruiseControlTaskRunning:
case errors.As(err, &errorfactory.CruiseControlTaskRunning{}):
return ctrl.Result{
RequeueAfter: time.Duration(20) * time.Second,
}, nil
case errors.As(err, &errorfactory.CruiseControlTaskTimeout{}):
return ctrl.Result{
RequeueAfter: time.Duration(20) * time.Second,
}, nil
case errorfactory.CruiseControlTaskTimeout, errorfactory.CruiseControlTaskFailure:
case errors.As(err, &errorfactory.CruiseControlTaskFailure{}):
return ctrl.Result{
RequeueAfter: time.Duration(20) * time.Second,
}, nil
case errorfactory.PerBrokerConfigNotReady:
case errors.As(err, &errorfactory.PerBrokerConfigNotReady{}):
log.V(1).Info("dynamically updated broker configuration hasn't propagated through yet")
// for exponential backoff
return ctrl.Result{}, err
case errorfactory.LoadBalancerIPNotReady:
case errors.As(err, &errorfactory.LoadBalancerIPNotReady{}):
return ctrl.Result{
RequeueAfter: time.Duration(30) * time.Second,
}, nil
Expand Down
6 changes: 3 additions & 3 deletions controllers/kafkauser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ func (r *KafkaUserReconciler) Reconcile(ctx context.Context, request reconcile.R

user, err := pkiManager.ReconcileUserCertificate(ctx, instance, r.Scheme, cluster.Spec.GetKubernetesClusterDomain())
if err != nil {
switch errors.Cause(err).(type) {
case errorfactory.ResourceNotReady:
switch {
case errors.As(err, &errorfactory.ResourceNotReady{}):
reqLogger.Info("generated secret not found, may not be ready")
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Duration(5) * time.Second,
}, nil
case errorfactory.FatalReconcileError:
case errors.As(err, &errorfactory.FatalReconcileError{}):
// TODO: (tinyzimmer) - Sleep for longer for now to give user time to see the error
// But really we should catch these kinds of issues in a pre-admission hook in a future PR
// The user can fix while this is looping and it will pick it up next reconcile attempt
Expand Down
39 changes: 39 additions & 0 deletions pkg/errorfactory/errorfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,99 @@ import "emperror.dev/errors"
// ResourceNotReady states that resource is not ready
type ResourceNotReady struct{ error }

// Unwrap() allows compliance with Go1.13+ error chaining behavior
func (e ResourceNotReady) Unwrap() error { return e.error }

// APIFailure states that something went wrong with the api
type APIFailure struct{ error }

func (e APIFailure) Unwrap() error { return e.error }

// StatusUpdateError states that the operator failed to update the Status
type StatusUpdateError struct{ error }

func (e StatusUpdateError) Unwrap() error { return e.error }

// BrokersUnreachable states that the given broker is unreachable
type BrokersUnreachable struct{ error }

func (e BrokersUnreachable) Unwrap() error { return e.error }

// BrokersNotReady states that the broker is not ready
type BrokersNotReady struct{ error }

func (e BrokersNotReady) Unwrap() error { return e.error }

// BrokersRequestError states that the broker could not understand the request
type BrokersRequestError struct{ error }

func (e BrokersRequestError) Unwrap() error { return e.error }

// CreateTopicError states that the operator could not create the topic
type CreateTopicError struct{ error }

func (e CreateTopicError) Unwrap() error { return e.error }

// TopicNotFound states that the given topic is not found
type TopicNotFound struct{ error }

func (e TopicNotFound) Unwrap() error { return e.error }

// GracefulUpscaleFailed states that the operator failed to update the cluster gracefully
type GracefulUpscaleFailed struct{ error }

func (e GracefulUpscaleFailed) Unwrap() error { return e.error }

// TooManyResources states that too many resource found
type TooManyResources struct{ error }

func (e TooManyResources) Unwrap() error { return e.error }

// InternalError states that internal error happened
type InternalError struct{ error }

func (e InternalError) Unwrap() error { return e.error }

// FatalReconcileError states that a fatal error happened
type FatalReconcileError struct{ error }

func (e FatalReconcileError) Unwrap() error { return e.error }

// ReconcileRollingUpgrade states that rolling upgrade is reconciling
type ReconcileRollingUpgrade struct{ error }

func (e ReconcileRollingUpgrade) Unwrap() error { return e.error }

// CruiseControlNotReady states that CC is not ready to receive connection
type CruiseControlNotReady struct{ error }

func (e CruiseControlNotReady) Unwrap() error { return e.error }

// CruiseControlTaskRunning states that CC task is still running
type CruiseControlTaskRunning struct{ error }

func (e CruiseControlTaskRunning) Unwrap() error { return e.error }

// CruiseControlTaskRunning states that CC task timed out
type CruiseControlTaskTimeout struct{ error }

func (e CruiseControlTaskTimeout) Unwrap() error { return e.error }

// CruiseControlTaskFailure states that CC task was not found (CC restart?) or failed
type CruiseControlTaskFailure struct{ error }

func (e CruiseControlTaskFailure) Unwrap() error { return e.error }

// PerBrokerConfigNotReady states that per-broker configurations has been updated for a broker
type PerBrokerConfigNotReady struct{ error }

func (e PerBrokerConfigNotReady) Unwrap() error { return e.error }

// LoadBalancerIPNotReady states that the LoadBalancer IP is not yet created
type LoadBalancerIPNotReady struct{ error }

func (e LoadBalancerIPNotReady) Unwrap() error { return e.error }

// New creates a new error factory error
func New(t interface{}, err error, msg string, wrapArgs ...interface{}) error {
wrapped := errors.WrapIfWithDetails(err, msg, wrapArgs...)
Expand Down
26 changes: 26 additions & 0 deletions pkg/errorfactory/errorfactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,29 @@ func TestNew(t *testing.T) {
}
}
}

func TestUnwrapMethod(t *testing.T) {
// testError is a custom error type used to test the features of unwrapping errors using errors.Is() and errors.As()
type testError struct{ error }
var tstErr = testError{errors.New("inner-error")}

for _, errType := range errorTypes {
errType := errType
err := New(errType, tstErr, "test-message")

// This tests the use of errors.Is() using the Unwrap() method
if ok := errors.Is(err, tstErr); !ok {
t.Errorf("Type %T does not Unwrap() correctly using errors.Is(). Expected: %t ; Got: %t", errType, true, ok)
}

var c testError
// This tests the use of errors.As() using the Unwrap() method
if ok := errors.As(err, &c); !ok {
t.Errorf("Type %T does not Unwrap() correctly using errors.As(). Expected: %t ; Got: %t", errType, true, ok)
// This tests whether errors.As() succeeded in extracting the correct wrapped error into the given variable, not just the boolean return value
if c != tstErr {
t.Errorf("Type %T does not extract correctly the wrapped error using errors.As(). Expected: %v ; Got: %v", errType, tstErr, c)
}
}
}
}
36 changes: 26 additions & 10 deletions pkg/webhooks/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,36 @@ const (
invalidReplicationFactorErrMsg = "replication factor is larger than the number of nodes in the kafka cluster"
outOfRangeReplicationFactorErrMsg = "replication factor must be larger than 0 (or set it to be -1 to use the broker's default)"
outOfRangePartitionsErrMsg = "number of partitions must be larger than 0 (or set it to be -1 to use the broker's default)"
removingStorageMsg = "removing storage from a broker is not supported"
errorDuringValidationMsg = "error during validation"
unsupportedRemovingStorageMsg = "removing storage from a broker is not supported"

// errorDuringValidationMsg is added to infrastructure errors (e.g. failed to connect), but not to field validation errors
errorDuringValidationMsg = "error during validation"
)

func IsAdmissionCantConnect(err error) bool {
if apierrors.IsInternalError(err) && strings.Contains(err.Error(), cantConnectErrorMsg) {
return true
}
return false
return apierrors.IsInternalError(err) && strings.Contains(err.Error(), cantConnectErrorMsg)
}

func IsCantConnectAPIServer(err error) bool {
return apierrors.IsInternalError(err) && strings.Contains(err.Error(), cantConnectAPIServerMsg)
}

func IsInvalidReplicationFactor(err error) bool {
if apierrors.IsInvalid(err) && strings.Contains(err.Error(), invalidReplicationFactorErrMsg) {
return true
}
return false
return apierrors.IsInvalid(err) && strings.Contains(err.Error(), invalidReplicationFactorErrMsg)
}

func IsOutOfRangeReplicationFactor(err error) bool {
return apierrors.IsInvalid(err) && strings.Contains(err.Error(), outOfRangeReplicationFactorErrMsg)
}

func IsOutOfRangePartitions(err error) bool {
return apierrors.IsInvalid(err) && strings.Contains(err.Error(), outOfRangePartitionsErrMsg)
}

func IsInvalidRemovingStorage(err error) bool {
return apierrors.IsInvalid(err) && strings.Contains(err.Error(), unsupportedRemovingStorageMsg)
}

func IsErrorDuringValidation(err error) bool {
return apierrors.IsInternalError(err) && strings.Contains(err.Error(), errorDuringValidationMsg)
}
Loading

0 comments on commit 4876e79

Please sign in to comment.