Skip to content

Commit

Permalink
fix: watch all namespaces by default (#919)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmssczy committed Mar 23, 2022
1 parent 2178857 commit 0a66151
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 37 deletions.
5 changes: 3 additions & 2 deletions pkg/ingress/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (c *Controller) CompareResources(ctx context.Context) error {
consumerMapA6 = make(map[string]string)
pluginConfigMapA6 = make(map[string]string)
)
// watchingNamespaces == nil means to monitor all namespaces
if !validation.HasValueInSyncMap(c.watchingNamespaces) {
// watchingNamespaces and watchingLabels are empty means to monitor all namespaces.
if !validation.HasValueInSyncMap(c.watchingNamespaces) && len(c.watchingLabels) == 0 {
opts := v1.ListOptions{}
// list all namespaces
nsList, err := c.kubeClient.Client.CoreV1().Namespaces().List(ctx, opts)
Expand All @@ -64,6 +64,7 @@ func (c *Controller) CompareResources(ctx context.Context) error {
}

c.watchingNamespaces.Range(func(key, value interface{}) bool {
log.Debugf("start to watch namespace: %s", key)
wg.Add(1)
go func(ns string) {
defer wg.Done()
Expand Down
8 changes: 4 additions & 4 deletions pkg/ingress/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (c *Controller) initWatchingNamespacesByLabels(ctx context.Context) error {
c.watchingNamespaces.Store(ns.Name, struct{}{})
}
log.Infow("label selector watching namespaces", zap.Strings("namespaces", nss))

return nil
}

Expand Down Expand Up @@ -142,8 +141,9 @@ func (c *namespaceController) handleSyncErr(event *types.Event, err error) {

func (c *namespaceController) onAdd(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
log.Debugw(key)
if err != nil {
log.Errorf("found Namespace resource with error: %v", err)
return
}
c.workqueue.Add(&types.Event{
Type: types.EventAdd,
Expand All @@ -159,7 +159,7 @@ func (c *namespaceController) onUpdate(pre, cur interface{}) {
}
key, err := cache.MetaNamespaceKeyFunc(cur)
if err != nil {
log.Errorf("found Namespace resource with error: %s", err)
log.Errorf("found Namespace resource with error: %v", err)
return
}
c.workqueue.Add(&types.Event{
Expand Down
130 changes: 115 additions & 15 deletions test/e2e/ingress/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,21 @@ import (
"net/http"
"time"

"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
)

type headers struct {
Headers struct {
Accept string `json:"Accept"`
Host string `json:"Host"`
UserAgent string `json:"User-Agent"`
} `json:"headers"`
}

var _ = ginkgo.Describe("namespacing filtering", func() {
opts := &scaffold.Options{
Name: "default",
Expand All @@ -37,9 +46,10 @@ var _ = ginkgo.Describe("namespacing filtering", func() {
APISIXRouteVersion: "apisix.apache.org/v2beta3",
}
s := scaffold.NewScaffold(opts)
ginkgo.It("resources in other namespaces should be ignored", func() {
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
route := fmt.Sprintf(`
ginkgo.Context("with namespace_selector", func() {
ginkgo.It("resources in other namespaces should be ignored", func() {
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
route := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta3
kind: ApisixRoute
metadata:
Expand All @@ -57,18 +67,86 @@ spec:
servicePort: %d
`, backendSvc, backendSvcPort[0])

assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route), "creating ApisixRoute")
time.Sleep(6 * time.Second)
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes")
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route), "creating ApisixRoute")
time.Sleep(6 * time.Second)
// assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes")
// assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")

body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
var placeholder ip
err := json.Unmarshal([]byte(body), &placeholder)
assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling IP")

// Now create another ApisixRoute in default namespace.
route = fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta3
kind: ApisixRoute
metadata:
name: httpbin-route
spec:
http:
- name: rule1
match:
hosts:
- httpbin.com
paths:
- /headers
backends:
- serviceName: %s
servicePort: %d
`, backendSvc, backendSvcPort[0])

assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromStringWithNamespace(route, "default"), "creating ApisixRoute")
_ = s.NewAPISIXClient().GET("/headers").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusNotFound)
})
})

ginkgo.Context("without namespace_selector", func() {
// make namespace_selector empty
s.DisableNamespaceSelector()
namespace := "second-httpbin-service-namespace"

// create another http-bin service in a new namespace.
ginkgo.BeforeEach(func() {
k8s.CreateNamespace(ginkgo.GinkgoT(), &k8s.KubectlOptions{
ConfigPath: scaffold.GetKubeconfig(),
}, namespace)
_, err := s.NewHTTPBINWithNamespace(namespace)
assert.Nil(ginkgo.GinkgoT(), err, "create second httpbin service")
})

body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
var placeholder ip
err := json.Unmarshal([]byte(body), &placeholder)
assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling IP")
// clean this tmp namespace when test case is done.
ginkgo.AfterEach(func() {
err := k8s.DeleteNamespaceE(ginkgo.GinkgoT(), &k8s.KubectlOptions{
ConfigPath: scaffold.GetKubeconfig()}, namespace)
assert.Nilf(ginkgo.GinkgoT(), err, "deleting namespace %s", namespace)
})

// Now create another ApisixRoute in default namespace.
route = fmt.Sprintf(`
ginkgo.It("all resources will be watched", func() {
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
route := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta3
kind: ApisixRoute
metadata:
name: httpbin-route
spec:
http:
- name: rule1
match:
hosts:
- httpbin.com
paths:
- /ip
backends:
- serviceName: %s
servicePort: %d
`, backendSvc, backendSvcPort[0])
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route), "creating first ApisixRoute")
time.Sleep(3 * time.Second)

// Now create another ApisixRoute in another namespace.
backendSvc, backendSvcPort = s.DefaultHTTPBackend()
route = fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta3
kind: ApisixRoute
metadata:
Expand All @@ -86,7 +164,29 @@ spec:
servicePort: %d
`, backendSvc, backendSvcPort[0])

assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromStringWithNamespace(route, "default"), "creating ApisixRoute")
_ = s.NewAPISIXClient().GET("/headers").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusNotFound)
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromStringWithNamespace(route, namespace), "creating second ApisixRoute")

// restart ingress-controller
pods, err := s.GetIngressPodDetails()
assert.Nil(ginkgo.GinkgoT(), err)
assert.Len(ginkgo.GinkgoT(), pods, 1)
ginkgo.GinkgoT().Logf("restart apisix-ingress-controller pod %s", pods[0].Name)
assert.Nil(ginkgo.GinkgoT(), s.KillPod(pods[0].Name))
time.Sleep(6 * time.Second)
// Two ApisixRoutes have been created at this time.
// assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(2), "checking number of routes")
// assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(2), "checking number of upstreams")

body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
var placeholder ip
err = json.Unmarshal([]byte(body), &placeholder)
assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling IP")
assert.NotEqual(ginkgo.GinkgoT(), ip{}, placeholder)
body = s.NewAPISIXClient().GET("/headers").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
var headerResponse headers
err = json.Unmarshal([]byte(body), &headerResponse)
assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling header")
assert.NotEqual(ginkgo.GinkgoT(), headers{}, headerResponse)
})
})
})
5 changes: 4 additions & 1 deletion test/e2e/ingress/sanity.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type ip struct {
IP string `json:"ip"`
IP string `json:"origin"`
}

var _ = ginkgo.Describe("single-route", func() {
Expand Down Expand Up @@ -71,6 +71,7 @@ spec:
var placeholder ip
err = json.Unmarshal([]byte(body), &placeholder)
assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling IP")
assert.NotEqual(ginkgo.GinkgoT(), ip{}, placeholder)
// It's not our focus point to check the IP address returned by httpbin,
// so here skip the IP address validation.
})
Expand Down Expand Up @@ -124,6 +125,7 @@ spec:
var placeholder ip
err = json.Unmarshal([]byte(body), &placeholder)
assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling IP")
assert.NotEqual(ginkgo.GinkgoT(), ip{}, placeholder)

body = s.NewAPISIXClient().GET("/json").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
var dummy map[string]interface{}
Expand Down Expand Up @@ -228,6 +230,7 @@ spec:
var placeholder ip
err = json.Unmarshal([]byte(body), &placeholder)
assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling IP")
assert.NotEqual(ginkgo.GinkgoT(), ip{}, placeholder)
// It's not our focus point to check the IP address returned by httpbin,
// so here skip the IP address validation.
})
Expand Down
9 changes: 9 additions & 0 deletions test/e2e/scaffold/httpbin.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ func (s *Scaffold) newHTTPBIN() (*corev1.Service, error) {
return svc, nil
}

func (s *Scaffold) NewHTTPBINWithNamespace(namespace string) (*corev1.Service, error) {
originalNamespace := s.kubectlOptions.Namespace
s.kubectlOptions.Namespace = namespace
defer func() {
s.kubectlOptions.Namespace = originalNamespace
}()
return s.newHTTPBIN()
}

// ScaleHTTPBIN scales the number of HTTPBIN pods to desired.
func (s *Scaffold) ScaleHTTPBIN(desired int) error {
httpbinDeployment := fmt.Sprintf(s.FormatRegistry(_httpbinDeploymentTemplate), desired)
Expand Down
9 changes: 5 additions & 4 deletions test/e2e/scaffold/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/stretchr/testify/assert"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -407,9 +406,11 @@ func (s *Scaffold) newIngressAPISIXController() error {
var ingressAPISIXDeployment string
label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
if s.opts.EnableWebhooks {
ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace, label, s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, _volumeMounts, _webhookCertSecret)
ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace,
s.FormatNamespaceLabel(label), s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, _volumeMounts, _webhookCertSecret)
} else {
ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace, label, s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, "", _webhookCertSecret)
ingressAPISIXDeployment = fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), s.opts.IngressAPISIXReplicas, s.namespace,
s.FormatNamespaceLabel(label), s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, "", _webhookCertSecret)
}

err = k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, ingressAPISIXDeployment)
Expand Down Expand Up @@ -504,7 +505,7 @@ func (s *Scaffold) WaitGetLeaderLease() (*coordinationv1.Lease, error) {

// GetIngressPodDetails returns a batch of pod description
// about apisix-ingress-controller.
func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, error) {
func (s *Scaffold) GetIngressPodDetails() ([]corev1.Pod, error) {
return k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{
LabelSelector: "app=ingress-apisix-controller-deployment-e2e-test",
})
Expand Down
33 changes: 22 additions & 11 deletions test/e2e/scaffold/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,22 @@ import (
"github.com/gruntwork-io/terratest/modules/testing"
"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

type Options struct {
Name string
Kubeconfig string
APISIXConfigPath string
IngressAPISIXReplicas int
HTTPBinServicePort int
APISIXRouteVersion string
APISIXAdminAPIKey string
EnableWebhooks bool
APISIXPublishAddress string
Name string
Kubeconfig string
APISIXConfigPath string
IngressAPISIXReplicas int
HTTPBinServicePort int
APISIXRouteVersion string
APISIXAdminAPIKey string
EnableWebhooks bool
APISIXPublishAddress string
disableNamespaceSelector bool
}

type Scaffold struct {
Expand All @@ -64,7 +64,6 @@ type Scaffold struct {
nodes []corev1.Node
etcdService *corev1.Service
apisixService *corev1.Service
httpbinDeployment *appsv1.Deployment
httpbinService *corev1.Service
testBackendService *corev1.Service
finializers []func()
Expand Down Expand Up @@ -471,6 +470,18 @@ func (s *Scaffold) FormatRegistry(workloadTemplate string) string {
}
}

// FormatNamespaceLabel set label to be empty if s.opts.disableNamespaceSelector is true.
func (s *Scaffold) FormatNamespaceLabel(label string) string {
if s.opts.disableNamespaceSelector {
return "\"\""
}
return label
}

func (s *Scaffold) DisableNamespaceSelector() {
s.opts.disableNamespaceSelector = true
}

func waitExponentialBackoff(condFunc func() (bool, error)) error {
backoff := wait.Backoff{
Duration: 500 * time.Millisecond,
Expand Down

0 comments on commit 0a66151

Please sign in to comment.