diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go index 324ff7b125..33005e3266 100644 --- a/cmd/ingress/ingress.go +++ b/cmd/ingress/ingress.go @@ -171,6 +171,7 @@ For example, no available LB exists in the bare metal environment.`) cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, "default-apisix-cluster-base-url", "", "the base URL of admin api / manager api for the default APISIX cluster") cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterAdminKey, "default-apisix-cluster-admin-key", "", "admin key used for the authorization of admin api / manager api for the default APISIX cluster") cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterName, "default-apisix-cluster-name", "default", "name of the default apisix cluster") + cmd.PersistentFlags().DurationVar(&cfg.ApisixResourceSyncInterval.Duration, "apisix-resource-sync-interval", 300*time.Second, "interval between syncs in seconds. Default value is 300s.") if err := cmd.PersistentFlags().MarkDeprecated("app-namespace", "use namespace-selector instead"); err != nil { dief("failed to mark `app-namespace` as deprecated: %s", err) diff --git a/conf/config-default.yaml b/conf/config-default.yaml index 5e05390a32..55327366f2 100644 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -44,7 +44,7 @@ ingress_status_address: [] # when there is no available information on the Ser # For example, no available LB exists in the bare metal environment. enable_profiling: true # enable profiling via web interfaces # host:port/debug/pprof, default is true. - +apisix-resource-sync-interval: "300s" # Default interval for synchronizing Kubernetes resources to APISIX # Kubernetes related configurations. kubernetes: kubeconfig: "" # the Kubernetes configuration file path, default is diff --git a/pkg/config/config.go b/pkg/config/config.go index af6d36650e..d5f6f3501e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -72,17 +72,18 @@ const ( // Config contains all config items which are necessary for // apisix-ingress-controller's running. type Config struct { - CertFilePath string `json:"cert_file" yaml:"cert_file"` - KeyFilePath string `json:"key_file" yaml:"key_file"` - LogLevel string `json:"log_level" yaml:"log_level"` - LogOutput string `json:"log_output" yaml:"log_output"` - HTTPListen string `json:"http_listen" yaml:"http_listen"` - HTTPSListen string `json:"https_listen" yaml:"https_listen"` - IngressPublishService string `json:"ingress_publish_service" yaml:"ingress_publish_service"` - IngressStatusAddress []string `json:"ingress_status_address" yaml:"ingress_status_address"` - EnableProfiling bool `json:"enable_profiling" yaml:"enable_profiling"` - Kubernetes KubernetesConfig `json:"kubernetes" yaml:"kubernetes"` - APISIX APISIXConfig `json:"apisix" yaml:"apisix"` + CertFilePath string `json:"cert_file" yaml:"cert_file"` + KeyFilePath string `json:"key_file" yaml:"key_file"` + LogLevel string `json:"log_level" yaml:"log_level"` + LogOutput string `json:"log_output" yaml:"log_output"` + HTTPListen string `json:"http_listen" yaml:"http_listen"` + HTTPSListen string `json:"https_listen" yaml:"https_listen"` + IngressPublishService string `json:"ingress_publish_service" yaml:"ingress_publish_service"` + IngressStatusAddress []string `json:"ingress_status_address" yaml:"ingress_status_address"` + EnableProfiling bool `json:"enable_profiling" yaml:"enable_profiling"` + Kubernetes KubernetesConfig `json:"kubernetes" yaml:"kubernetes"` + APISIX APISIXConfig `json:"apisix" yaml:"apisix"` + ApisixResourceSyncInterval types.TimeDuration `json:"apisix-resource-sync-interval" yaml:"apisix-resource-sync-interval"` } // KubernetesConfig contains all Kubernetes related config items. @@ -118,15 +119,16 @@ type APISIXConfig struct { // default value. func NewDefaultConfig() *Config { return &Config{ - LogLevel: "warn", - LogOutput: "stderr", - HTTPListen: ":8080", - HTTPSListen: ":8443", - IngressPublishService: "", - IngressStatusAddress: []string{}, - CertFilePath: "/etc/webhook/certs/cert.pem", - KeyFilePath: "/etc/webhook/certs/key.pem", - EnableProfiling: true, + LogLevel: "warn", + LogOutput: "stderr", + HTTPListen: ":8080", + HTTPSListen: ":8443", + IngressPublishService: "", + IngressStatusAddress: []string{}, + CertFilePath: "/etc/webhook/certs/cert.pem", + KeyFilePath: "/etc/webhook/certs/key.pem", + EnableProfiling: true, + ApisixResourceSyncInterval: types.TimeDuration{Duration: 300 * time.Second}, Kubernetes: KubernetesConfig{ Kubeconfig: "", // Use in-cluster configurations. ResyncInterval: types.TimeDuration{Duration: 6 * time.Hour}, diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index aa0a30770c..cdca004a8a 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -28,15 +28,16 @@ import ( func TestNewConfigFromFile(t *testing.T) { cfg := &Config{ - LogLevel: "warn", - LogOutput: "stdout", - HTTPListen: ":9090", - HTTPSListen: ":9443", - IngressPublishService: "", - IngressStatusAddress: []string{}, - CertFilePath: "/etc/webhook/certs/cert.pem", - KeyFilePath: "/etc/webhook/certs/key.pem", - EnableProfiling: true, + LogLevel: "warn", + LogOutput: "stdout", + HTTPListen: ":9090", + HTTPSListen: ":9443", + IngressPublishService: "", + IngressStatusAddress: []string{}, + CertFilePath: "/etc/webhook/certs/cert.pem", + KeyFilePath: "/etc/webhook/certs/key.pem", + EnableProfiling: true, + ApisixResourceSyncInterval: types.TimeDuration{Duration: 200 * time.Second}, Kubernetes: KubernetesConfig{ ResyncInterval: types.TimeDuration{Duration: time.Hour}, Kubeconfig: "/path/to/foo/baz", @@ -86,6 +87,7 @@ https_listen: :9443 ingress_publish_service: "" ingress_status_address: [] enable_profiling: true +apisix-resource-sync-interval: 200s kubernetes: kubeconfig: /path/to/foo/baz resync_interval: 1h0m0s @@ -113,15 +115,16 @@ apisix: func TestConfigWithEnvVar(t *testing.T) { cfg := &Config{ - LogLevel: "warn", - LogOutput: "stdout", - HTTPListen: ":9090", - HTTPSListen: ":9443", - IngressPublishService: "", - IngressStatusAddress: []string{}, - CertFilePath: "/etc/webhook/certs/cert.pem", - KeyFilePath: "/etc/webhook/certs/key.pem", - EnableProfiling: true, + LogLevel: "warn", + LogOutput: "stdout", + HTTPListen: ":9090", + HTTPSListen: ":9443", + IngressPublishService: "", + IngressStatusAddress: []string{}, + CertFilePath: "/etc/webhook/certs/cert.pem", + KeyFilePath: "/etc/webhook/certs/key.pem", + EnableProfiling: true, + ApisixResourceSyncInterval: types.TimeDuration{Duration: 200 * time.Second}, Kubernetes: KubernetesConfig{ ResyncInterval: types.TimeDuration{Duration: time.Hour}, Kubeconfig: "", @@ -160,6 +163,7 @@ func TestConfigWithEnvVar(t *testing.T) { "ingress_publish_service": "", "ingress_status_address": [], "enable_profiling": true, + "apisix-resource-sync-interval": "200s", "kubernetes": { "kubeconfig": "{{.KUBECONFIG}}", "resync_interval": "1h0m0s", @@ -195,6 +199,7 @@ https_listen: :9443 ingress_publish_service: "" ingress_status_address: [] enable_profiling: true +apisix-resource-sync-interval: 200s kubernetes: resync_interval: 1h0m0s kubeconfig: "{{.KUBECONFIG}}" diff --git a/pkg/ingress/apisix_cluster_config.go b/pkg/ingress/apisix_cluster_config.go index ecc33d0e6d..fa34638126 100644 --- a/pkg/ingress/apisix_cluster_config.go +++ b/pkg/ingress/apisix_cluster_config.go @@ -403,3 +403,29 @@ func (c *apisixClusterConfigController) onDelete(obj interface{}) { c.controller.MetricsCollector.IncrEvents("clusterConfig", "delete") } + +func (c *apisixClusterConfigController) ResourceSync() { + objs := c.controller.apisixClusterConfigInformer.GetIndexer().List() + for _, obj := range objs { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorw("ApisixClusterConfig sync failed, found ApisixClusterConfig resource with bad meta namespace key", zap.String("error", err.Error())) + continue + } + if !c.controller.isWatchingNamespace(key) { + continue + } + acc, err := kube.NewApisixClusterConfig(obj) + if err != nil { + log.Errorw("found ApisixClusterConfig resource with bad type", zap.String("error", err.Error())) + return + } + c.workqueue.Add(&types.Event{ + Type: types.EventAdd, + Object: kube.ApisixClusterConfigEvent{ + Key: key, + GroupVersion: acc.GroupVersion(), + }, + }) + } +} diff --git a/pkg/ingress/apisix_consumer.go b/pkg/ingress/apisix_consumer.go index 8a456a7055..581239dd7b 100644 --- a/pkg/ingress/apisix_consumer.go +++ b/pkg/ingress/apisix_consumer.go @@ -318,3 +318,29 @@ func (c *apisixConsumerController) onDelete(obj interface{}) { c.controller.MetricsCollector.IncrEvents("consumer", "delete") } + +func (c *apisixConsumerController) ResourceSync() { + objs := c.controller.apisixConsumerInformer.GetIndexer().List() + for _, obj := range objs { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorw("ApisixConsumer sync failed, found ApisixConsumer resource with bad meta namespace key", zap.String("error", err.Error())) + continue + } + if !c.controller.isWatchingNamespace(key) { + continue + } + ac, err := kube.NewApisixConsumer(obj) + if err != nil { + log.Errorw("found ApisixConsumer resource with bad type", zap.String("error", err.Error())) + return + } + c.workqueue.Add(&types.Event{ + Type: types.EventAdd, + Object: kube.ApisixConsumerEvent{ + Key: key, + GroupVersion: ac.GroupVersion(), + }, + }) + } +} diff --git a/pkg/ingress/apisix_pluginconfig.go b/pkg/ingress/apisix_pluginconfig.go index 3faf885bcf..fa7e64a778 100644 --- a/pkg/ingress/apisix_pluginconfig.go +++ b/pkg/ingress/apisix_pluginconfig.go @@ -367,3 +367,25 @@ func (c *apisixPluginConfigController) onDelete(obj interface{}) { c.controller.MetricsCollector.IncrEvents("PluginConfig", "delete") } + +func (c *apisixPluginConfigController) ResourceSync() { + objs := c.controller.apisixPluginConfigInformer.GetIndexer().List() + for _, obj := range objs { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorw("ApisixPluginConfig sync failed, found ApisixPluginConfig resource with bad meta namespace key", zap.String("error", err.Error())) + continue + } + if !c.controller.isWatchingNamespace(key) { + continue + } + apc := kube.MustNewApisixPluginConfig(obj) + c.workqueue.Add(&types.Event{ + Type: types.EventAdd, + Object: kube.ApisixPluginConfigEvent{ + Key: key, + GroupVersion: apc.GroupVersion(), + }, + }) + } +} diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go index c2ecee339b..a2f9537477 100644 --- a/pkg/ingress/apisix_route.go +++ b/pkg/ingress/apisix_route.go @@ -449,3 +449,25 @@ func (c *apisixRouteController) onDelete(obj interface{}) { c.controller.MetricsCollector.IncrEvents("route", "delete") } + +func (c *apisixRouteController) ResourceSync() { + objs := c.controller.apisixRouteInformer.GetIndexer().List() + for _, obj := range objs { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorw("ApisixRoute sync failed, found ApisixRoute resource with bad meta namespace key", zap.String("error", err.Error())) + continue + } + if !c.controller.isWatchingNamespace(key) { + continue + } + ar := kube.MustNewApisixRoute(obj) + c.workqueue.Add(&types.Event{ + Type: types.EventAdd, + Object: kube.ApisixRouteEvent{ + Key: key, + GroupVersion: ar.GroupVersion(), + }, + }) + } +} diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go index 66bc844254..be484edce0 100644 --- a/pkg/ingress/apisix_tls.go +++ b/pkg/ingress/apisix_tls.go @@ -359,3 +359,29 @@ func (c *apisixTlsController) onDelete(obj interface{}) { c.controller.MetricsCollector.IncrEvents("TLS", "delete") } + +func (c *apisixTlsController) ResourceSync() { + objs := c.controller.apisixTlsInformer.GetIndexer().List() + for _, obj := range objs { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorw("ApisixTls sync failed, found ApisixTls object with bad namespace/name ignore it", zap.String("error", err.Error())) + continue + } + if !c.controller.isWatchingNamespace(key) { + continue + } + tls, err := kube.NewApisixTls(obj) + if err != nil { + log.Errorw("ApisixTls sync failed, found ApisixTls resource with bad type", zap.Error(err)) + continue + } + c.workqueue.Add(&types.Event{ + Type: types.EventAdd, + Object: kube.ApisixTlsEvent{ + Key: key, + GroupVersion: tls.GroupVersion(), + }, + }) + } +} diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go index ca8829708f..a82f5df847 100644 --- a/pkg/ingress/apisix_upstream.go +++ b/pkg/ingress/apisix_upstream.go @@ -301,3 +301,21 @@ func (c *apisixUpstreamController) onDelete(obj interface{}) { c.controller.MetricsCollector.IncrEvents("upstream", "delete") } + +func (c *apisixUpstreamController) ResourceSync() { + clusterConfigs := c.controller.apisixUpstreamInformer.GetIndexer().List() + for _, clusterConfig := range clusterConfigs { + key, err := cache.MetaNamespaceKeyFunc(clusterConfig) + if err != nil { + log.Errorw("ApisixUpstream sync failed, found ApisixUpstream resource with bad meta namespace key", zap.String("error", err.Error())) + continue + } + if !c.controller.isWatchingNamespace(key) { + continue + } + c.workqueue.Add(&types.Event{ + Type: types.EventAdd, + Object: key, + }) + } +} diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go index 80c36e342b..f6451a8e32 100644 --- a/pkg/ingress/controller.go +++ b/pkg/ingress/controller.go @@ -64,6 +64,8 @@ const ( _resourceSyncAborted = "ResourceSyncAborted" // _messageResourceFailed is used to report error _messageResourceFailed = "%s synced failed, with error: %s" + // minimum interval for ingress sync to APISIX + _mininumApisixResourceSyncInterval = 60 * time.Second ) // Controller is the ingress apisix controller object. @@ -399,7 +401,6 @@ func (c *Controller) Run(stop chan struct{}) error { ReleaseOnCancel: true, Name: "ingress-apisix", } - elector, err := leaderelection.NewLeaderElector(cfg) if err != nil { log.Errorf("failed to create leader elector: %s", err.Error()) @@ -569,6 +570,9 @@ func (c *Controller) run(ctx context.Context) { c.apisixPluginConfigController.run(ctx) }) + c.goAttach(func() { + c.resourceSyncLoop(ctx, c.cfg.ApisixResourceSyncInterval.Duration) + }) c.MetricsCollector.ResetLeader(true) log.Infow("controller now is running as leader", @@ -727,3 +731,57 @@ func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context. c.MetricsCollector.IncrCheckClusterHealth(c.name) } } + +func (c *Controller) syncAllResources() { + wg := sync.WaitGroup{} + goAttach := func(handler func()) { + wg.Add(1) + go func() { + defer wg.Done() + handler() + }() + } + goAttach(func() { + c.apisixConsumerController.ResourceSync() + }) + goAttach(func() { + c.apisixRouteController.ResourceSync() + }) + goAttach(func() { + c.apisixClusterConfigController.ResourceSync() + }) + goAttach(func() { + c.apisixPluginConfigController.ResourceSync() + }) + goAttach(func() { + c.apisixUpstreamController.ResourceSync() + }) + goAttach(func() { + c.apisixTlsController.ResourceSync() + }) + goAttach(func() { + c.ingressController.ResourceSync() + }) + wg.Wait() +} + +func (c *Controller) resourceSyncLoop(ctx context.Context, interval time.Duration) { + // The interval shall not be less than 60 seconds. + if interval < _mininumApisixResourceSyncInterval { + log.Warnw("The apisix-resource-sync-interval shall not be less than 60 seconds.", + zap.String("apisix-resource-sync-interval", interval.String()), + ) + interval = _mininumApisixResourceSyncInterval + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + c.syncAllResources() + continue + case <-ctx.Done(): + return + } + } +} diff --git a/pkg/ingress/ingress.go b/pkg/ingress/ingress.go index 8e5c0d1ebb..30053f0441 100644 --- a/pkg/ingress/ingress.go +++ b/pkg/ingress/ingress.go @@ -403,3 +403,25 @@ func (c *ingressController) isIngressEffective(ing kube.Ingress) bool { } return false } + +func (c *ingressController) ResourceSync() { + objs := c.controller.ingressInformer.GetIndexer().List() + for _, obj := range objs { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + log.Errorw("found ApisixConsumer resource with bad meta namespace key", zap.String("error", err.Error())) + continue + } + if !c.controller.isWatchingNamespace(key) { + continue + } + ing := kube.MustNewIngress(obj) + c.workqueue.Add(&types.Event{ + Type: types.EventAdd, + Object: kube.IngressEvent{ + Key: key, + GroupVersion: ing.GroupVersion(), + }, + }) + } +} diff --git a/samples/deploy/configmap/apisix-ingress-cm.yaml b/samples/deploy/configmap/apisix-ingress-cm.yaml index 5c98ff8fb3..edcc32b096 100644 --- a/samples/deploy/configmap/apisix-ingress-cm.yaml +++ b/samples/deploy/configmap/apisix-ingress-cm.yaml @@ -35,7 +35,7 @@ data: http_listen: ":8080" # the HTTP Server listen address, default is ":8080" enable_profiling: true # enable profiling via web interfaces # host:port/debug/pprof, default is true. - + apisix-resource-sync-interval: 300s # Default interval for synchronizing Kubernetes resources to APISIX # Kubernetes related configurations. kubernetes: kubeconfig: "" # the Kubernetes configuration file path, default is diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go index cef850df23..13cd350b31 100644 --- a/test/e2e/scaffold/ingress.go +++ b/test/e2e/scaffold/ingress.go @@ -277,6 +277,8 @@ spec: - debug - --log-output - stdout + - --apisix-resource-sync-interval + - %s - --http-listen - :8080 - --https-listen @@ -417,10 +419,10 @@ func (s *Scaffold) newIngressAPISIXController() error { var ingressAPISIXDeployment string label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace) if s.opts.EnableWebhooks { - ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace, + ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace, s.opts.ApisixResourceSyncInterval, s.FormatNamespaceLabel(label), s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, s.opts.EnableGatewayAPI, _volumeMounts, _webhookCertSecret) } else { - ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace, + ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace, s.opts.ApisixResourceSyncInterval, s.FormatNamespaceLabel(label), s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, s.opts.EnableGatewayAPI, "", _webhookCertSecret) } @@ -527,9 +529,10 @@ func (s *Scaffold) ScaleIngressController(desired int) error { var ingressDeployment string label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace) if s.opts.EnableWebhooks { - ingressDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired, s.namespace, label, s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, s.opts.EnableGatewayAPI, _volumeMounts, _webhookCertSecret) + + ingressDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired, s.namespace, s.opts.ApisixResourceSyncInterval, label, s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, s.opts.EnableGatewayAPI, _volumeMounts, _webhookCertSecret) } else { - ingressDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired, s.namespace, label, s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, s.opts.EnableGatewayAPI, "", _webhookCertSecret) + ingressDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired, s.namespace, s.opts.ApisixResourceSyncInterval, label, s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, s.opts.EnableGatewayAPI, "", _webhookCertSecret) } if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, ingressDeployment); err != nil { return err diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go index b788c92f5a..e9c6cbb1fa 100644 --- a/test/e2e/scaffold/k8s.go +++ b/test/e2e/scaffold/k8s.go @@ -252,7 +252,7 @@ func (s *Scaffold) EnsureNumApisixPluginConfigCreated(desired int) error { return s.ensureNumApisixCRDsCreated(u.String(), desired) } -// CreateApisixRouteByApisixAdmin create a route +// CreateApisixRouteByApisixAdmin create or update a route func (s *Scaffold) CreateApisixRouteByApisixAdmin(routeID string, body []byte) error { u := url.URL{ Scheme: "http", @@ -262,6 +262,16 @@ func (s *Scaffold) CreateApisixRouteByApisixAdmin(routeID string, body []byte) e return s.ensureAdminOperationIsSuccessful(u.String(), "PUT", body) } +// CreateApisixRouteByApisixAdmin create or update a consumer +func (s *Scaffold) CreateApisixConsumerByApisixAdmin(body []byte) error { + u := url.URL{ + Scheme: "http", + Host: s.apisixAdminTunnel.Endpoint(), + Path: "/apisix/admin/consumers", + } + return s.ensureAdminOperationIsSuccessful(u.String(), "PUT", body) +} + // DeleteApisixRouteByApisixAdmin deletes a route by its route name in APISIX cluster. func (s *Scaffold) DeleteApisixRouteByApisixAdmin(routeID string) error { u := url.URL{ @@ -272,6 +282,16 @@ func (s *Scaffold) DeleteApisixRouteByApisixAdmin(routeID string) error { return s.ensureAdminOperationIsSuccessful(u.String(), "DELETE", nil) } +// DeleteApisixConsumerByApisixAdmin deletes a consumer by its consumer name in APISIX cluster. +func (s *Scaffold) DeleteApisixConsumerByApisixAdmin(consumerName string) error { + u := url.URL{ + Scheme: "http", + Host: s.apisixAdminTunnel.Endpoint(), + Path: "/apisix/admin/consumers/" + consumerName, + } + return s.ensureAdminOperationIsSuccessful(u.String(), "DELETE", nil) +} + func (s *Scaffold) ensureAdminOperationIsSuccessful(url, method string, body []byte) error { condFunc := func() (bool, error) { req, err := http.NewRequest(method, url, bytes.NewBuffer(body)) diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go index 01d4f50c1a..ee29fcc516 100644 --- a/test/e2e/scaffold/scaffold.go +++ b/test/e2e/scaffold/scaffold.go @@ -61,6 +61,7 @@ type Options struct { EnableWebhooks bool APISIXPublishAddress string disableNamespaceSelector bool + ApisixResourceSyncInterval string EnableGatewayAPI bool } @@ -127,6 +128,9 @@ func NewScaffold(o *Options) *Scaffold { if o.APISIXAdminAPIKey == "" { o.APISIXAdminAPIKey = "edd1c9f034335f136f87ad84b625c8f1" } + if o.ApisixResourceSyncInterval == "" { + o.ApisixResourceSyncInterval = "300s" + } defer ginkgo.GinkgoRecover() s := &Scaffold{ diff --git a/test/e2e/suite-ingress/resourcesync.go b/test/e2e/suite-ingress/resourcesync.go new file mode 100644 index 0000000000..559c36fe9b --- /dev/null +++ b/test/e2e/suite-ingress/resourcesync.go @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package ingress + +import ( + "encoding/json" + "fmt" + "net/http" + "time" + + ginkgo "github.com/onsi/ginkgo/v2" + "github.com/stretchr/testify/assert" + + "github.com/apache/apisix-ingress-controller/pkg/id" + "github.com/apache/apisix-ingress-controller/test/e2e/scaffold" +) + +var _ = ginkgo.Describe("suite-ingress: apisix resource sync", func() { + opts := &scaffold.Options{ + Name: "default", + Kubeconfig: scaffold.GetKubeconfig(), + APISIXConfigPath: "testdata/apisix-gw-config.yaml", + IngressAPISIXReplicas: 1, + HTTPBinServicePort: 80, + APISIXRouteVersion: "apisix.apache.org/v2beta3", + ApisixResourceSyncInterval: "60s", + } + s := scaffold.NewScaffold(opts) + ginkgo.JustBeforeEach(func() { + backendSvc, backendPorts := s.DefaultHTTPBackend() + // Create ApisixRoute resource + ar := fmt.Sprintf(` +apiVersion: apisix.apache.org/v2beta3 +kind: ApisixRoute +metadata: + name: httpbin-route +spec: + http: + - name: rule1 + match: + hosts: + - httpbin.org + paths: + - /ip + backends: + - serviceName: %s + servicePort: %d + authentication: + enable: true + type: keyAuth +`, backendSvc, backendPorts[0]) + assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ar)) + err := s.EnsureNumApisixUpstreamsCreated(1) + assert.Nil(ginkgo.GinkgoT(), err, "Checking number of upstreams") + err = s.EnsureNumApisixRoutesCreated(1) + assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes") + + // Create Ingress resource + ing := fmt.Sprintf(` +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + annotations: + kubernetes.io/ingress.class: apisix + name: ingress-route +spec: + rules: + - host: local.httpbin.org + http: + paths: + - path: /headers + pathType: Exact + backend: + service: + name: %s + port: + number: %d +`, backendSvc, backendPorts[0]) + assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ing)) + + // Create ApisixConsumer resource + err = s.ApisixConsumerKeyAuthCreated("foo", "foo-key") + assert.Nil(ginkgo.GinkgoT(), err) + }) + + ginkgo.It("for modified resource sync consistency", func() { + // crd resource sync interval + readyTime := time.Now().Add(60 * time.Second) + + routes, _ := s.ListApisixRoutes() + assert.Len(ginkgo.GinkgoT(), routes, 2) + + consumers, _ := s.ListApisixConsumers() + assert.Len(ginkgo.GinkgoT(), consumers, 1) + + for _, route := range routes { + _ = s.CreateApisixRouteByApisixAdmin(id.GenID(route.Name), []byte(` +{ + "methods": ["GET"], + "uri": "/anything", + "plugins": { + "key-auth": {} + }, + "upstream": { + "type": "roundrobin", + "nodes": { + "httpbin.org": 1 + } + } +}`)) + } + + for _, consumer := range consumers { + _ = s.CreateApisixConsumerByApisixAdmin([]byte(fmt.Sprintf(` +{ + "username": "%s", + "plugins": { + "key-auth": { + "key": "auth-one" + } + } +}`, consumer.Username))) + } + + _ = s.NewAPISIXClient(). + GET("/ip"). + WithHeader("Host", "httpbin.org"). + Expect(). + Status(http.StatusNotFound) + + _ = s.NewAPISIXClient(). + GET("/headers"). + WithHeader("Host", "local.httpbin.org"). + Expect(). + Status(http.StatusNotFound) + + waitTime := time.Until(readyTime).Seconds() + time.Sleep(time.Duration(waitTime) * time.Second) + + _ = s.NewAPISIXClient(). + GET("/ip"). + WithHeader("Host", "httpbin.org"). + WithHeader("apikey", "foo-key"). + Expect(). + Status(http.StatusOK) + + _ = s.NewAPISIXClient(). + GET("/headers"). + WithHeader("Host", "local.httpbin.org"). + Expect(). + Status(http.StatusOK) + + consumers, _ = s.ListApisixConsumers() + assert.Len(ginkgo.GinkgoT(), consumers, 1) + data, _ := json.Marshal(consumers[0]) + assert.Contains(ginkgo.GinkgoT(), string(data), "foo-key") + }) + + ginkgo.It("for deleted resource sync consistency", func() { + // crd resource sync interval + readyTime := time.Now().Add(60 * time.Second) + + routes, _ := s.ListApisixRoutes() + assert.Len(ginkgo.GinkgoT(), routes, 2) + + consumers, _ := s.ListApisixConsumers() + assert.Len(ginkgo.GinkgoT(), consumers, 1) + + for _, route := range routes { + _ = s.DeleteApisixRouteByApisixAdmin(id.GenID(route.Name)) + } + + for _, consumer := range consumers { + s.DeleteApisixConsumerByApisixAdmin(consumer.Username) + } + + _ = s.NewAPISIXClient(). + GET("/ip"). + WithHeader("Host", "httpbin.org"). + Expect(). + Status(http.StatusNotFound) + + _ = s.NewAPISIXClient(). + GET("/headers"). + WithHeader("Host", "local.httpbin.org"). + Expect(). + Status(http.StatusNotFound) + + routes, _ = s.ListApisixRoutes() + assert.Len(ginkgo.GinkgoT(), routes, 0) + consumers, _ = s.ListApisixConsumers() + assert.Len(ginkgo.GinkgoT(), consumers, 0) + + waitTime := time.Until(readyTime).Seconds() + time.Sleep(time.Duration(waitTime) * time.Second) + + _ = s.NewAPISIXClient(). + GET("/ip"). + WithHeader("Host", "httpbin.org"). + WithHeader("apikey", "foo-key"). + Expect(). + Status(http.StatusOK) + + _ = s.NewAPISIXClient(). + GET("/headers"). + WithHeader("Host", "local.httpbin.org"). + Expect(). + Status(http.StatusOK) + + consumers, _ = s.ListApisixConsumers() + assert.Len(ginkgo.GinkgoT(), consumers, 1) + data, _ := json.Marshal(consumers[0]) + assert.Contains(ginkgo.GinkgoT(), string(data), "foo-key") + }) +})