Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add full compare when ingress startup #680

Merged
merged 15 commits into from Sep 24, 2021
1 change: 1 addition & 0 deletions pkg/apisix/schema.go
Expand Up @@ -17,6 +17,7 @@ package apisix

import (
"context"

"go.uber.org/zap"

"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
Expand Down
246 changes: 246 additions & 0 deletions pkg/ingress/compare.go
@@ -0,0 +1,246 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ingress

import (
"context"
"sync"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/apisix-ingress-controller/pkg/log"
)

// CompareResources used to compare the object IDs in resources and APISIX
// Find out the rest of objects in APISIX
// AND warn them in log.
func (c *Controller) CompareResources(ctx context.Context) error {
var (
wg sync.WaitGroup
routeMapK8S = new(sync.Map)
streamRouteMapK8S = new(sync.Map)
upstreamMapK8S = new(sync.Map)
sslMapK8S = new(sync.Map)
consumerMapK8S = new(sync.Map)

routeMapA6 = make(map[string]string)
streamRouteMapA6 = make(map[string]string)
upstreamMapA6 = make(map[string]string)
sslMapA6 = make(map[string]string)
consumerMapA6 = make(map[string]string)
)
// watchingNamespace == nil means to monitor all namespaces
if c.watchingNamespace == nil {
opts := v1.ListOptions{}
// list all namespaces
nsList, err := c.kubeClient.Client.CoreV1().Namespaces().List(ctx, opts)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
wns := make(map[string]struct{}, len(nsList.Items))
for _, v := range nsList.Items {
wns[v.Name] = struct{}{}
}
c.watchingNamespace = wns
}
}
if len(c.watchingNamespace) > 0 {
wg.Add(len(c.watchingNamespace))
}
for ns := range c.watchingNamespace {
go func(ns string) {
defer wg.Done()
// ApisixRoute
opts := v1.ListOptions{}
retRoutes, err := c.kubeClient.APISIXClient.ApisixV2beta1().ApisixRoutes(ns).List(ctx, opts)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, r := range retRoutes.Items {
tc, err := c.translator.TranslateRouteV2beta1NotStrictly(&r)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
// routes
for _, route := range tc.Routes {
routeMapK8S.Store(route.ID, route.ID)
}
// streamRoutes
for _, stRoute := range tc.StreamRoutes {
streamRouteMapK8S.Store(stRoute.ID, stRoute.ID)
}
// upstreams
for _, upstream := range tc.Upstreams {
upstreamMapK8S.Store(upstream.ID, upstream.ID)
}
// ssl
for _, ssl := range tc.SSL {
sslMapK8S.Store(ssl.ID, ssl.ID)
}
}
}
}
// todo ApisixUpstream
// ApisixUpstream should be synced with ApisixRoute resource

// ApisixSSL
retSSL, err := c.kubeClient.APISIXClient.ApisixV1().ApisixTlses(ns).List(ctx, opts)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, s := range retSSL.Items {
ssl, err := c.translator.TranslateSSL(&s)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
sslMapK8S.Store(ssl.ID, ssl.ID)
}
}
}
// ApisixConsumer
retConsumer, err := c.kubeClient.APISIXClient.ApisixV2alpha1().ApisixConsumers(ns).List(ctx, opts)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, con := range retConsumer.Items {
consumer, err := c.translator.TranslateApisixConsumer(&con)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
consumerMapK8S.Store(consumer.Username, consumer.Username)
}
}
}
}(ns)
}
wg.Wait()

// 2.get all cache routes
if err := c.listRouteCache(ctx, routeMapA6); err != nil {
return err
}
if err := c.listStreamRouteCache(ctx, streamRouteMapA6); err != nil {
return err
}
if err := c.listUpstreamCache(ctx, upstreamMapA6); err != nil {
return err
}
if err := c.listSSLCache(ctx, sslMapA6); err != nil {
return err
}
if err := c.listConsumerCache(ctx, consumerMapA6); err != nil {
return err
}
// 3.compare
routeReult := findRedundant(routeMapA6, routeMapK8S)
streamRouteReult := findRedundant(streamRouteMapA6, streamRouteMapK8S)
upstreamReult := findRedundant(upstreamMapA6, upstreamMapK8S)
sslReult := findRedundant(sslMapA6, sslMapK8S)
consuemrReult := findRedundant(consumerMapA6, consumerMapK8S)
// 4.warn
warnRedundantResources(routeReult, "route")
warnRedundantResources(streamRouteReult, "streamRoute")
warnRedundantResources(upstreamReult, "upstream")
warnRedundantResources(sslReult, "ssl")
warnRedundantResources(consuemrReult, "consumer")

return nil
}

// log warn
func warnRedundantResources(resources map[string]string, t string) {
for k := range resources {
log.Warnf("%s: %s in APISIX but do not in declare yaml", t, k)
}
}

// findRedundant find redundant item which in src and do not in dest
func findRedundant(src map[string]string, dest *sync.Map) map[string]string {
result := make(map[string]string)
for k, v := range src {
_, ok := dest.Load(k)
if !ok {
result[k] = v
}
}
return result
}

func (c *Controller) listRouteCache(ctx context.Context, routeMapA6 map[string]string) error {
routesInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Route().List(ctx)
if err != nil {
return err
} else {
for _, ra := range routesInA6 {
routeMapA6[ra.ID] = ra.ID
}
}
return nil
}

func (c *Controller) listStreamRouteCache(ctx context.Context, streamRouteMapA6 map[string]string) error {
streamRoutesInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).StreamRoute().List(ctx)
if err != nil {
return err
} else {
for _, ra := range streamRoutesInA6 {
streamRouteMapA6[ra.ID] = ra.ID
}
}
return nil
}

func (c *Controller) listUpstreamCache(ctx context.Context, upstreamMapA6 map[string]string) error {
upstreamsInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Upstream().List(ctx)
if err != nil {
return err
} else {
for _, ra := range upstreamsInA6 {
upstreamMapA6[ra.ID] = ra.ID
}
}
return nil
}

func (c *Controller) listSSLCache(ctx context.Context, sslMapA6 map[string]string) error {
sslInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).SSL().List(ctx)
if err != nil {
return err
} else {
for _, s := range sslInA6 {
sslMapA6[s.ID] = s.ID
}
}
return nil
}

func (c *Controller) listConsumerCache(ctx context.Context, consumerMapA6 map[string]string) error {
consumerInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Consumer().List(ctx)
if err != nil {
return err
} else {
for _, con := range consumerInA6 {
consumerMapA6[con.Username] = con.Username
}
}
return nil
}
7 changes: 7 additions & 0 deletions pkg/ingress/controller.go
Expand Up @@ -400,6 +400,12 @@ func (c *Controller) run(ctx context.Context) {

c.initWhenStartLeading()

// compare resources of k8s with objects of APISIX
if err = c.CompareResources(ctx); err != nil {
ctx.Done()
return
}

c.goAttach(func() {
c.checkClusterHealth(ctx, cancelFunc)
})
Expand All @@ -416,6 +422,7 @@ func (c *Controller) run(ctx context.Context) {
c.ingressInformer.Run(ctx.Done())
})
c.goAttach(func() {

c.apisixRouteInformer.Run(ctx.Done())
})
c.goAttach(func() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingress/pod.go
Expand Up @@ -89,7 +89,8 @@ func (c *podController) onUpdate(_, cur interface{}) {
return
}
log.Debugw("pod update event arrived",
zap.Any("final state", pod),
zap.Any("pod namespace", pod.Namespace),
zap.Any("pod name", pod.Name),
)
if pod.DeletionTimestamp != nil {
if err := c.controller.podCache.Delete(pod); err != nil {
Expand Down
1 change: 0 additions & 1 deletion test/e2e/go.mod
Expand Up @@ -8,7 +8,6 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/gruntwork-io/terratest v0.32.8
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.10.1
github.com/stretchr/testify v1.7.0
k8s.io/api v0.21.1
k8s.io/apimachinery v0.21.1
Expand Down
74 changes: 74 additions & 0 deletions test/e2e/ingress/compare.go
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ingress

import (
"fmt"
"time"

"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
)

var _ = ginkgo.Describe("Testing compare resources", func() {
opts := &scaffold.Options{
Name: "default",
Kubeconfig: scaffold.GetKubeconfig(),
APISIXConfigPath: "testdata/apisix-gw-config.yaml",
IngressAPISIXReplicas: 1,
HTTPBinServicePort: 80,
APISIXRouteVersion: "apisix.apache.org/v2beta1",
}
s := scaffold.NewScaffold(opts)
ginkgo.It("Compare and find out the redundant objects in APISIX, and remove them", func() {
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
apisixRoute := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta1
kind: ApisixRoute
metadata:
name: httpbin-route
spec:
http:
- name: rule1
match:
hosts:
- httpbin.com
paths:
- /ip
backend:
serviceName: %s
servicePort: %d
`, backendSvc, backendSvcPort[0])
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute))

err := s.EnsureNumApisixRoutesCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
err = s.EnsureNumApisixUpstreamsCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of upstreams")
// scale Ingres Controller --replicas=0
assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(0), "scaling ingress controller instances = 0")
// remove ApisixRoute resource
assert.Nil(ginkgo.GinkgoT(), s.RemoveResourceByString(apisixRoute))
// scale Ingres Controller --replicas=1
assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(1), "scaling ingress controller instances = 1")
time.Sleep(15 * time.Second)
// should find the warn log
output := s.GetDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
fmt.Println(output)
assert.Contains(ginkgo.GinkgoT(), output, "in APISIX but do not in declare yaml")
})
})
18 changes: 18 additions & 0 deletions test/e2e/scaffold/ingress.go
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/base64"
"fmt"
"time"

"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/onsi/ginkgo"
Expand Down Expand Up @@ -452,3 +453,20 @@ func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, error) {
LabelSelector: "app=ingress-apisix-controller-deployment-e2e-test",
})
}

// ScaleIngressController scales the number of Ingress Controller pods to desired.
func (s *Scaffold) ScaleIngressController(desired int) error {
var ingressDeployment string
if s.opts.EnableWebhooks {
ingressDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, s.namespace, s.opts.APISIXRouteVersion, _volumeMounts, _webhookCertSecret)
} else {
ingressDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, s.namespace, s.opts.APISIXRouteVersion, "", _webhookCertSecret)
}
if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, ingressDeployment); err != nil {
return err
}
if err := k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions, s.labelSelector("app=ingress-apisix-controller-deployment-e2e-test"), desired, 5, 5*time.Second); err != nil {
return err
}
return nil
}