Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delete the cluster object when give up the leadership #774

Merged
merged 6 commits into from
Dec 24, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/apisix/apisix.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type APISIX interface {
UpdateCluster(context.Context, *ClusterOptions) error
// ListClusters lists all APISIX clusters.
ListClusters() []Cluster
// DeleteCluster deletes the target APISIX cluster by its name.
DeleteCluster(string)
}

// Cluster defines specific operations that can be applied in an APISIX
Expand Down Expand Up @@ -203,3 +205,12 @@ func (c *apisix) UpdateCluster(ctx context.Context, co *ClusterOptions) error {
c.clusters[co.Name] = cluster
return nil
}

func (c *apisix) DeleteCluster(name string) {
c.mu.Lock()
defer c.mu.Unlock()

// Don't have to close or free some resources in that cluster, so
// just delete its index.
delete(c.clusters, name)
}
12 changes: 9 additions & 3 deletions pkg/apisix/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,16 @@ func (c *cluster) syncCache(ctx context.Context) {
Steps: 5,
}
var lastSyncErr error
err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// impossibly return: false, nil
// so can safe used
done, lastSyncErr = c.syncCacheOnce(ctx)
select {
case <-ctx.Done():
err = context.Canceled
default:
break
}
return
})
if err != nil {
Expand All @@ -197,7 +203,7 @@ func (c *cluster) syncCache(ctx context.Context) {
func (c *cluster) syncCacheOnce(ctx context.Context) (bool, error) {
routes, err := c.route.List(ctx)
if err != nil {
log.Errorf("failed to list route in APISIX: %s", err)
log.Errorf("failed to list routes in APISIX: %s", err)
return false, err
}
upstreams, err := c.upstream.List(ctx)
Expand Down Expand Up @@ -327,7 +333,7 @@ func (c *cluster) syncSchema(ctx context.Context, interval time.Duration) {

for {
if err := c.syncSchemaOnce(ctx); err != nil {
log.Warnf("failed to sync schema: %s", err)
log.Errorf("failed to sync schema: %s", err)
c.metricsCollector.IncrSyncOperation("schema", "failure")
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/apisix/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"strings"
"testing"

"github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/stretchr/testify/assert"

"golang.org/x/net/nettest"

"github.com/apache/apisix-ingress-controller/pkg/metrics"
)

type fakeAPISIXPluginSrv struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/apisix/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"strings"
"testing"

"github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/stretchr/testify/assert"
"golang.org/x/net/nettest"

"github.com/apache/apisix-ingress-controller/pkg/metrics"
)

type fakeAPISIXSchemaSrv struct {
Expand Down
9 changes: 9 additions & 0 deletions pkg/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@ func (c *Controller) Run(stop chan struct{}) error {
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.MetricsCollector.ResetLeader(false)
// delete the old APISIX cluster, so that the cached state
// like synchronization won't be used next time the candidate
// becomes the leader again.
c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
}
},
OnStoppedLeading: func() {
Expand All @@ -355,6 +360,10 @@ func (c *Controller) Run(stop chan struct{}) error {
zap.String("pod", c.name),
)
c.MetricsCollector.ResetLeader(false)
// delete the old APISIX cluster, so that the cached state
// like synchronization won't be used next time the candidate
// becomes the leader again.
c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
},
},
// Set it to false as current leaderelection implementation will report
Expand Down
85 changes: 85 additions & 0 deletions test/e2e/chaos/chaos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chaos
tao12345666333 marked this conversation as resolved.
Show resolved Hide resolved
tao12345666333 marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
"net/http"

"github.com/onsi/ginkgo"
"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
)

var _ = ginkgo.Describe("Chaos Testing", func() {
opts := &scaffold.Options{
Name: "default",
Kubeconfig: scaffold.GetKubeconfig(),
APISIXConfigPath: "testdata/apisix-gw-config.yaml",
IngressAPISIXReplicas: 1,
HTTPBinServicePort: 80,
APISIXRouteVersion: "apisix.apache.org/v2beta2",
}
s := scaffold.NewScaffold(opts)
ginkgo.Context("simulate apisix deployment restart", func() {
ginkgo.Specify("ingress controller can synchronize rules normally after apisix recovery", func() {
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(0), "checking number of upstreams")
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
route1 := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta2
kind: ApisixRoute
metadata:
name: httpbin-route1
spec:
http:
- name: route1
match:
hosts:
- httpbin.org
paths:
- /ip
backends:
- serviceName: %s
servicePort: %d
`, backendSvc, backendSvcPort[0])
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route1))
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes")
s.RestartAPISIXDeploy()
route2 := fmt.Sprintf(`
apiVersion: apisix.apache.org/v2beta2
kind: ApisixRoute
metadata:
name: httpbin-route2
spec:
http:
- name: route2
match:
hosts:
- httpbin.org
paths:
- /get
backends:
- serviceName: %s
servicePort: %d
`, backendSvc, backendSvcPort[0])
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route2))
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(2), "checking number of routes")
s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusOK)
s.NewAPISIXClient().GET("/get").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusOK)
})
})

})
1 change: 1 addition & 0 deletions test/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package e2e

import (
_ "github.com/apache/apisix-ingress-controller/test/e2e/annotations"
_ "github.com/apache/apisix-ingress-controller/test/e2e/chaos"
_ "github.com/apache/apisix-ingress-controller/test/e2e/config"
_ "github.com/apache/apisix-ingress-controller/test/e2e/endpoints"
_ "github.com/apache/apisix-ingress-controller/test/e2e/features"
Expand Down
9 changes: 9 additions & 0 deletions test/e2e/scaffold/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,15 @@ func (s *Scaffold) newAPISIXTunnels() error {
return nil
}

func (s *Scaffold) shutdownApisixTunnel() {
s.apisixAdminTunnel.Close()
s.apisixHttpTunnel.Close()
s.apisixHttpsTunnel.Close()
s.apisixTCPTunnel.Close()
s.apisixUDPTunnel.Close()
s.apisixControlTunnel.Close()
}

// Namespace returns the current working namespace.
func (s *Scaffold) Namespace() string {
return s.kubectlOptions.Namespace
Expand Down
37 changes: 36 additions & 1 deletion test/e2e/scaffold/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,23 @@ func (s *Scaffold) APISIXGatewayServiceEndpoint() string {
return s.apisixHttpTunnel.Endpoint()
}

// RestartAPISIXDeploy delete apisix pod and wait new pod be ready
func (s *Scaffold) RestartAPISIXDeploy() {
s.shutdownApisixTunnel()
pods, err := k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{
LabelSelector: "app=apisix-deployment-e2e-test",
})
assert.NoError(s.t, err, "list apisix pod")
for _, pod := range pods {
err = s.KillPod(pod.Name)
assert.NoError(s.t, err, "killing apisix pod")
}
err = s.waitAllAPISIXPodsAvailable()
assert.NoError(s.t, err, "waiting for new apisix instance ready")
err = s.newAPISIXTunnels()
assert.NoError(s.t, err, "renew apisix tunnels")
}

func (s *Scaffold) beforeEach() {
var err error
s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d", s.opts.Name, time.Now().Nanosecond())
Expand Down Expand Up @@ -368,14 +385,32 @@ func (s *Scaffold) afterEach() {
assert.Nilf(ginkgo.GinkgoT(), err, "deleting namespace %s", s.namespace)

for _, f := range s.finializers {
f()
runWithRecover(f)
}

// Wait for a while to prevent the worker node being overwhelming
// (new cases will be run).
time.Sleep(3 * time.Second)
}

func runWithRecover(f func()) {
defer func() {
r := recover()
if r == nil {
return
}
err, ok := r.(error)
if ok {
// just ignore already closed channel
if strings.Contains(err.Error(), "close of closed channel") {
return
}
}
panic(r)
}()
f()
}

func (s *Scaffold) GetDeploymentLogs(name string) string {
cli, err := k8s.GetKubernetesClientE(s.t)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions tools.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build tools
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is new in 1.17, I want to delete it before we fully switch to v1.17

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You haven't pushed the latest code yet?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @tokers

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tokers any update?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated it but I don't have time to validate the e2e case until this weekend.

tao12345666333 marked this conversation as resolved.
Show resolved Hide resolved
// +build tools

// Licensed to the Apache Software Foundation (ASF) under one or more
Expand Down