Skip to content

Commit

Permalink
fix: verify generation in record status (#706)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxthrj committed Oct 26, 2021
1 parent 97fdc90 commit 2a73216
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 77 deletions.
8 changes: 4 additions & 4 deletions pkg/ingress/apisix_cluster_config.go
Expand Up @@ -143,7 +143,7 @@ func (c *apisixClusterConfigController) sync(ctx context.Context, ev *types.Even
zap.Any("opts", clusterOpts),
)
c.controller.recorderEvent(acc, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(acc, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(acc, _resourceSyncAborted, err, metav1.ConditionFalse, acc.GetGeneration())
return err
}
}
Expand All @@ -157,7 +157,7 @@ func (c *apisixClusterConfigController) sync(ctx context.Context, ev *types.Even
zap.Any("object", acc),
)
c.controller.recorderEvent(acc, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(acc, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(acc, _resourceSyncAborted, err, metav1.ConditionFalse, acc.GetGeneration())
return err
}
log.Debugw("translated global_rule",
Expand All @@ -176,11 +176,11 @@ func (c *apisixClusterConfigController) sync(ctx context.Context, ev *types.Even
zap.Any("cluster", acc.Name),
)
c.controller.recorderEvent(acc, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(acc, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(acc, _resourceSyncAborted, err, metav1.ConditionFalse, acc.GetGeneration())
return err
}
c.controller.recorderEvent(acc, corev1.EventTypeNormal, _resourceSynced, nil)
c.controller.recordStatus(acc, _resourceSynced, nil, metav1.ConditionTrue)
c.controller.recordStatus(acc, _resourceSynced, nil, metav1.ConditionTrue, acc.GetGeneration())
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ingress/apisix_consumer.go
Expand Up @@ -116,7 +116,7 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) er
zap.Any("ApisixConsumer", ac),
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
return err
}
log.Debug("got consumer object from ApisixConsumer",
Expand All @@ -130,7 +130,7 @@ func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) er
zap.Any("consumer", consumer),
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
c.controller.metricsCollector.IncrSyncOperation("consumer", "failure")
return err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/ingress/apisix_route.go
Expand Up @@ -270,13 +270,13 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error)
c.controller.recorderEvent(ar.V1(), v1.EventTypeNormal, _resourceSynced, nil)
case kube.ApisixRouteV2alpha1:
c.controller.recorderEvent(ar.V2alpha1(), v1.EventTypeNormal, _resourceSynced, nil)
c.controller.recordStatus(ar.V2alpha1(), _resourceSynced, nil, metav1.ConditionTrue)
c.controller.recordStatus(ar.V2alpha1(), _resourceSynced, nil, metav1.ConditionTrue, ar.V2alpha1().GetGeneration())
case kube.ApisixRouteV2beta1:
c.controller.recorderEvent(ar.V2beta1(), v1.EventTypeNormal, _resourceSynced, nil)
c.controller.recordStatus(ar.V2beta1(), _resourceSynced, nil, metav1.ConditionTrue)
c.controller.recordStatus(ar.V2beta1(), _resourceSynced, nil, metav1.ConditionTrue, ar.V2beta1().GetGeneration())
case kube.ApisixRouteV2beta2:
c.controller.recorderEvent(ar.V2beta2(), v1.EventTypeNormal, _resourceSynced, nil)
c.controller.recordStatus(ar.V2beta2(), _resourceSynced, nil, metav1.ConditionTrue)
c.controller.recordStatus(ar.V2beta2(), _resourceSynced, nil, metav1.ConditionTrue, ar.V2beta2().GetGeneration())
}
} else {
log.Errorw("failed list ApisixRoute",
Expand All @@ -299,13 +299,13 @@ func (c *apisixRouteController) handleSyncErr(obj interface{}, errOrigin error)
c.controller.recorderEvent(ar.V1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
case kube.ApisixRouteV2alpha1:
c.controller.recorderEvent(ar.V2alpha1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
c.controller.recordStatus(ar.V2alpha1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse)
c.controller.recordStatus(ar.V2alpha1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse, ar.V2alpha1().GetGeneration())
case kube.ApisixRouteV2beta1:
c.controller.recorderEvent(ar.V2beta1(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
c.controller.recordStatus(ar.V2beta1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse)
c.controller.recordStatus(ar.V2beta1(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse, ar.V2beta1().GetGeneration())
case kube.ApisixRouteV2beta2:
c.controller.recorderEvent(ar.V2beta2(), v1.EventTypeWarning, _resourceSyncAborted, errOrigin)
c.controller.recordStatus(ar.V2beta2(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse)
c.controller.recordStatus(ar.V2beta2(), _resourceSyncAborted, errOrigin, metav1.ConditionFalse, ar.V2beta2().GetGeneration())
}
} else {
log.Errorw("failed list ApisixRoute",
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/apisix_tls.go
Expand Up @@ -120,7 +120,7 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Any("ApisixTls", tls),
)
c.controller.recorderEvent(tls, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
return err
}
log.Debugw("got SSL object from ApisixTls",
Expand All @@ -143,11 +143,11 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Any("ssl", ssl),
)
c.controller.recorderEvent(tls, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
return err
}
c.controller.recorderEvent(tls, corev1.EventTypeNormal, _resourceSynced, nil)
c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue)
c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue, tls.GetGeneration())
return err
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/ingress/apisix_upstream.go
Expand Up @@ -129,7 +129,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
if err != nil {
log.Errorf("failed to get service %s: %s", key, err)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
return err
}

Expand All @@ -150,7 +150,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
}
log.Errorf("failed to get upstream %s: %s", upsName, err)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
return err
}
var newUps *apisixv1.Upstream
Expand All @@ -167,7 +167,7 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
zap.Error(err),
)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
return err
}
} else {
Expand All @@ -189,14 +189,14 @@ func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) er
zap.String("cluster", clusterName),
)
c.controller.recorderEvent(au, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(au, _resourceSyncAborted, err, metav1.ConditionFalse, au.GetGeneration())
return err
}
}
}
if ev.Type != types.EventDelete {
c.controller.recorderEvent(au, corev1.EventTypeNormal, _resourceSynced, nil)
c.controller.recordStatus(au, _resourceSynced, nil, metav1.ConditionTrue)
c.controller.recordStatus(au, _resourceSynced, nil, metav1.ConditionTrue, au.GetGeneration())
}
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ingress/secret.go
Expand Up @@ -159,7 +159,7 @@ func (c *secretController) sync(ctx context.Context, ev *types.Event) error {
go func(tls *configv1.ApisixTls) {
c.controller.recorderEventS(tls, corev1.EventTypeWarning, _resourceSyncAborted,
fmt.Sprintf("sync from secret %s changes failed, error: %s", key, err.Error()))
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
}(tls)
return true
}
Expand All @@ -177,7 +177,7 @@ func (c *secretController) sync(ctx context.Context, ev *types.Event) error {
go func(tls *configv1.ApisixTls) {
c.controller.recorderEventS(tls, corev1.EventTypeWarning, _resourceSyncAborted,
fmt.Sprintf("sync from ca secret %s changes failed, error: %s", key, err.Error()))
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
}(tls)
return true
}
Expand All @@ -203,11 +203,11 @@ func (c *secretController) sync(ctx context.Context, ev *types.Event) error {
)
c.controller.recorderEventS(tls, corev1.EventTypeWarning, _resourceSyncAborted,
fmt.Sprintf("sync from secret %s changes failed, error: %s", key, err.Error()))
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse)
c.controller.recordStatus(tls, _resourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
} else {
c.controller.recorderEventS(tls, corev1.EventTypeNormal, _resourceSynced,
fmt.Sprintf("sync from secret %s changes", key))
c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue)
c.controller.recordStatus(tls, _resourceSynced, nil, metav1.ConditionTrue, tls.GetGeneration())
}
}(ssl, tls)
return true
Expand Down
128 changes: 75 additions & 53 deletions pkg/ingress/status.go
Expand Up @@ -35,18 +35,28 @@ const (
_commonSuccessMessage = "Sync Successfully"
)

// verifyGeneration verify generation to decide whether to update status
func (c *Controller) verifyGeneration(conditions *[]metav1.Condition, newCondition metav1.Condition) bool {
existingCondition := meta.FindStatusCondition(*conditions, newCondition.Type)
if existingCondition != nil && existingCondition.ObservedGeneration >= newCondition.ObservedGeneration {
return false
}
return true
}

// recordStatus record resources status
func (c *Controller) recordStatus(at interface{}, reason string, err error, status v1.ConditionStatus) {
func (c *Controller) recordStatus(at interface{}, reason string, err error, status v1.ConditionStatus, generation int64) {
// build condition
message := _commonSuccessMessage
if err != nil {
message = err.Error()
}
condition := metav1.Condition{
Type: _conditionType,
Reason: reason,
Status: status,
Message: message,
Type: _conditionType,
Reason: reason,
Status: status,
Message: message,
ObservedGeneration: generation,
}
client := c.kubeClient.APISIXClient

Expand All @@ -57,89 +67,101 @@ func (c *Controller) recordStatus(at interface{}, reason string, err error, stat
conditions := make([]metav1.Condition, 0)
v.Status.Conditions = &conditions
}
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := client.ApisixV1().ApisixTlses(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixTls",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
if c.verifyGeneration(v.Status.Conditions, condition) {
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := client.ApisixV1().ApisixTlses(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixTls",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
}
}
case *configv1.ApisixUpstream:
// set to status
if v.Status.Conditions == nil {
conditions := make([]metav1.Condition, 0)
v.Status.Conditions = &conditions
}
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := client.ApisixV1().ApisixUpstreams(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixUpstream",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
if c.verifyGeneration(v.Status.Conditions, condition) {
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := client.ApisixV1().ApisixUpstreams(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixUpstream",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
}
}
case *configv2alpha1.ApisixRoute:
// set to status
if v.Status.Conditions == nil {
conditions := make([]metav1.Condition, 0)
v.Status.Conditions = &conditions
}
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := client.ApisixV2alpha1().ApisixRoutes(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixRoute",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
if c.verifyGeneration(v.Status.Conditions, condition) {
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := client.ApisixV2alpha1().ApisixRoutes(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixRoute",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
}
}
case *configv2beta1.ApisixRoute:
// set to status
if v.Status.Conditions == nil {
conditions := make([]metav1.Condition, 0)
v.Status.Conditions = conditions
}
meta.SetStatusCondition(&v.Status.Conditions, condition)
if _, errRecord := client.ApisixV2beta1().ApisixRoutes(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixRoute",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
if c.verifyGeneration(&v.Status.Conditions, condition) {
meta.SetStatusCondition(&v.Status.Conditions, condition)
if _, errRecord := client.ApisixV2beta1().ApisixRoutes(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixRoute",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
}
}
case *configv2beta2.ApisixRoute:
// set to status
if v.Status.Conditions == nil {
conditions := make([]metav1.Condition, 0)
v.Status.Conditions = conditions
}
meta.SetStatusCondition(&v.Status.Conditions, condition)
if _, errRecord := client.ApisixV2beta2().ApisixRoutes(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixRoute",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
if c.verifyGeneration(&v.Status.Conditions, condition) {
meta.SetStatusCondition(&v.Status.Conditions, condition)
if _, errRecord := client.ApisixV2beta2().ApisixRoutes(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixRoute",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
}
}
case *configv2alpha1.ApisixConsumer:
// set to status
if v.Status.Conditions == nil {
conditions := make([]metav1.Condition, 0)
v.Status.Conditions = &conditions
}
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := client.ApisixV2alpha1().ApisixConsumers(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixConsumer",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
if c.verifyGeneration(v.Status.Conditions, condition) {
meta.SetStatusCondition(v.Status.Conditions, condition)
if _, errRecord := client.ApisixV2alpha1().ApisixConsumers(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixConsumer",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
}
}
default:
// This should not be executed
Expand Down

0 comments on commit 2a73216

Please sign in to comment.