Skip to content

Commit

Permalink
fix: If resource synchronization retry occurs, other events of the sa…
Browse files Browse the repository at this point in the history
…me resource will be blocked (#760)
  • Loading branch information
gxthrj committed Nov 26, 2021
1 parent b127ff4 commit 62d7897
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 30 deletions.
6 changes: 3 additions & 3 deletions pkg/ingress/apisix_cluster_config.go
Expand Up @@ -210,7 +210,7 @@ func (c *apisixClusterConfigController) onAdd(obj interface{}) {
zap.Any("object", obj),
)

c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
})
Expand All @@ -234,7 +234,7 @@ func (c *apisixClusterConfigController) onUpdate(oldObj, newObj interface{}) {
zap.Any("old object", prev),
)

c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: key,
})
Expand All @@ -260,7 +260,7 @@ func (c *apisixClusterConfigController) onDelete(obj interface{}) {
log.Debugw("ApisixClusterConfig delete event arrived",
zap.Any("final state", acc),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: key,
Tombstone: acc,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/apisix_consumer.go
Expand Up @@ -165,7 +165,7 @@ func (c *apisixConsumerController) onAdd(obj interface{}) {
zap.Any("object", obj),
)

c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
})
Expand All @@ -192,7 +192,7 @@ func (c *apisixConsumerController) onUpdate(oldObj, newObj interface{}) {
zap.Any("old object", prev),
)

c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: key,
})
Expand Down Expand Up @@ -221,7 +221,7 @@ func (c *apisixConsumerController) onDelete(obj interface{}) {
log.Debugw("ApisixConsumer delete event arrived",
zap.Any("final state", ac),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: key,
Tombstone: ac,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/apisix_route.go
Expand Up @@ -333,7 +333,7 @@ func (c *apisixRouteController) onAdd(obj interface{}) {
zap.Any("object", obj))

ar := kube.MustNewApisixRoute(obj)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: kube.ApisixRouteEvent{
Key: key,
Expand Down Expand Up @@ -362,7 +362,7 @@ func (c *apisixRouteController) onUpdate(oldObj, newObj interface{}) {
zap.Any("new object", curr),
zap.Any("old object", prev),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: kube.ApisixRouteEvent{
Key: key,
Expand Down Expand Up @@ -394,7 +394,7 @@ func (c *apisixRouteController) onDelete(obj interface{}) {
log.Debugw("ApisixRoute delete event arrived",
zap.Any("final state", ar),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: kube.ApisixRouteEvent{
Key: key,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/apisix_tls.go
Expand Up @@ -195,7 +195,7 @@ func (c *apisixTlsController) onAdd(obj interface{}) {
log.Debugw("ApisixTls add event arrived",
zap.Any("object", obj),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
})
Expand All @@ -221,7 +221,7 @@ func (c *apisixTlsController) onUpdate(prev, curr interface{}) {
zap.Any("new object", curr),
zap.Any("old object", prev),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: key,
})
Expand Down Expand Up @@ -252,7 +252,7 @@ func (c *apisixTlsController) onDelete(obj interface{}) {
log.Debugw("ApisixTls delete event arrived",
zap.Any("final state", obj),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: key,
Tombstone: tls,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/apisix_upstream.go
Expand Up @@ -227,7 +227,7 @@ func (c *apisixUpstreamController) onAdd(obj interface{}) {
log.Debugw("ApisixUpstream add event arrived",
zap.Any("object", obj))

c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
})
Expand All @@ -254,7 +254,7 @@ func (c *apisixUpstreamController) onUpdate(oldObj, newObj interface{}) {
zap.Any("old object", prev),
)

c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: key,
})
Expand Down Expand Up @@ -283,7 +283,7 @@ func (c *apisixUpstreamController) onDelete(obj interface{}) {
log.Debugw("ApisixUpstream delete event arrived",
zap.Any("final state", au),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: key,
Tombstone: au,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/endpoint.go
Expand Up @@ -112,7 +112,7 @@ func (c *endpointsController) onAdd(obj interface{}) {
log.Debugw("endpoints add event arrived",
zap.String("object-key", key))

c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
// TODO pass key.
Object: kube.NewEndpoint(obj.(*corev1.Endpoints)),
Expand Down Expand Up @@ -140,7 +140,7 @@ func (c *endpointsController) onUpdate(prev, curr interface{}) {
zap.Any("new object", currEp),
zap.Any("old object", prevEp),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
// TODO pass key.
Object: kube.NewEndpoint(currEp),
Expand Down Expand Up @@ -169,7 +169,7 @@ func (c *endpointsController) onDelete(obj interface{}) {
log.Debugw("endpoints delete event arrived",
zap.Any("final state", ep),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: kube.NewEndpoint(ep),
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/endpointslice.go
Expand Up @@ -142,7 +142,7 @@ func (c *endpointSliceController) onAdd(obj interface{}) {
zap.String("object-key", key),
)

c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: endpointSliceEvent{
Key: key,
Expand Down Expand Up @@ -182,7 +182,7 @@ func (c *endpointSliceController) onUpdate(prev, curr interface{}) {
zap.Any("new object", currEp),
zap.Any("old object", prevEp),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
// TODO pass key.
Object: endpointSliceEvent{
Expand Down Expand Up @@ -221,7 +221,7 @@ func (c *endpointSliceController) onDelete(obj interface{}) {
log.Debugw("endpoints delete event arrived",
zap.Any("object-key", key),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: endpointSliceEvent{
Key: key,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/ingress.go
Expand Up @@ -220,7 +220,7 @@ func (c *ingressController) onAdd(obj interface{}) {
return
}

c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: kube.IngressEvent{
Key: key,
Expand Down Expand Up @@ -257,7 +257,7 @@ func (c *ingressController) onUpdate(oldObj, newObj interface{}) {
return
}

c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: kube.IngressEvent{
Key: key,
Expand Down Expand Up @@ -298,7 +298,7 @@ func (c *ingressController) OnDelete(obj interface{}) {
)
return
}
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: kube.IngressEvent{
Key: key,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/namespace.go
Expand Up @@ -133,7 +133,7 @@ func (c *namespaceController) onAdd(obj interface{}) {
if err == nil {
log.Debugw(key)
}
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
})
Expand All @@ -150,15 +150,15 @@ func (c *namespaceController) onUpdate(pre, cur interface{}) {
log.Errorf("found Namespace resource with error: %s", err)
return
}
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: key,
})
}

func (c *namespaceController) onDelete(obj interface{}) {
namespace := obj.(*corev1.Namespace)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: namespace.Name,
Tombstone: namespace,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingress/secret.go
Expand Up @@ -242,7 +242,7 @@ func (c *secretController) onAdd(obj interface{}) {
log.Debugw("secret add event arrived",
zap.String("object-key", key),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
})
Expand All @@ -269,7 +269,7 @@ func (c *secretController) onUpdate(prev, curr interface{}) {
zap.Any("new object", curr),
zap.Any("old object", prev),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: key,
})
Expand Down Expand Up @@ -302,7 +302,7 @@ func (c *secretController) onDelete(obj interface{}) {
log.Debugw("secret delete event arrived",
zap.Any("final state", sec),
)
c.workqueue.AddRateLimited(&types.Event{
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: key,
Tombstone: sec,
Expand Down

0 comments on commit 62d7897

Please sign in to comment.