Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: namespace_selector invalid when restarting #1238

Merged
merged 8 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions pkg/api/validation/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,3 @@ func validateSchema(schemaLoader *gojsonschema.JSONLoader, obj interface{}) (boo

return false, resultErr
}

func HasValueInSyncMap(m *sync.Map) bool {
hasValue := false
if m != nil {
m.Range(func(k, v interface{}) bool {
hasValue = true
return false
})
}
return hasValue
}
9 changes: 0 additions & 9 deletions pkg/api/validation/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
package validation

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/xeipuuv/gojsonschema"

v2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
Expand Down Expand Up @@ -48,10 +46,3 @@ func Test_validateSchema(t *testing.T) {
})
}
}

func TestHasValueInSyncMap(t *testing.T) {
m := new(sync.Map)
assert.False(t, HasValueInSyncMap(m), "sync.Map should be empty")
m.Store("hello", "test")
assert.True(t, HasValueInSyncMap(m), "sync.Map should not be empty")
}
84 changes: 42 additions & 42 deletions pkg/providers/k8s/namespace/namespace_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ package namespace

import (
"context"
"fmt"
"strings"
"sync"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"github.com/apache/apisix-ingress-controller/pkg/api/validation"
"github.com/apache/apisix-ingress-controller/pkg/config"
"github.com/apache/apisix-ingress-controller/pkg/kube"
"github.com/apache/apisix-ingress-controller/pkg/log"
Expand All @@ -48,44 +47,28 @@ type WatchingNamespaceProvider interface {
}

func NewWatchingNamespaceProvider(ctx context.Context, kube *kube.KubeClient, cfg *config.Config) (WatchingNamespaceProvider, error) {
var (
watchingNamespaces = new(sync.Map)
watchingLabels = make(map[string]string)
)
if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
for _, ns := range cfg.Kubernetes.AppNamespaces {
watchingNamespaces.Store(ns, struct{}{})
}
c := &watchingProvider{
kube: kube,
cfg: cfg,

watchingNamespaces: new(sync.Map),
watchingLabels: make(map[string]string),

enableLabelsWatching: false,
}

if len(cfg.Kubernetes.NamespaceSelector) == 0 {
return c, nil
}

// support namespace label-selector
c.enableLabelsWatching = true
for _, selector := range cfg.Kubernetes.NamespaceSelector {
labelSlice := strings.Split(selector, "=")
watchingLabels[labelSlice[0]] = labelSlice[1]
}

// watchingNamespaces and watchingLabels are empty means to monitor all namespaces.
if !validation.HasValueInSyncMap(watchingNamespaces) && len(watchingLabels) == 0 {
opts := metav1.ListOptions{}
// list all namespaces
nsList, err := kube.Client.CoreV1().Namespaces().List(ctx, opts)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
wns := new(sync.Map)
for _, v := range nsList.Items {
wns.Store(v.Name, struct{}{})
}
watchingNamespaces = wns
if len(labelSlice) != 2 {
return nil, fmt.Errorf("Bad namespace-selector format: %s, expected namespace-selector format: xxx=xxx", selector)
}
}

c := &watchingProvider{
kube: kube,
cfg: cfg,

watchingNamespaces: watchingNamespaces,
watchingLabels: watchingLabels,
c.watchingLabels[labelSlice[0]] = labelSlice[1]
AlinsRan marked this conversation as resolved.
Show resolved Hide resolved
}

kubeFactory := kube.NewSharedIndexInformerFactory()
Expand All @@ -108,6 +91,8 @@ type watchingProvider struct {
namespaceLister listerscorev1.NamespaceLister

controller *namespaceController

enableLabelsWatching bool
}

func (c *watchingProvider) Init(ctx context.Context) error {
Expand Down Expand Up @@ -138,12 +123,14 @@ func (c *watchingProvider) initWatchingNamespacesByLabels(ctx context.Context) e
}

func (c *watchingProvider) Run(ctx context.Context) {
e := utils.ParallelExecutor{}
if !c.enableLabelsWatching {
return
}

e := utils.ParallelExecutor{}
e.Add(func() {
c.namespaceInformer.Run(ctx.Done())
})

e.Add(func() {
c.controller.run(ctx)
})
Expand All @@ -153,17 +140,30 @@ func (c *watchingProvider) Run(ctx context.Context) {

func (c *watchingProvider) WatchingNamespaces() []string {
var keys []string
c.watchingNamespaces.Range(func(key, _ interface{}) bool {
keys = append(keys, key.(string))
return true
})
if c.enableLabelsWatching {
c.watchingNamespaces.Range(func(key, _ interface{}) bool {
keys = append(keys, key.(string))
return true
})
} else {
namespaces, err := c.kube.Client.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Warnw("Namespace list get failed",
zap.Error(err),
)
return nil
}
for _, ns := range namespaces.Items {
keys = append(keys, ns.Name)
}
}
return keys
}

// IsWatchingNamespace accepts a resource key, getting the namespace part
// and checking whether the namespace is being watched.
func (c *watchingProvider) IsWatchingNamespace(key string) (ok bool) {
if !validation.HasValueInSyncMap(c.watchingNamespaces) {
if !c.enableLabelsWatching {
ok = true
return
}
Expand Down
15 changes: 11 additions & 4 deletions test/e2e/scaffold/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,13 +428,17 @@ func (s *Scaffold) newIngressAPISIXController() error {
})

var ingressAPISIXDeployment string
label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
label := `""`
if labels := s.NamespaceSelectorLabelStrings(); labels != nil && !s.opts.DisableNamespaceSelector {
label = labels[0]
}

if s.opts.EnableWebhooks {
ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace, s.opts.ApisixResourceSyncInterval,
s.FormatNamespaceLabel(label), s.opts.ApisixResourceVersion, s.opts.APISIXPublishAddress, _volumeMounts, _webhookCertSecret)
label, s.opts.ApisixResourceVersion, s.opts.APISIXPublishAddress, _volumeMounts, _webhookCertSecret)
} else {
ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace, s.opts.ApisixResourceSyncInterval,
s.FormatNamespaceLabel(label), s.opts.ApisixResourceVersion, s.opts.APISIXPublishAddress, "", _webhookCertSecret)
label, s.opts.ApisixResourceVersion, s.opts.APISIXPublishAddress, "", _webhookCertSecret)
}

err = k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, ingressAPISIXDeployment)
Expand Down Expand Up @@ -538,7 +542,10 @@ func (s *Scaffold) GetIngressPodDetails() ([]corev1.Pod, error) {
// ScaleIngressController scales the number of Ingress Controller pods to desired.
func (s *Scaffold) ScaleIngressController(desired int) error {
var ingressDeployment string
label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
var label string
if labels := s.NamespaceSelectorLabelStrings(); labels != nil {
label = labels[0]
}
if s.opts.EnableWebhooks {
ingressDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired, s.namespace, s.opts.ApisixResourceSyncInterval, label, s.opts.ApisixResourceVersion, s.opts.APISIXPublishAddress, _volumeMounts, _webhookCertSecret)
} else {
Expand Down
16 changes: 10 additions & 6 deletions test/e2e/scaffold/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,20 @@ func (s *Scaffold) CreateResourceFromStringWithNamespace(yaml, namespace string)
s.kubectlOptions.Namespace = originalNamespace
}()
s.addFinalizers(func() {
originalNamespace := s.kubectlOptions.Namespace
s.kubectlOptions.Namespace = namespace
defer func() {
s.kubectlOptions.Namespace = originalNamespace
}()
assert.Nil(s.t, k8s.KubectlDeleteFromStringE(s.t, s.kubectlOptions, yaml))
_ = s.DeleteResourceFromStringWithNamespace(yaml, namespace)
})
return k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, yaml)
}

func (s *Scaffold) DeleteResourceFromStringWithNamespace(yaml, namespace string) error {
originalNamespace := s.kubectlOptions.Namespace
s.kubectlOptions.Namespace = namespace
defer func() {
s.kubectlOptions.Namespace = originalNamespace
}()
return k8s.KubectlDeleteFromStringE(s.t, s.kubectlOptions, yaml)
}

func (s *Scaffold) ensureNumApisixCRDsCreated(url string, desired int) error {
condFunc := func() (bool, error) {
req, err := http.NewRequest("GET", url, nil)
Expand Down
66 changes: 45 additions & 21 deletions test/e2e/scaffold/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ type Options struct {
APISIXAdminAPIKey string
EnableWebhooks bool
APISIXPublishAddress string
disableNamespaceSelector bool
ApisixResourceSyncInterval string
ApisixResourceVersion string

NamespaceSelectorLabel map[string]string
DisableNamespaceSelector bool
DisableNamespaceLabel bool
}

type Scaffold struct {
Expand Down Expand Up @@ -96,10 +99,10 @@ var (
}

createVersionedApisixResourceMap = map[string]struct{}{
"ApisixRoute": struct{}{},
"ApisixConsumer": struct{}{},
"ApisixPluginConfig": struct{}{},
"ApisixUpstream": struct{}{},
"ApisixRoute": {},
"ApisixConsumer": {},
"ApisixPluginConfig": {},
"ApisixUpstream": {},
}
)

Expand Down Expand Up @@ -368,18 +371,39 @@ func (s *Scaffold) RestartAPISIXDeploy() {
assert.NoError(s.t, err, "renew apisix tunnels")
}

func (s *Scaffold) RestartIngressControllerDeploy() {
pods, err := k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{
LabelSelector: "app=ingress-apisix-controller-deployment-e2e-test",
})
assert.NoError(s.t, err, "list ingress-controller pod")
for _, pod := range pods {
err = s.KillPod(pod.Name)
assert.NoError(s.t, err, "killing ingress-controller pod")
}
err = s.WaitAllIngressControllerPodsAvailable()
assert.NoError(s.t, err, "waiting for new ingress-controller instance ready")
}

func (s *Scaffold) beforeEach() {
var err error
s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d", s.opts.Name, time.Now().Nanosecond())
s.kubectlOptions = &k8s.KubectlOptions{
ConfigPath: s.opts.Kubeconfig,
Namespace: s.namespace,
}

s.finializers = nil
labels := make(map[string]string)
labels["apisix.ingress.watch"] = s.namespace
k8s.CreateNamespaceWithMetadata(s.t, s.kubectlOptions, metav1.ObjectMeta{Name: s.namespace, Labels: labels})

label := map[string]string{}
if !s.opts.DisableNamespaceLabel {
if s.opts.NamespaceSelectorLabel == nil {
label["apisix.ingress.watch"] = s.namespace
s.opts.NamespaceSelectorLabel = label
} else {
label = s.opts.NamespaceSelectorLabel
}
}

k8s.CreateNamespaceWithMetadata(s.t, s.kubectlOptions, metav1.ObjectMeta{Name: s.namespace, Labels: label})

s.nodes, err = k8s.GetReadyNodesE(s.t, s.kubectlOptions)
assert.Nil(s.t, err, "querying ready nodes")
Expand Down Expand Up @@ -541,14 +565,6 @@ func (s *Scaffold) FormatRegistry(workloadTemplate string) string {
}
}

// FormatNamespaceLabel set label to be empty if s.opts.disableNamespaceSelector is true.
func (s *Scaffold) FormatNamespaceLabel(label string) string {
if s.opts.disableNamespaceSelector {
return "\"\""
}
return label
}

var (
versionRegex = regexp.MustCompile(`apiVersion: apisix.apache.org/v.*?\n`)
kindRegex = regexp.MustCompile(`kind: (.*?)\n`)
Expand All @@ -566,10 +582,6 @@ func (s *Scaffold) getKindValue(yml string) string {
return subStr[1]
}

func (s *Scaffold) DisableNamespaceSelector() {
s.opts.disableNamespaceSelector = true
}

func waitExponentialBackoff(condFunc func() (bool, error)) error {
backoff := wait.Backoff{
Duration: 500 * time.Millisecond,
Expand Down Expand Up @@ -614,3 +626,15 @@ func (s *Scaffold) CreateVersionedApisixResourceWithNamespace(yml, namespace str
func ApisixResourceVersion() *apisixResourceVersionInfo {
return apisixResourceVersion
}

func (s *Scaffold) NamespaceSelectorLabelStrings() []string {
var labels []string
for k, v := range s.opts.NamespaceSelectorLabel {
labels = append(labels, fmt.Sprintf("%s=%s", k, v))
}
return labels
}

func (s *Scaffold) NamespaceSelectorLabel() map[string]string {
return s.opts.NamespaceSelectorLabel
}
Loading