Skip to content

Commit

Permalink
feat: support ApisixConsumer v2 (#989)
Browse files Browse the repository at this point in the history
  • Loading branch information
lingsamuel authored May 23, 2022
1 parent bef2010 commit f6f0a3b
Show file tree
Hide file tree
Showing 13 changed files with 630 additions and 86 deletions.
1 change: 1 addition & 0 deletions cmd/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ For example, no available LB exists in the bare metal environment.`)
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixRouteVersion, "apisix-route-version", config.ApisixRouteV2beta3, "the supported apisixroute api group version, can be \"apisix.apache.org/v2beta2\" or \"apisix.apache.org/v2beta3\"")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixTlsVersion, "apisix-tls-version", config.ApisixV2beta3, "the supported apisixtls api group version, can be \"apisix.apache.org/v2beta3\" or \"apisix.apache.org/v2\"")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixClusterConfigVersion, "apisix-cluster-config-version", config.ApisixV2beta3, "the supported ApisixClusterConfig api group version, can be \"apisix.apache.org/v2beta3\" or \"apisix.apache.org/v2\"")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixConsumerVersion, "apisix-consumer-version", config.ApisixV2beta3, "the supported ApisixConsumer api group version, can be \"apisix.apache.org/v2beta3\" or \"apisix.apache.org/v2\"")
cmd.PersistentFlags().BoolVar(&cfg.Kubernetes.WatchEndpointSlices, "watch-endpointslices", false, "whether to watch endpointslices rather than endpoints")
cmd.PersistentFlags().BoolVar(&cfg.Kubernetes.EnableGatewayAPI, "enable-gateway-api", false, "whether to enable support for Gateway API")
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, "default-apisix-cluster-base-url", "", "the base URL of admin api / manager api for the default APISIX cluster")
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type KubernetesConfig struct {
IngressVersion string `json:"ingress_version" yaml:"ingress_version"`
WatchEndpointSlices bool `json:"watch_endpoint_slices" yaml:"watch_endpoint_slices"`
ApisixRouteVersion string `json:"apisix_route_version" yaml:"apisix_route_version"`
ApisixConsumerVersion string `json:"apisix_consumer_version" yaml:"apisix_consumer_version"`
ApisixTlsVersion string `json:"apisix_tls_version" yaml:"apisix_tls_version"`
ApisixClusterConfigVersion string `json:"apisix_cluster_config_version" yaml:"apisix_cluster_config_version"`
EnableGatewayAPI bool `json:"enable_gateway_api" yaml:"enable_gateway_api"`
Expand Down Expand Up @@ -133,6 +134,7 @@ func NewDefaultConfig() *Config {
IngressClass: IngressClass,
IngressVersion: IngressNetworkingV1,
ApisixRouteVersion: ApisixRouteV2beta3,
ApisixConsumerVersion: ApisixV2beta3,
ApisixTlsVersion: ApisixV2beta3,
ApisixClusterConfigVersion: ApisixV2beta3,
WatchEndpointSlices: false,
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestNewConfigFromFile(t *testing.T) {
IngressClass: IngressClass,
IngressVersion: IngressNetworkingV1,
ApisixRouteVersion: ApisixRouteV2beta3,
ApisixConsumerVersion: ApisixV2beta3,
ApisixTlsVersion: ApisixV2beta3,
ApisixClusterConfigVersion: ApisixV2beta3,
},
Expand Down Expand Up @@ -128,6 +129,7 @@ func TestConfigWithEnvVar(t *testing.T) {
IngressClass: IngressClass,
IngressVersion: IngressNetworkingV1,
ApisixRouteVersion: ApisixRouteV2beta3,
ApisixConsumerVersion: ApisixV2beta3,
ApisixTlsVersion: ApisixV2beta3,
ApisixClusterConfigVersion: ApisixV2beta3,
},
Expand Down
158 changes: 119 additions & 39 deletions pkg/ingress/apisix_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ingress

import (
"context"
"fmt"
"time"

"go.uber.org/zap"
Expand All @@ -25,7 +26,8 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
)
Expand Down Expand Up @@ -79,62 +81,113 @@ func (c *apisixConsumerController) runWorker(ctx context.Context) {
}

func (c *apisixConsumerController) sync(ctx context.Context, ev *types.Event) error {
key := ev.Object.(string)
event := ev.Object.(kube.ApisixConsumerEvent)
key := event.Key
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
log.Errorf("found ApisixConsumer resource with invalid meta namespace key %s: %s", key, err)
return err
}

ac, err := c.controller.apisixConsumerLister.ApisixConsumers(namespace).Get(name)
var multiVersioned kube.ApisixConsumer
switch event.GroupVersion {
case config.ApisixV2beta3:
multiVersioned, err = c.controller.apisixConsumerLister.V2beta3(namespace, name)
case config.ApisixV2:
multiVersioned, err = c.controller.apisixConsumerLister.V2(namespace, name)
default:
return fmt.Errorf("unsupported ApisixConsumer group version %s", event.GroupVersion)
}

if err != nil {
if !k8serrors.IsNotFound(err) {
log.Errorf("failed to get ApisixConsumer %s: %s", key, err)
log.Errorw("failed to get ApisixConsumer",
zap.Error(err),
zap.String("key", key),
zap.String("version", event.GroupVersion),
)
return err
}
if ev.Type != types.EventDelete {
log.Warnf("ApisixConsumer %s was deleted before it can be delivered", key)
log.Warnw("ApisixConsumer was deleted before it can be delivered",
zap.String("key", key),
zap.String("version", event.GroupVersion),
)
// Don't need to retry.
return nil
}
}
if ev.Type == types.EventDelete {
if ac != nil {
if multiVersioned != nil {
// We still find the resource while we are processing the DELETE event,
// that means object with same namespace and name was created, discarding
// this stale DELETE event.
log.Warnf("discard the stale ApisixConsumer delete event since the %s exists", key)
return nil
}
ac = ev.Tombstone.(*configv2beta3.ApisixConsumer)
multiVersioned = ev.Tombstone.(kube.ApisixConsumer)
}

consumer, err := c.controller.translator.TranslateApisixConsumer(ac)
if err != nil {
log.Errorw("failed to translate ApisixConsumer",
zap.Error(err),
switch event.GroupVersion {
case config.ApisixV2beta3:
ac := multiVersioned.V2beta3()

consumer, err := c.controller.translator.TranslateApisixConsumerV2beta3(ac)
if err != nil {
log.Errorw("failed to translate ApisixConsumer",
zap.Error(err),
zap.Any("ApisixConsumer", ac),
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
return err
}
log.Debugw("got consumer object from ApisixConsumer",
zap.Any("consumer", consumer),
zap.Any("ApisixConsumer", ac),
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
return err
}
log.Debug("got consumer object from ApisixConsumer",
zap.Any("consumer", consumer),
zap.Any("ApisixConsumer", ac),
)

if err := c.controller.syncConsumer(ctx, consumer, ev.Type); err != nil {
log.Errorw("failed to sync Consumer to APISIX",
zap.Error(err),
if err := c.controller.syncConsumer(ctx, consumer, ev.Type); err != nil {
log.Errorw("failed to sync Consumer to APISIX",
zap.Error(err),
zap.Any("consumer", consumer),
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
return err
}

c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced, nil)
case config.ApisixV2:
ac := multiVersioned.V2()

consumer, err := c.controller.translator.TranslateApisixConsumerV2(ac)
if err != nil {
log.Errorw("failed to translate ApisixConsumer",
zap.Error(err),
zap.Any("ApisixConsumer", ac),
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
return err
}
log.Debugw("got consumer object from ApisixConsumer",
zap.Any("consumer", consumer),
zap.Any("ApisixConsumer", ac),
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
return err
}

c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced, nil)
if err := c.controller.syncConsumer(ctx, consumer, ev.Type); err != nil {
log.Errorw("failed to sync Consumer to APISIX",
zap.Error(err),
zap.Any("consumer", consumer),
)
c.controller.recorderEvent(ac, corev1.EventTypeWarning, _resourceSyncAborted, err)
c.controller.recordStatus(ac, _resourceSyncAborted, err, metav1.ConditionFalse, ac.GetGeneration())
return err
}

c.controller.recorderEvent(ac, corev1.EventTypeNormal, _resourceSynced, nil)
}
return nil
}

Expand Down Expand Up @@ -162,6 +215,11 @@ func (c *apisixConsumerController) handleSyncErr(obj interface{}, err error) {
}

func (c *apisixConsumerController) onAdd(obj interface{}) {
ac, err := kube.NewApisixConsumer(obj)
if err != nil {
log.Errorw("found ApisixConsumer resource with bad type", zap.Error(err))
return
}
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found ApisixConsumer resource with bad meta namespace key: %s", err)
Expand All @@ -175,17 +233,28 @@ func (c *apisixConsumerController) onAdd(obj interface{}) {
)

c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Object: key,
Type: types.EventAdd,
Object: kube.ApisixConsumerEvent{
Key: key,
GroupVersion: ac.GroupVersion(),
},
})

c.controller.MetricsCollector.IncrEvents("consumer", "add")
}

func (c *apisixConsumerController) onUpdate(oldObj, newObj interface{}) {
prev := oldObj.(*configv2beta3.ApisixConsumer)
curr := newObj.(*configv2beta3.ApisixConsumer)
if prev.ResourceVersion >= curr.ResourceVersion {
prev, err := kube.NewApisixConsumer(oldObj)
if err != nil {
log.Errorw("found ApisixConsumer resource with bad type", zap.Error(err))
return
}
curr, err := kube.NewApisixConsumer(newObj)
if err != nil {
log.Errorw("found ApisixConsumer resource with bad type", zap.Error(err))
return
}
if prev.ResourceVersion() >= curr.ResourceVersion() {
return
}
key, err := cache.MetaNamespaceKeyFunc(newObj)
Expand All @@ -202,21 +271,29 @@ func (c *apisixConsumerController) onUpdate(oldObj, newObj interface{}) {
)

c.workqueue.Add(&types.Event{
Type: types.EventUpdate,
Object: key,
Type: types.EventUpdate,
Object: kube.ApisixConsumerEvent{
Key: key,
OldObject: prev,
GroupVersion: curr.GroupVersion(),
},
})

c.controller.MetricsCollector.IncrEvents("consumer", "update")
}

func (c *apisixConsumerController) onDelete(obj interface{}) {
ac, ok := obj.(*configv2beta3.ApisixConsumer)
if !ok {
ac, err := kube.NewApisixConsumer(obj)
if err != nil {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
ac = tombstone.Obj.(*configv2beta3.ApisixConsumer)
ac, err = kube.NewApisixConsumer(tombstone.Obj)
if err != nil {
log.Errorw("found ApisixConsumer resource with bad type", zap.Error(err))
return
}
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
Expand All @@ -231,8 +308,11 @@ func (c *apisixConsumerController) onDelete(obj interface{}) {
zap.Any("final state", ac),
)
c.workqueue.Add(&types.Event{
Type: types.EventDelete,
Object: key,
Type: types.EventDelete,
Object: kube.ApisixConsumerEvent{
Key: key,
GroupVersion: ac.GroupVersion(),
},
Tombstone: ac,
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/ingress/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (c *Controller) CompareResources(ctx context.Context) error {
ctx.Done()
} else {
for _, con := range retConsumer.Items {
consumer, err := c.translator.TranslateApisixConsumer(&con)
consumer, err := c.translator.TranslateApisixConsumerV2beta3(&con)
if err != nil {
log.Error(err.Error())
ctx.Done()
Expand Down
17 changes: 14 additions & 3 deletions pkg/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type Controller struct {
apisixClusterConfigLister kube.ApisixClusterConfigLister
apisixClusterConfigInformer cache.SharedIndexInformer
apisixConsumerInformer cache.SharedIndexInformer
apisixConsumerLister listersv2beta3.ApisixConsumerLister
apisixConsumerLister kube.ApisixConsumerLister
apisixPluginConfigInformer cache.SharedIndexInformer
apisixPluginConfigLister kube.ApisixPluginConfigLister
gatewayInformer cache.SharedIndexInformer
Expand Down Expand Up @@ -204,6 +204,7 @@ func (c *Controller) initWhenStartLeading() {
apisixRouteInformer cache.SharedIndexInformer
apisixTlsInformer cache.SharedIndexInformer
apisixClusterConfigInformer cache.SharedIndexInformer
apisixConsumerInformer cache.SharedIndexInformer
)

kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
Expand Down Expand Up @@ -234,7 +235,10 @@ func (c *Controller) initWhenStartLeading() {
apisixFactory.Apisix().V2beta3().ApisixClusterConfigs().Lister(),
apisixFactory.Apisix().V2().ApisixClusterConfigs().Lister(),
)
c.apisixConsumerLister = apisixFactory.Apisix().V2beta3().ApisixConsumers().Lister()
c.apisixConsumerLister = kube.NewApisixConsumerLister(
apisixFactory.Apisix().V2beta3().ApisixConsumers().Lister(),
apisixFactory.Apisix().V2().ApisixConsumers().Lister(),
)
c.apisixPluginConfigLister = kube.NewApisixPluginConfigLister(
apisixFactory.Apisix().V2beta3().ApisixPluginConfigs().Lister(),
)
Expand Down Expand Up @@ -289,6 +293,13 @@ func (c *Controller) initWhenStartLeading() {
panic(fmt.Errorf("unsupported ApisixClusterConfig version %v", c.cfg.Kubernetes.ApisixClusterConfigVersion))
}

switch c.cfg.Kubernetes.ApisixConsumerVersion {
case config.ApisixRouteV2beta3:
apisixConsumerInformer = apisixFactory.Apisix().V2beta3().ApisixConsumers().Informer()
case config.ApisixRouteV2:
apisixConsumerInformer = apisixFactory.Apisix().V2().ApisixConsumers().Informer()
}

c.namespaceInformer = kubeFactory.Core().V1().Namespaces().Informer()
c.podInformer = kubeFactory.Core().V1().Pods().Informer()
c.svcInformer = kubeFactory.Core().V1().Services().Informer()
Expand All @@ -298,7 +309,7 @@ func (c *Controller) initWhenStartLeading() {
c.apisixClusterConfigInformer = apisixClusterConfigInformer
c.secretInformer = kubeFactory.Core().V1().Secrets().Informer()
c.apisixTlsInformer = apisixTlsInformer
c.apisixConsumerInformer = apisixFactory.Apisix().V2beta3().ApisixConsumers().Informer()
c.apisixConsumerInformer = apisixConsumerInformer
c.apisixPluginConfigInformer = apisixFactory.Apisix().V2beta3().ApisixPluginConfigs().Informer()

if c.cfg.Kubernetes.WatchEndpointSlices {
Expand Down
17 changes: 17 additions & 0 deletions pkg/ingress/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,23 @@ func (c *Controller) recordStatus(at interface{}, reason string, err error, stat
)
}
}
case *configv2.ApisixConsumer:
// set to status
if v.Status.Conditions == nil {
conditions := make([]metav1.Condition, 0)
v.Status.Conditions = conditions
}
if c.verifyGeneration(&v.Status.Conditions, condition) {
meta.SetStatusCondition(&v.Status.Conditions, condition)
if _, errRecord := client.ApisixV2().ApisixConsumers(v.Namespace).
UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for ApisixConsumer",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
}
}
case *configv2beta3.ApisixPluginConfig:
// set to status
if v.Status.Conditions == nil {
Expand Down
Loading

0 comments on commit f6f0a3b

Please sign in to comment.