Skip to content

Commit

Permalink
feat: add label-selector for watching namespace (#715)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxthrj committed Oct 28, 2021
1 parent dc196ef commit 65f7c88
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 34 deletions.
6 changes: 5 additions & 1 deletion cmd/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ the apisix cluster and others are created`,
cmd.PersistentFlags().BoolVar(&cfg.EnableProfiling, "enable-profiling", true, "enable profiling via web interface host:port/debug/pprof")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.Kubeconfig, "kubeconfig", "", "Kubernetes configuration file (by default in-cluster configuration will be used)")
cmd.PersistentFlags().DurationVar(&cfg.Kubernetes.ResyncInterval.Duration, "resync-interval", time.Minute, "the controller resync (with Kubernetes) interval, the minimum resync interval is 30s")
cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces, "app-namespace", []string{config.NamespaceAll}, "namespaces that controller will watch for resources")
cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces, "app-namespace", []string{config.NamespaceAll}, "namespaces that controller will watch for resources.")
cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.NamespaceSelector, "namespace-selector", []string{""}, "labels that controller used to select namespaces which will watch for resources")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressClass, "ingress-class", config.IngressClass, "the class of an Ingress object is set using the field IngressClassName in Kubernetes clusters version v1.18.0 or higher or the annotation \"kubernetes.io/ingress.class\" (deprecated)")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for campaign the controller leader")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, "ingress-version", config.IngressNetworkingV1, "the supported ingress api group version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes version v1.19.0 or higher) and \"extensions/v1beta1\"")
Expand All @@ -155,5 +156,8 @@ the apisix cluster and others are created`,
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterAdminKey, "default-apisix-cluster-admin-key", "", "admin key used for the authorization of admin api / manager api for the default APISIX cluster")
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterName, "default-apisix-cluster-name", "default", "name of the default apisix cluster")

if err := cmd.PersistentFlags().MarkDeprecated("app-namespace", "use namespace-selector instead"); err != nil {
dief("failed to mark `app-namespace` as deprecated: %s", err)
}
return cmd
}
4 changes: 4 additions & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ kubernetes:
# and the minimal resync interval is 30s.
app_namespaces: ["*"] # namespace list that controller will watch for resources,
# by default all namespaces (represented by "*") are watched.
# The `app_namespace` is deprecated, using `namespace_selector` instead since version 1.4.0
namespace_selector: [""] # namespace_selector represent basis for selecting managed namespaces.
# the field is support since version 1.4.0
# For example, "apisix.ingress=watching", so ingress will watching the namespaces which labels "apisix.ingress=watching"
election_id: "ingress-apisix-leader" # the election id for the controller leader campaign,
# only the leader will watch and delivery resource changes,
# other instances (as candidates) stand by.
Expand Down
4 changes: 2 additions & 2 deletions docs/en/latest/practices/the-hard-way.md
Original file line number Diff line number Diff line change
Expand Up @@ -621,8 +621,8 @@ data:
kubernetes:
kubeconfig: ""
resync_interval: "30s"
app_namespaces:
- "*"
namespace_selector:
- "apisix.ingress=watching"
ingress_class: "apisix"
ingress_version: "networking/v1"
apisix_route_version: "apisix.apache.org/v2beta1"
Expand Down
52 changes: 52 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package config
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"strings"
"time"

"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation"

"github.com/apache/apisix-ingress-controller/pkg/types"
)
Expand Down Expand Up @@ -77,6 +79,7 @@ type KubernetesConfig struct {
Kubeconfig string `json:"kubeconfig" yaml:"kubeconfig"`
ResyncInterval types.TimeDuration `json:"resync_interval" yaml:"resync_interval"`
AppNamespaces []string `json:"app_namespaces" yaml:"app_namespaces"`
NamespaceSelector []string `json:"namespace_selector" yaml:"namespace_selector"`
ElectionID string `json:"election_id" yaml:"election_id"`
IngressClass string `json:"ingress_class" yaml:"ingress_class"`
IngressVersion string `json:"ingress_version" yaml:"ingress_version"`
Expand Down Expand Up @@ -174,6 +177,10 @@ func (cfg *Config) Validate() error {
return errors.New("unsupported ingress version")
}
cfg.Kubernetes.AppNamespaces = purifyAppNamespaces(cfg.Kubernetes.AppNamespaces)
ok, err := cfg.verifyNamespaceSelector()
if !ok {
return err
}
return nil
}

Expand All @@ -191,3 +198,48 @@ func purifyAppNamespaces(namespaces []string) []string {
}
return ultimate
}

func (cfg *Config) verifyNamespaceSelector() (bool, error) {
labels := cfg.Kubernetes.NamespaceSelector
// default is [""]
if len(labels) == 1 && labels[0] == "" {
cfg.Kubernetes.NamespaceSelector = []string{}
}

for _, s := range cfg.Kubernetes.NamespaceSelector {
parts := strings.Split(s, "=")
if len(parts) != 2 {
return false, fmt.Errorf("Illegal namespaceSelector: %s, should be key-value pairs divided by = ", s)
} else {
if err := cfg.validateLabelKey(parts[0]); err != nil {
return false, err
}
if err := cfg.validateLabelValue(parts[1]); err != nil {
return false, err
}
}
}
return true, nil
}

// validateLabelKey validate the key part of label
// ref: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
func (cfg *Config) validateLabelKey(key string) error {
errorMsg := validation.IsQualifiedName(key)
msg := strings.Join(errorMsg, "\n")
if msg == "" {
return nil
}
return fmt.Errorf("Illegal namespaceSelector: %s, "+msg, key)
}

// validateLabelValue validate the value part of label
// ref: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
func (cfg *Config) validateLabelValue(value string) error {
errorMsg := validation.IsValidLabelValue(value)
msg := strings.Join(errorMsg, "\n")
if msg == "" {
return nil
}
return fmt.Errorf("Illegal namespaceSelector: %s, "+msg, value)
}
16 changes: 8 additions & 8 deletions pkg/ingress/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,16 @@ func (c *Controller) CompareResources(ctx context.Context) error {
log.Error(err.Error())
ctx.Done()
} else {
wns := make(map[string]struct{}, len(nsList.Items))
wns := new(sync.Map)
for _, v := range nsList.Items {
wns[v.Name] = struct{}{}
wns.Store(v.Name, struct{}{})
}
c.watchingNamespace = wns
}
}
if len(c.watchingNamespace) > 0 {
wg.Add(len(c.watchingNamespace))
}
for ns := range c.watchingNamespace {

c.watchingNamespace.Range(func(key, value interface{}) bool {
wg.Add(1)
go func(ns string) {
defer wg.Done()
// ApisixRoute
Expand Down Expand Up @@ -130,8 +129,9 @@ func (c *Controller) CompareResources(ctx context.Context) error {
}
}
}
}(ns)
}
}(key.(string))
return true
})
wg.Wait()

// 2.get all cache routes
Expand Down
36 changes: 31 additions & 5 deletions pkg/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -70,7 +71,8 @@ type Controller struct {
namespace string
cfg *config.Config
wg sync.WaitGroup
watchingNamespace map[string]struct{}
watchingNamespace *sync.Map
watchingLabels types.Labels
apisix apisix.APISIX
podCache types.PodCache
translator translation.Translator
Expand All @@ -90,6 +92,8 @@ type Controller struct {
leaderContextCancelFunc context.CancelFunc

// common informers and listers
namespaceInformer cache.SharedIndexInformer
namespaceLister listerscorev1.NamespaceLister
podInformer cache.SharedIndexInformer
podLister listerscorev1.PodLister
epInformer cache.SharedIndexInformer
Expand All @@ -112,6 +116,7 @@ type Controller struct {
apisixConsumerLister listersv2alpha1.ApisixConsumerLister

// resource controllers
namespaceController *namespaceController
podController *podController
endpointsController *endpointsController
endpointSliceController *endpointSliceController
Expand Down Expand Up @@ -148,15 +153,21 @@ func NewController(cfg *config.Config) (*Controller, error) {
}

var (
watchingNamespace map[string]struct{}
watchingNamespace = new(sync.Map)
watchingLabels = make(map[string]string)
)
if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
watchingNamespace = make(map[string]struct{}, len(cfg.Kubernetes.AppNamespaces))
for _, ns := range cfg.Kubernetes.AppNamespaces {
watchingNamespace[ns] = struct{}{}
watchingNamespace.Store(ns, struct{}{})
}
}

// support namespace label-selector
for _, labels := range cfg.Kubernetes.NamespaceSelector {
labelSlice := strings.Split(labels, "=")
watchingLabels[labelSlice[0]] = labelSlice[1]
}

// recorder
utilruntime.Must(apisixscheme.AddToScheme(scheme.Scheme))
eventBroadcaster := record.NewBroadcaster()
Expand All @@ -171,6 +182,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
metricsCollector: metrics.NewPrometheusCollector(),
kubeClient: kubeClient,
watchingNamespace: watchingNamespace,
watchingLabels: watchingLabels,
secretSSLMap: new(sync.Map),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),

Expand All @@ -188,6 +200,7 @@ func (c *Controller) initWhenStartLeading() {
kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory()

c.namespaceLister = kubeFactory.Core().V1().Namespaces().Lister()
c.podLister = kubeFactory.Core().V1().Pods().Lister()
c.epLister, c.epInformer = kube.NewEndpointListerAndInformer(kubeFactory, c.cfg.Kubernetes.WatchEndpointSlices)
c.svcLister = kubeFactory.Core().V1().Services().Lister()
Expand Down Expand Up @@ -236,6 +249,7 @@ func (c *Controller) initWhenStartLeading() {
apisixRouteInformer = apisixFactory.Apisix().V2beta2().ApisixRoutes().Informer()
}

c.namespaceInformer = kubeFactory.Core().V1().Namespaces().Informer()
c.podInformer = kubeFactory.Core().V1().Pods().Informer()
c.svcInformer = kubeFactory.Core().V1().Services().Informer()
c.ingressInformer = ingressInformer
Expand All @@ -251,6 +265,7 @@ func (c *Controller) initWhenStartLeading() {
} else {
c.endpointsController = c.newEndpointsController()
}
c.namespaceController = c.newNamespaceController()
c.podController = c.newPodController()
c.apisixUpstreamController = c.newApisixUpstreamController()
c.ingressController = c.newIngressController()
Expand Down Expand Up @@ -405,6 +420,11 @@ func (c *Controller) run(ctx context.Context) {

c.initWhenStartLeading()

// list namesapce and init watchingNamespace
if err := c.initWatchingNamespaceByLabels(ctx); err != nil {
ctx.Done()
return
}
// compare resources of k8s with objects of APISIX
if err = c.CompareResources(ctx); err != nil {
ctx.Done()
Expand All @@ -414,6 +434,9 @@ func (c *Controller) run(ctx context.Context) {
c.goAttach(func() {
c.checkClusterHealth(ctx, cancelFunc)
})
c.goAttach(func() {
c.namespaceInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.podInformer.Run(ctx.Done())
})
Expand Down Expand Up @@ -445,6 +468,9 @@ func (c *Controller) run(ctx context.Context) {
c.goAttach(func() {
c.apisixConsumerInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.namespaceController.run(ctx)
})
c.goAttach(func() {
c.podController.run(ctx)
})
Expand Down Expand Up @@ -502,7 +528,7 @@ func (c *Controller) namespaceWatching(key string) (ok bool) {
log.Warnf("resource %s was ignored since: %s", key, err)
return
}
_, ok = c.watchingNamespace[ns]
_, ok = c.watchingNamespace.Load(ns)
return
}

Expand Down
Loading

0 comments on commit 65f7c88

Please sign in to comment.