Skip to content

Commit

Permalink
feat: add full compare when ingress startup (#680)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxthrj committed Sep 24, 2021
1 parent 1b71fa3 commit 957c315
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 5 deletions.
246 changes: 246 additions & 0 deletions pkg/ingress/compare.go
@@ -0,0 +1,246 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ingress

import (
"context"
"sync"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/apache/apisix-ingress-controller/pkg/log"
)

// CompareResources used to compare the object IDs in resources and APISIX
// Find out the rest of objects in APISIX
// AND warn them in log.
func (c *Controller) CompareResources(ctx context.Context) error {
var (
wg sync.WaitGroup
routeMapK8S = new(sync.Map)
streamRouteMapK8S = new(sync.Map)
upstreamMapK8S = new(sync.Map)
sslMapK8S = new(sync.Map)
consumerMapK8S = new(sync.Map)

routeMapA6 = make(map[string]string)
streamRouteMapA6 = make(map[string]string)
upstreamMapA6 = make(map[string]string)
sslMapA6 = make(map[string]string)
consumerMapA6 = make(map[string]string)
)
// watchingNamespace == nil means to monitor all namespaces
if c.watchingNamespace == nil {
opts := v1.ListOptions{}
// list all namespaces
nsList, err := c.kubeClient.Client.CoreV1().Namespaces().List(ctx, opts)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
wns := make(map[string]struct{}, len(nsList.Items))
for _, v := range nsList.Items {
wns[v.Name] = struct{}{}
}
c.watchingNamespace = wns
}
}
if len(c.watchingNamespace) > 0 {
wg.Add(len(c.watchingNamespace))
}
for ns := range c.watchingNamespace {
go func(ns string) {
defer wg.Done()
// ApisixRoute
opts := v1.ListOptions{}
retRoutes, err := c.kubeClient.APISIXClient.ApisixV2beta1().ApisixRoutes(ns).List(ctx, opts)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, r := range retRoutes.Items {
tc, err := c.translator.TranslateRouteV2beta1NotStrictly(&r)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
// routes
for _, route := range tc.Routes {
routeMapK8S.Store(route.ID, route.ID)
}
// streamRoutes
for _, stRoute := range tc.StreamRoutes {
streamRouteMapK8S.Store(stRoute.ID, stRoute.ID)
}
// upstreams
for _, upstream := range tc.Upstreams {
upstreamMapK8S.Store(upstream.ID, upstream.ID)
}
// ssl
for _, ssl := range tc.SSL {
sslMapK8S.Store(ssl.ID, ssl.ID)
}
}
}
}
// todo ApisixUpstream
// ApisixUpstream should be synced with ApisixRoute resource

// ApisixSSL
retSSL, err := c.kubeClient.APISIXClient.ApisixV1().ApisixTlses(ns).List(ctx, opts)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, s := range retSSL.Items {
ssl, err := c.translator.TranslateSSL(&s)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
sslMapK8S.Store(ssl.ID, ssl.ID)
}
}
}
// ApisixConsumer
retConsumer, err := c.kubeClient.APISIXClient.ApisixV2alpha1().ApisixConsumers(ns).List(ctx, opts)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
for _, con := range retConsumer.Items {
consumer, err := c.translator.TranslateApisixConsumer(&con)
if err != nil {
log.Error(err.Error())
ctx.Done()
} else {
consumerMapK8S.Store(consumer.Username, consumer.Username)
}
}
}
}(ns)
}
wg.Wait()

// 2.get all cache routes
if err := c.listRouteCache(ctx, routeMapA6); err != nil {
return err
}
if err := c.listStreamRouteCache(ctx, streamRouteMapA6); err != nil {
return err
}
if err := c.listUpstreamCache(ctx, upstreamMapA6); err != nil {
return err
}
if err := c.listSSLCache(ctx, sslMapA6); err != nil {
return err
}
if err := c.listConsumerCache(ctx, consumerMapA6); err != nil {
return err
}
// 3.compare
routeReult := findRedundant(routeMapA6, routeMapK8S)
streamRouteReult := findRedundant(streamRouteMapA6, streamRouteMapK8S)
upstreamReult := findRedundant(upstreamMapA6, upstreamMapK8S)
sslReult := findRedundant(sslMapA6, sslMapK8S)
consuemrReult := findRedundant(consumerMapA6, consumerMapK8S)
// 4.warn
warnRedundantResources(routeReult, "route")
warnRedundantResources(streamRouteReult, "streamRoute")
warnRedundantResources(upstreamReult, "upstream")
warnRedundantResources(sslReult, "ssl")
warnRedundantResources(consuemrReult, "consumer")

return nil
}

// log warn
func warnRedundantResources(resources map[string]string, t string) {
for k := range resources {
log.Warnf("%s: %s in APISIX but do not in declare yaml", t, k)
}
}

// findRedundant find redundant item which in src and do not in dest
func findRedundant(src map[string]string, dest *sync.Map) map[string]string {
result := make(map[string]string)
for k, v := range src {
_, ok := dest.Load(k)
if !ok {
result[k] = v
}
}
return result
}

func (c *Controller) listRouteCache(ctx context.Context, routeMapA6 map[string]string) error {
routesInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Route().List(ctx)
if err != nil {
return err
} else {
for _, ra := range routesInA6 {
routeMapA6[ra.ID] = ra.ID
}
}
return nil
}

func (c *Controller) listStreamRouteCache(ctx context.Context, streamRouteMapA6 map[string]string) error {
streamRoutesInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).StreamRoute().List(ctx)
if err != nil {
return err
} else {
for _, ra := range streamRoutesInA6 {
streamRouteMapA6[ra.ID] = ra.ID
}
}
return nil
}

func (c *Controller) listUpstreamCache(ctx context.Context, upstreamMapA6 map[string]string) error {
upstreamsInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Upstream().List(ctx)
if err != nil {
return err
} else {
for _, ra := range upstreamsInA6 {
upstreamMapA6[ra.ID] = ra.ID
}
}
return nil
}

func (c *Controller) listSSLCache(ctx context.Context, sslMapA6 map[string]string) error {
sslInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).SSL().List(ctx)
if err != nil {
return err
} else {
for _, s := range sslInA6 {
sslMapA6[s.ID] = s.ID
}
}
return nil
}

func (c *Controller) listConsumerCache(ctx context.Context, consumerMapA6 map[string]string) error {
consumerInA6, err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Consumer().List(ctx)
if err != nil {
return err
} else {
for _, con := range consumerInA6 {
consumerMapA6[con.Username] = con.Username
}
}
return nil
}
7 changes: 7 additions & 0 deletions pkg/ingress/controller.go
Expand Up @@ -402,6 +402,12 @@ func (c *Controller) run(ctx context.Context) {

c.initWhenStartLeading()

// compare resources of k8s with objects of APISIX
if err = c.CompareResources(ctx); err != nil {
ctx.Done()
return
}

c.goAttach(func() {
c.checkClusterHealth(ctx, cancelFunc)
})
Expand All @@ -418,6 +424,7 @@ func (c *Controller) run(ctx context.Context) {
c.ingressInformer.Run(ctx.Done())
})
c.goAttach(func() {

c.apisixRouteInformer.Run(ctx.Done())
})
c.goAttach(func() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingress/pod.go
Expand Up @@ -89,7 +89,8 @@ func (c *podController) onUpdate(_, cur interface{}) {
return
}
log.Debugw("pod update event arrived",
zap.Any("final state", pod),
zap.Any("pod namespace", pod.Namespace),
zap.Any("pod name", pod.Name),
)
if pod.DeletionTimestamp != nil {
if err := c.controller.podCache.Delete(pod); err != nil {
Expand Down
1 change: 0 additions & 1 deletion test/e2e/go.mod
Expand Up @@ -8,7 +8,6 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/gruntwork-io/terratest v0.32.8
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.10.1
github.com/stretchr/testify v1.7.0
k8s.io/api v0.21.1
k8s.io/apimachinery v0.21.1
Expand Down
74 changes: 74 additions & 0 deletions test/e2e/ingress/compare.go
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ingress

import (
"fmt"
"time"

"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
)

var _ = ginkgo.Describe("Testing compare resources", func() {
opts := &scaffold.Options{
Name: "default",
Kubeconfig: scaffold.GetKubeconfig(),
APISIXConfigPath: "testdata/apisix-gw-config.yaml",
IngressAPISIXReplicas: 1,
HTTPBinServicePort: 80,
APISIXRouteVersion: "apisix.apache.org/v2beta1",
}
s := scaffold.NewScaffold(opts)
ginkgo.It("Compare and find out the redundant objects in APISIX, and remove them", func() {
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
apisixRoute := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta1
kind: ApisixRoute
metadata:
name: httpbin-route
spec:
http:
- name: rule1
match:
hosts:
- httpbin.com
paths:
- /ip
backend:
serviceName: %s
servicePort: %d
`, backendSvc, backendSvcPort[0])
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute))

err := s.EnsureNumApisixRoutesCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
err = s.EnsureNumApisixUpstreamsCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of upstreams")
// scale Ingres Controller --replicas=0
assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(0), "scaling ingress controller instances = 0")
// remove ApisixRoute resource
assert.Nil(ginkgo.GinkgoT(), s.RemoveResourceByString(apisixRoute))
// scale Ingres Controller --replicas=1
assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(1), "scaling ingress controller instances = 1")
time.Sleep(15 * time.Second)
// should find the warn log
output := s.GetDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
fmt.Println(output)
assert.Contains(ginkgo.GinkgoT(), output, "in APISIX but do not in declare yaml")
})
})
18 changes: 18 additions & 0 deletions test/e2e/scaffold/ingress.go
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/base64"
"fmt"
"time"

"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/onsi/ginkgo"
Expand Down Expand Up @@ -452,3 +453,20 @@ func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, error) {
LabelSelector: "app=ingress-apisix-controller-deployment-e2e-test",
})
}

// ScaleIngressController scales the number of Ingress Controller pods to desired.
func (s *Scaffold) ScaleIngressController(desired int) error {
var ingressDeployment string
if s.opts.EnableWebhooks {
ingressDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, s.namespace, s.opts.APISIXRouteVersion, _volumeMounts, _webhookCertSecret)
} else {
ingressDeployment = fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, s.namespace, s.opts.APISIXRouteVersion, "", _webhookCertSecret)
}
if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, ingressDeployment); err != nil {
return err
}
if err := k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions, s.labelSelector("app=ingress-apisix-controller-deployment-e2e-test"), desired, 5, 5*time.Second); err != nil {
return err
}
return nil
}

0 comments on commit 957c315

Please sign in to comment.